diff --git a/sources/app/api.ts b/sources/app/api.ts index 906bb5a..c72030f 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -355,6 +355,9 @@ export async function startApi() { // Track connected users const userSockets = new Map>(); + // Track RPC listeners: Map> + const rpcListeners = new Map>(); + io.on("connection", async (socket) => { log({ module: 'websocket' }, `New connection attempt from socket: ${socket.id}`); const token = socket.handshake.auth.token as string; @@ -412,6 +415,24 @@ export async function startApi() { userSockets.delete(userId); } } + + // Clean up RPC listeners for this socket + const userRpcMap = rpcListeners.get(userId); + if (userRpcMap) { + // Remove all RPC methods registered by this socket + const methodsToRemove: string[] = []; + for (const [method, registeredSocket] of userRpcMap.entries()) { + if (registeredSocket === socket) { + methodsToRemove.push(method); + } + } + methodsToRemove.forEach(method => userRpcMap.delete(method)); + + if (userRpcMap.size === 0) { + rpcListeners.delete(userId); + } + } + pubsub.off('update', updateHandler); pubsub.off('update-ephemeral', updateEphemeralHandler); log({ module: 'websocket' }, `User disconnected: ${userId}`); @@ -760,6 +781,126 @@ export async function startApi() { callback({ result: 'success', version: result.newAgentStateVersion, agentState: agentState }); }); + // RPC register - Register this socket as a listener for an RPC method + socket.on('rpc-register', async (data: any) => { + const { method } = data; + + if (!method || typeof method !== 'string') { + socket.emit('rpc-error', { type: 'register', error: 'Invalid method name' }); + return; + } + + // Get or create user's RPC map + let userRpcMap = rpcListeners.get(userId); + if (!userRpcMap) { + userRpcMap = new Map(); + rpcListeners.set(userId, userRpcMap); + } + + // Register this socket as the listener for this method + userRpcMap.set(method, socket); + + socket.emit('rpc-registered', { method }); + log({ module: 'websocket-rpc' }, `User ${userId} registered RPC method: ${method}`); + }); + + // RPC unregister - Remove this socket as a listener for an RPC method + socket.on('rpc-unregister', async (data: any) => { + const { method } = data; + + if (!method || typeof method !== 'string') { + socket.emit('rpc-error', { type: 'unregister', error: 'Invalid method name' }); + return; + } + + const userRpcMap = rpcListeners.get(userId); + if (userRpcMap && userRpcMap.get(method) === socket) { + userRpcMap.delete(method); + if (userRpcMap.size === 0) { + rpcListeners.delete(userId); + } + } + + socket.emit('rpc-unregistered', { method }); + log({ module: 'websocket-rpc' }, `User ${userId} unregistered RPC method: ${method}`); + }); + + // RPC call - Call an RPC method on another socket of the same user + socket.on('rpc-call', async (data: any, callback: (response: any) => void) => { + const { method, params } = data; + + if (!method || typeof method !== 'string') { + if (callback) { + callback({ + ok: false, + error: 'Invalid parameters: method is required' + }); + } + return; + } + + // Find the RPC listener for this method within the same user + const userRpcMap = rpcListeners.get(userId); + if (!userRpcMap) { + if (callback) { + callback({ + ok: false, + error: 'No RPC methods registered' + }); + } + return; + } + + const targetSocket = userRpcMap.get(method); + if (!targetSocket || !targetSocket.connected) { + if (callback) { + callback({ + ok: false, + error: 'RPC method not available' + }); + } + return; + } + + // Don't allow calling your own socket + if (targetSocket === socket) { + if (callback) { + callback({ + ok: false, + error: 'Cannot call RPC on the same socket' + }); + } + return; + } + + // Forward the RPC request to the target socket using emitWithAck + try { + const response = await targetSocket.timeout(30000).emitWithAck('rpc-request', { + method, + params + }); + + // Forward the response back to the caller via callback + if (callback) { + callback({ + ok: true, + result: response + }); + } + + } catch (error) { + // Timeout or error occurred + if (callback) { + callback({ + ok: false, + error: error instanceof Error ? error.message : 'RPC call failed' + }); + } + } + + log({ module: 'websocket-rpc' }, `RPC call from socket ${socket.id} to method ${method}`); + }); + socket.emit('auth', { success: true, user: userId }); log({ module: 'websocket' }, `User connected: ${userId}`); });