feat: add rpc call between sockets

This commit is contained in:
Steve Korshakov 2025-07-14 19:41:15 -07:00
parent e2854faaa8
commit 1f9bf24af3

View File

@ -355,6 +355,9 @@ export async function startApi() {
// Track connected users
const userSockets = new Map<string, Set<Socket>>();
// Track RPC listeners: Map<userId, Map<rpcName, Socket>>
const rpcListeners = new Map<string, Map<string, Socket>>();
io.on("connection", async (socket) => {
log({ module: 'websocket' }, `New connection attempt from socket: ${socket.id}`);
const token = socket.handshake.auth.token as string;
@ -412,6 +415,24 @@ export async function startApi() {
userSockets.delete(userId);
}
}
// Clean up RPC listeners for this socket
const userRpcMap = rpcListeners.get(userId);
if (userRpcMap) {
// Remove all RPC methods registered by this socket
const methodsToRemove: string[] = [];
for (const [method, registeredSocket] of userRpcMap.entries()) {
if (registeredSocket === socket) {
methodsToRemove.push(method);
}
}
methodsToRemove.forEach(method => userRpcMap.delete(method));
if (userRpcMap.size === 0) {
rpcListeners.delete(userId);
}
}
pubsub.off('update', updateHandler);
pubsub.off('update-ephemeral', updateEphemeralHandler);
log({ module: 'websocket' }, `User disconnected: ${userId}`);
@ -760,6 +781,126 @@ export async function startApi() {
callback({ result: 'success', version: result.newAgentStateVersion, agentState: agentState });
});
// RPC register - Register this socket as a listener for an RPC method
socket.on('rpc-register', async (data: any) => {
const { method } = data;
if (!method || typeof method !== 'string') {
socket.emit('rpc-error', { type: 'register', error: 'Invalid method name' });
return;
}
// Get or create user's RPC map
let userRpcMap = rpcListeners.get(userId);
if (!userRpcMap) {
userRpcMap = new Map<string, Socket>();
rpcListeners.set(userId, userRpcMap);
}
// Register this socket as the listener for this method
userRpcMap.set(method, socket);
socket.emit('rpc-registered', { method });
log({ module: 'websocket-rpc' }, `User ${userId} registered RPC method: ${method}`);
});
// RPC unregister - Remove this socket as a listener for an RPC method
socket.on('rpc-unregister', async (data: any) => {
const { method } = data;
if (!method || typeof method !== 'string') {
socket.emit('rpc-error', { type: 'unregister', error: 'Invalid method name' });
return;
}
const userRpcMap = rpcListeners.get(userId);
if (userRpcMap && userRpcMap.get(method) === socket) {
userRpcMap.delete(method);
if (userRpcMap.size === 0) {
rpcListeners.delete(userId);
}
}
socket.emit('rpc-unregistered', { method });
log({ module: 'websocket-rpc' }, `User ${userId} unregistered RPC method: ${method}`);
});
// RPC call - Call an RPC method on another socket of the same user
socket.on('rpc-call', async (data: any, callback: (response: any) => void) => {
const { method, params } = data;
if (!method || typeof method !== 'string') {
if (callback) {
callback({
ok: false,
error: 'Invalid parameters: method is required'
});
}
return;
}
// Find the RPC listener for this method within the same user
const userRpcMap = rpcListeners.get(userId);
if (!userRpcMap) {
if (callback) {
callback({
ok: false,
error: 'No RPC methods registered'
});
}
return;
}
const targetSocket = userRpcMap.get(method);
if (!targetSocket || !targetSocket.connected) {
if (callback) {
callback({
ok: false,
error: 'RPC method not available'
});
}
return;
}
// Don't allow calling your own socket
if (targetSocket === socket) {
if (callback) {
callback({
ok: false,
error: 'Cannot call RPC on the same socket'
});
}
return;
}
// Forward the RPC request to the target socket using emitWithAck
try {
const response = await targetSocket.timeout(30000).emitWithAck('rpc-request', {
method,
params
});
// Forward the response back to the caller via callback
if (callback) {
callback({
ok: true,
result: response
});
}
} catch (error) {
// Timeout or error occurred
if (callback) {
callback({
ok: false,
error: error instanceof Error ? error.message : 'RPC call failed'
});
}
}
log({ module: 'websocket-rpc' }, `RPC call from socket ${socket.id} to method ${method}`);
});
socket.emit('auth', { success: true, user: userId });
log({ module: 'websocket' }, `User connected: ${userId}`);
});