diff --git a/sources/app/api.ts b/sources/app/api.ts index 4d5ba40..2c4b8d4 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -789,557 +789,613 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); socket.on('session-alive', async (data: any) => { - const { sid, time, thinking } = data; - let t = time; - if (typeof t !== 'number') { - return; - } - if (t > Date.now()) { - t = Date.now(); - } - if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes - return; - } - - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { - return; - } - - // Update last active at - await db.session.update({ - where: { id: sid }, - data: { lastActiveAt: new Date(t), active: true } - }); - - // Emit update to connected sockets - emitUpdateToInterestedClients({ - event: 'ephemeral', - userId, - sessionId: sid, - payload: { - type: 'activity', - id: sid, - active: true, - activeAt: t, - thinking + try { + const { sid, time, thinking } = data; + let t = time; + if (typeof t !== 'number') { + return; } - }); + if (t > Date.now()) { + t = Date.now(); + } + if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes + return; + } + + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } + }); + if (!session) { + return; + } + + // Update last active at + await db.session.update({ + where: { id: sid }, + data: { lastActiveAt: new Date(t), active: true } + }); + + // Emit update to connected sockets + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + sessionId: sid, + payload: { + type: 'activity', + id: sid, + active: true, + activeAt: t, + thinking + } + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`); + } }); socket.on('session-end', async (data: any) => { - const { sid, time } = data; - let t = time; - if (typeof t !== 'number') { - return; - } - if (t > Date.now()) { - t = Date.now(); - } - if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes - return; - } - - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { - return; - } - - // Update last active at - await db.session.update({ - where: { id: sid }, - data: { lastActiveAt: new Date(t), active: false } - }); - - // Emit update to connected sockets - emitUpdateToInterestedClients({ - event: 'ephemeral', - userId, - sessionId: sid, - payload: { - type: 'activity', - id: sid, - active: false, - activeAt: t, - thinking: false + try { + const { sid, time } = data; + let t = time; + if (typeof t !== 'number') { + return; } - }); + if (t > Date.now()) { + t = Date.now(); + } + if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes + return; + } + + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } + }); + if (!session) { + return; + } + + // Update last active at + await db.session.update({ + where: { id: sid }, + data: { lastActiveAt: new Date(t), active: false } + }); + + // Emit update to connected sockets + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + sessionId: sid, + payload: { + type: 'activity', + id: sid, + active: false, + activeAt: t, + thinking: false + } + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in session-end: ${error}`); + } }); socket.on('message', async (data: any) => { - const { sid, message, localId } = data; + try { + const { sid, message, localId } = data; - log({ module: 'websocket' }, `Received message from socket ${socket.id}: sessionId=${sid}, messageLength=${message.length} bytes, connectionType=${connection.connectionType}, connectionSessionId=${connection.connectionType === 'session-scoped' ? connection.sessionId : 'N/A'}`); + log({ module: 'websocket' }, `Received message from socket ${socket.id}: sessionId=${sid}, messageLength=${message.length} bytes, connectionType=${connection.connectionType}, connectionSessionId=${connection.connectionType === 'session-scoped' ? connection.sessionId : 'N/A'}`); - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { - return; - } - let useLocalId = typeof localId === 'string' ? localId : null; - - // Create encrypted message - const msgContent: PrismaJson.SessionMessageContent = { - t: 'encrypted', - c: message - }; - - // Resolve seq - const updSeq = await allocateUserSeq(userId); - const msgSeq = await allocateSessionSeq(sid); - - // Check if message already exists - if (useLocalId) { - const existing = await db.sessionMessage.findFirst({ - where: { sessionId: sid, localId: useLocalId } + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } }); - if (existing) { - return { msg: existing, update: null }; + if (!session) { + return; } - } + let useLocalId = typeof localId === 'string' ? localId : null; - // Create message - const msg = await db.sessionMessage.create({ - data: { + // Create encrypted message + const msgContent: PrismaJson.SessionMessageContent = { + t: 'encrypted', + c: message + }; + + // Resolve seq + const updSeq = await allocateUserSeq(userId); + const msgSeq = await allocateSessionSeq(sid); + + // Check if message already exists + if (useLocalId) { + const existing = await db.sessionMessage.findFirst({ + where: { sessionId: sid, localId: useLocalId } + }); + if (existing) { + return { msg: existing, update: null }; + } + } + + // Create message + const msg = await db.sessionMessage.create({ + data: { + sessionId: sid, + seq: msgSeq, + content: msgContent, + localId: useLocalId + } + }); + + // Create update + const update: PrismaJson.UpdateBody = { + t: 'new-message', + sid: sid, + message: { + id: msg.id, + seq: msg.seq, + content: msgContent, + localId: useLocalId, + createdAt: msg.createdAt.getTime(), + updatedAt: msg.updatedAt.getTime() + } + }; + + // Emit update to relevant clients + emitUpdateToInterestedClients({ + event: 'update', + userId, sessionId: sid, - seq: msgSeq, - content: msgContent, - localId: useLocalId - } - }); - - // Create update - const update: PrismaJson.UpdateBody = { - t: 'new-message', - sid: sid, - message: { - id: msg.id, - seq: msg.seq, - content: msgContent, - localId: useLocalId, - createdAt: msg.createdAt.getTime(), - updatedAt: msg.updatedAt.getTime() - } - }; - - // Emit update to relevant clients - emitUpdateToInterestedClients({ - event: 'update', - userId, - sessionId: sid, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: update, - createdAt: Date.now() - }, - skipSenderConnection: connection - }); + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: update, + createdAt: Date.now() + }, + skipSenderConnection: connection + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in message handler: ${error}`); + } }); socket.on('update-metadata', async (data: any, callback: (response: any) => void) => { - const { sid, metadata, expectedVersion } = data; + try { + const { sid, metadata, expectedVersion } = data; - // Validate input - if (!sid || typeof metadata !== 'string' || typeof expectedVersion !== 'number') { + // Validate input + if (!sid || typeof metadata !== 'string' || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error' }); + } + return; + } + + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } + }); + if (!session) { + return; + } + + // Check version + if (session.metadataVersion !== expectedVersion) { + callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); + return null; + } + + // Update metadata + const { count } = await db.session.updateMany({ + where: { id: sid, metadataVersion: expectedVersion }, + data: { + metadata: metadata, + metadataVersion: expectedVersion + 1 + } + }); + if (count === 0) { + callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); + return null; + } + + // Generate update + const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-session', + id: sid, + metadata: { + value: metadata, + version: expectedVersion + 1 + } + }; + emitUpdateToInterestedClients({ + event: 'update', + userId, + sessionId: sid, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + } + }); + + // Send success response with new version via callback + callback({ result: 'success', version: expectedVersion + 1, metadata: metadata }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in update-metadata: ${error}`); if (callback) { callback({ result: 'error' }); } - return; } - - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { - return; - } - - // Check version - if (session.metadataVersion !== expectedVersion) { - callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); - return null; - } - - // Update metadata - const { count } = await db.session.updateMany({ - where: { id: sid, metadataVersion: expectedVersion }, - data: { - metadata: metadata, - metadataVersion: expectedVersion + 1 - } - }); - if (count === 0) { - callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); - return null; - } - - // Generate update - const updSeq = await allocateUserSeq(userId); - const updContent: PrismaJson.UpdateBody = { - t: 'update-session', - id: sid, - metadata: { - value: metadata, - version: expectedVersion + 1 - } - }; - emitUpdateToInterestedClients({ - event: 'update', - userId, - sessionId: sid, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - } - }); - - // Send success response with new version via callback - callback({ result: 'success', version: expectedVersion + 1, metadata: metadata }); - }); socket.on('update-state', async (data: any, callback: (response: any) => void) => { - const { sid, agentState, expectedVersion } = data; + try { + const { sid, agentState, expectedVersion } = data; - // Validate input - if (!sid || (typeof agentState !== 'string' && agentState !== null) || typeof expectedVersion !== 'number') { + // Validate input + if (!sid || (typeof agentState !== 'string' && agentState !== null) || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error' }); + } + return; + } + + // Resolve session + const session = await db.session.findUnique({ + where: { + id: sid, + accountId: userId + } + }); + if (!session) { + callback({ result: 'error' }); + return null; + } + + // Check version + if (session.agentStateVersion !== expectedVersion) { + callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); + return null; + } + + // Update agent state + const { count } = await db.session.updateMany({ + where: { id: sid, agentStateVersion: expectedVersion }, + data: { + agentState: agentState, + agentStateVersion: expectedVersion + 1 + } + }); + if (count === 0) { + callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); + return null; + } + + // Generate update + const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-session', + id: sid, + agentState: { + value: agentState, + version: expectedVersion + 1 + } + }; + + // Emit update to connected sockets + emitUpdateToInterestedClients({ + event: 'update', + userId, + sessionId: sid, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + } + }); + + // Send success response with new version via callback + callback({ result: 'success', version: expectedVersion + 1, agentState: agentState }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in update-state: ${error}`); if (callback) { callback({ result: 'error' }); } - return; } - - // Resolve session - const session = await db.session.findUnique({ - where: { - id: sid, - accountId: userId - } - }); - if (!session) { - callback({ result: 'error' }); - return null; - } - - // Check version - if (session.agentStateVersion !== expectedVersion) { - callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); - return null; - } - - // Update agent state - const { count } = await db.session.updateMany({ - where: { id: sid, agentStateVersion: expectedVersion }, - data: { - agentState: agentState, - agentStateVersion: expectedVersion + 1 - } - }); - if (count === 0) { - callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); - return null; - } - - // Generate update - const updSeq = await allocateUserSeq(userId); - const updContent: PrismaJson.UpdateBody = { - t: 'update-session', - id: sid, - agentState: { - value: agentState, - version: expectedVersion + 1 - } - }; - - // Emit update to connected sockets - emitUpdateToInterestedClients({ - event: 'update', - userId, - sessionId: sid, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - } - }); - - // Send success response with new version via callback - callback({ result: 'success', version: expectedVersion + 1, agentState: agentState }); }); // RPC register - Register this socket as a listener for an RPC method socket.on('rpc-register', async (data: any) => { - const { method } = data; + try { + const { method } = data; - if (!method || typeof method !== 'string') { - socket.emit('rpc-error', { type: 'register', error: 'Invalid method name' }); - return; + if (!method || typeof method !== 'string') { + socket.emit('rpc-error', { type: 'register', error: 'Invalid method name' }); + return; + } + + // Get or create user's RPC map + let userRpcMap = rpcListeners.get(userId); + if (!userRpcMap) { + userRpcMap = new Map(); + rpcListeners.set(userId, userRpcMap); + } + + // Check if method was already registered + const previousSocket = userRpcMap.get(method); + if (previousSocket && previousSocket !== socket) { + log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`); + } + + // Register this socket as the listener for this method + userRpcMap.set(method, socket); + + socket.emit('rpc-registered', { method }); + log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`); + log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(userRpcMap.keys()).join(', ')}`); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in rpc-register: ${error}`); + socket.emit('rpc-error', { type: 'register', error: 'Internal error' }); } - - // Get or create user's RPC map - let userRpcMap = rpcListeners.get(userId); - if (!userRpcMap) { - userRpcMap = new Map(); - rpcListeners.set(userId, userRpcMap); - } - - // Check if method was already registered - const previousSocket = userRpcMap.get(method); - if (previousSocket && previousSocket !== socket) { - log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`); - } - - // Register this socket as the listener for this method - userRpcMap.set(method, socket); - - socket.emit('rpc-registered', { method }); - log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`); - log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(userRpcMap.keys()).join(', ')}`); }); // RPC unregister - Remove this socket as a listener for an RPC method socket.on('rpc-unregister', async (data: any) => { - const { method } = data; + try { + const { method } = data; - if (!method || typeof method !== 'string') { - socket.emit('rpc-error', { type: 'unregister', error: 'Invalid method name' }); - return; - } - - const userRpcMap = rpcListeners.get(userId); - if (userRpcMap && userRpcMap.get(method) === socket) { - userRpcMap.delete(method); - log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`); - - if (userRpcMap.size === 0) { - rpcListeners.delete(userId); - log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`); - } else { - log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(userRpcMap.keys()).join(', ')}`); + if (!method || typeof method !== 'string') { + socket.emit('rpc-error', { type: 'unregister', error: 'Invalid method name' }); + return; } - } else { - log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`); - } - socket.emit('rpc-unregistered', { method }); + const userRpcMap = rpcListeners.get(userId); + if (userRpcMap && userRpcMap.get(method) === socket) { + userRpcMap.delete(method); + log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`); + + if (userRpcMap.size === 0) { + rpcListeners.delete(userId); + log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`); + } else { + log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(userRpcMap.keys()).join(', ')}`); + } + } else { + log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`); + } + + socket.emit('rpc-unregistered', { method }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in rpc-unregister: ${error}`); + socket.emit('rpc-error', { type: 'unregister', error: 'Internal error' }); + } }); // RPC call - Call an RPC method on another socket of the same user socket.on('rpc-call', async (data: any, callback: (response: any) => void) => { - const { method, params } = data; - - if (!method || typeof method !== 'string') { - if (callback) { - callback({ - ok: false, - error: 'Invalid parameters: method is required' - }); - } - return; - } - - // Find the RPC listener for this method within the same user - const userRpcMap = rpcListeners.get(userId); - if (!userRpcMap) { - log({ module: 'websocket-rpc' }, `RPC call failed: No RPC methods registered for user ${userId}`); - if (callback) { - callback({ - ok: false, - error: 'No RPC methods registered' - }); - } - return; - } - - const targetSocket = userRpcMap.get(method); - if (!targetSocket || !targetSocket.connected) { - log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`); - if (callback) { - callback({ - ok: false, - error: 'RPC method not available' - }); - } - return; - } - - // Don't allow calling your own socket - if (targetSocket === socket) { - log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`); - if (callback) { - callback({ - ok: false, - error: 'Cannot call RPC on the same socket' - }); - } - return; - } - - // Log RPC call initiation - const startTime = Date.now(); - log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`); - - // Forward the RPC request to the target socket using emitWithAck try { - const response = await targetSocket.timeout(30000).emitWithAck('rpc-request', { - method, - params - }); + const { method, params } = data; - const duration = Date.now() - startTime; - log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`); - - // Forward the response back to the caller via callback - if (callback) { - callback({ - ok: true, - result: response - }); + if (!method || typeof method !== 'string') { + if (callback) { + callback({ + ok: false, + error: 'Invalid parameters: method is required' + }); + } + return; } - } catch (error) { - const duration = Date.now() - startTime; - const errorMsg = error instanceof Error ? error.message : 'RPC call failed'; - log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`); + // Find the RPC listener for this method within the same user + const userRpcMap = rpcListeners.get(userId); + if (!userRpcMap) { + log({ module: 'websocket-rpc' }, `RPC call failed: No RPC methods registered for user ${userId}`); + if (callback) { + callback({ + ok: false, + error: 'No RPC methods registered' + }); + } + return; + } - // Timeout or error occurred + const targetSocket = userRpcMap.get(method); + if (!targetSocket || !targetSocket.connected) { + log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`); + if (callback) { + callback({ + ok: false, + error: 'RPC method not available' + }); + } + return; + } + + // Don't allow calling your own socket + if (targetSocket === socket) { + log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`); + if (callback) { + callback({ + ok: false, + error: 'Cannot call RPC on the same socket' + }); + } + return; + } + + // Log RPC call initiation + const startTime = Date.now(); + log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`); + + // Forward the RPC request to the target socket using emitWithAck + try { + const response = await targetSocket.timeout(30000).emitWithAck('rpc-request', { + method, + params + }); + + const duration = Date.now() - startTime; + log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`); + + // Forward the response back to the caller via callback + if (callback) { + callback({ + ok: true, + result: response + }); + } + + } catch (error) { + const duration = Date.now() - startTime; + const errorMsg = error instanceof Error ? error.message : 'RPC call failed'; + log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`); + + // Timeout or error occurred + if (callback) { + callback({ + ok: false, + error: errorMsg + }); + } + } + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`); if (callback) { callback({ ok: false, - error: errorMsg + error: 'Internal error' }); } } }); socket.on('ping', async (callback: (response: any) => void) => { - callback({}); + try { + callback({}); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in ping: ${error}`); + } }); // Usage reporting socket.on('usage-report', async (data: any, callback?: (response: any) => void) => { - const { key, sessionId, tokens, cost } = data; - - // Validate required fields - if (!key || typeof key !== 'string') { - if (callback) { - callback({ success: false, error: 'Invalid key' }); - } - return; - } - - // Validate tokens and cost objects - if (!tokens || typeof tokens !== 'object' || typeof tokens.total !== 'number') { - if (callback) { - callback({ success: false, error: 'Invalid tokens object - must include total' }); - } - return; - } - - if (!cost || typeof cost !== 'object' || typeof cost.total !== 'number') { - if (callback) { - callback({ success: false, error: 'Invalid cost object - must include total' }); - } - return; - } - - // Validate sessionId if provided - if (sessionId && typeof sessionId !== 'string') { - if (callback) { - callback({ success: false, error: 'Invalid sessionId' }); - } - return; - } - try { - // If sessionId provided, verify it belongs to the user - if (sessionId) { - const session = await db.session.findFirst({ - where: { - id: sessionId, - accountId: userId - } - }); + const { key, sessionId, tokens, cost } = data; - if (!session) { - if (callback) { - callback({ success: false, error: 'Session not found' }); - } - return; + // Validate required fields + if (!key || typeof key !== 'string') { + if (callback) { + callback({ success: false, error: 'Invalid key' }); } + return; } - // Prepare usage data - const usageData: PrismaJson.UsageReportData = { - tokens, - cost - }; + // Validate tokens and cost objects + if (!tokens || typeof tokens !== 'object' || typeof tokens.total !== 'number') { + if (callback) { + callback({ success: false, error: 'Invalid tokens object - must include total' }); + } + return; + } - // Upsert the usage report - const report = await db.usageReport.upsert({ - where: { - accountId_sessionId_key: { + if (!cost || typeof cost !== 'object' || typeof cost.total !== 'number') { + if (callback) { + callback({ success: false, error: 'Invalid cost object - must include total' }); + } + return; + } + + // Validate sessionId if provided + if (sessionId && typeof sessionId !== 'string') { + if (callback) { + callback({ success: false, error: 'Invalid sessionId' }); + } + return; + } + + try { + // If sessionId provided, verify it belongs to the user + if (sessionId) { + const session = await db.session.findFirst({ + where: { + id: sessionId, + accountId: userId + } + }); + + if (!session) { + if (callback) { + callback({ success: false, error: 'Session not found' }); + } + return; + } + } + + // Prepare usage data + const usageData: PrismaJson.UsageReportData = { + tokens, + cost + }; + + // Upsert the usage report + const report = await db.usageReport.upsert({ + where: { + accountId_sessionId_key: { + accountId: userId, + sessionId: sessionId || null, + key + } + }, + update: { + data: usageData, + updatedAt: new Date() + }, + create: { accountId: userId, sessionId: sessionId || null, - key - } - }, - update: { - data: usageData, - updatedAt: new Date() - }, - create: { - accountId: userId, - sessionId: sessionId || null, - key, - data: usageData - } - }); - - log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`); - - // Emit ephemeral update if sessionId is provided - if (sessionId) { - emitUpdateToInterestedClients({ - event: 'ephemeral', - userId, - sessionId, - payload: { - type: 'usage', - id: sessionId, key, - tokens: usageData.tokens, - cost: usageData.cost, - timestamp: Date.now() + data: usageData } }); - } - if (callback) { - callback({ - success: true, - reportId: report.id, - createdAt: report.createdAt.getTime(), - updatedAt: report.updatedAt.getTime() - }); + log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`); + + // Emit ephemeral update if sessionId is provided + if (sessionId) { + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + sessionId, + payload: { + type: 'usage', + id: sessionId, + key, + tokens: usageData.tokens, + cost: usageData.cost, + timestamp: Date.now() + } + }); + } + + if (callback) { + callback({ + success: true, + reportId: report.id, + createdAt: report.createdAt.getTime(), + updatedAt: report.updatedAt.getTime() + }); + } + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Failed to save usage report: ${error}`); + if (callback) { + callback({ success: false, error: 'Failed to save usage report' }); + } } } catch (error) { - log({ module: 'websocket', level: 'error' }, `Failed to save usage report: ${error}`); + log({ module: 'websocket', level: 'error' }, `Error in usage-report handler: ${error}`); if (callback) { - callback({ success: false, error: 'Failed to save usage report' }); + callback({ success: false, error: 'Internal error' }); } } });