import fastify, { FastifyInstance } from "fastify"; import { log, logger } from "@/utils/log"; import { serializerCompiler, validatorCompiler, ZodTypeProvider } from "fastify-type-provider-zod"; import { Server, Socket } from "socket.io"; import { z } from "zod"; import * as privacyKit from "privacy-kit"; import * as tweetnacl from "tweetnacl"; import { db } from "@/storage/db"; import { Account } from "@prisma/client"; import { onShutdown } from "@/utils/shutdown"; import { allocateSessionSeq, allocateUserSeq } from "@/services/seq"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { AsyncLock } from "@/utils/lock"; // Recipient filter types type RecipientFilter = | { type: 'all-interested-in-session'; sessionId: string } | { type: 'user-scoped-only' } | { type: 'all-user-authenticated-connections' }; // Connection metadata types interface SessionScopedConnection { connectionType: 'session-scoped'; socket: Socket; userId: string; sessionId: string; } interface UserScopedConnection { connectionType: 'user-scoped'; socket: Socket; userId: string; } interface MachineScopedConnection { connectionType: 'machine-scoped'; socket: Socket; userId: string; machineId: string; } type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection; declare module 'fastify' { interface FastifyRequest { user: Account; } interface FastifyInstance { authenticate: any; } } export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> { // Configure log('Starting API...'); const tokenGenerator = await privacyKit.createPersistentTokenGenerator({ service: 'handy', seed: process.env.HANDY_MASTER_SECRET! }); const tokenVerifier = await privacyKit.createPersistentTokenVerifier({ service: 'handy', publicKey: tokenGenerator.publicKey }); // Start API const app = fastify({ logger: true, bodyLimit: 1024 * 1024 * 100, // 100MB }); app.register(require('@fastify/cors'), { origin: '*', allowedHeaders: '*', methods: ['GET', 'POST'] }); app.get('/', function (request, reply) { reply.send('Welcome to Happy Server!'); }); app.setValidatorCompiler(validatorCompiler); app.setSerializerCompiler(serializerCompiler); const typed = app.withTypeProvider(); // Authentication decorator app.decorate('authenticate', async function (request: any, reply: any) { try { const authHeader = request.headers.authorization; log({ module: 'auth-decorator' }, `Auth check - path: ${request.url}, has header: ${!!authHeader}, header start: ${authHeader?.substring(0, 50)}...`); if (!authHeader || !authHeader.startsWith('Bearer ')) { log({ module: 'auth-decorator' }, `Auth failed - missing or invalid header`); return reply.code(401).send({ error: 'Missing authorization header' }); } const token = authHeader.substring(7); const verified = await tokenVerifier.verify(token); if (!verified) { log({ module: 'auth-decorator' }, `Auth failed - invalid token`); return reply.code(401).send({ error: 'Invalid token' }); } // Get user from database const user = await db.account.findUnique({ where: { id: verified.user as string } }); if (!user) { log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`); return reply.code(401).send({ error: 'User not found' }); } log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`); request.user = user; } catch (error) { return reply.code(401).send({ error: 'Authentication failed' }); } }); // Send session update to all relevant connections let emitUpdateToInterestedClients = ({ event, userId, payload, recipientFilter = { type: 'all-user-authenticated-connections' }, skipSenderConnection }: { event: string, userId: string, payload: any, recipientFilter?: RecipientFilter, skipSenderConnection?: ClientConnection }) => { const connections = userIdToClientConnections.get(userId); if (!connections) { log({ module: 'websocket', level: 'warn' }, `No connections found for user ${userId}`); return; } for (const connection of connections) { // Skip message echo if (skipSenderConnection && connection === skipSenderConnection) { continue; } // Apply recipient filter switch (recipientFilter.type) { case 'all-interested-in-session': // Send to session-scoped with matching session + all user-scoped if (connection.connectionType === 'session-scoped') { if (connection.sessionId !== recipientFilter.sessionId) { continue; // Wrong session } } else if (connection.connectionType === 'machine-scoped') { continue; // Machines don't need session updates } // user-scoped always gets it break; case 'user-scoped-only': if (connection.connectionType !== 'user-scoped') { continue; } break; case 'all-user-authenticated-connections': // Send to all connection types (default behavior) break; } log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`); connection.socket.emit(event, payload); } } // Auth schema typed.post('/v1/auth', { schema: { body: z.object({ publicKey: z.string(), challenge: z.string(), signature: z.string() }) } }, async (request, reply) => { const publicKey = privacyKit.decodeBase64(request.body.publicKey); const challenge = privacyKit.decodeBase64(request.body.challenge); const signature = privacyKit.decodeBase64(request.body.signature); const isValid = tweetnacl.sign.detached.verify(challenge, signature, publicKey); if (!isValid) { return reply.code(401).send({ error: 'Invalid signature' }); } // Create or update user in database const publicKeyHex = privacyKit.encodeHex(publicKey); const user = await db.account.upsert({ where: { publicKey: publicKeyHex }, update: { updatedAt: new Date() }, create: { publicKey: publicKeyHex } }); return reply.send({ success: true, token: await tokenGenerator.new({ user: user.id }) }); }); typed.post('/v1/auth/request', { schema: { body: z.object({ publicKey: z.string(), }), response: { 200: z.union([z.object({ state: z.literal('requested'), }), z.object({ state: z.literal('authorized'), token: z.string(), response: z.string() })]), 401: z.object({ error: z.literal('Invalid public key') }) } } }, async (request, reply) => { const publicKey = privacyKit.decodeBase64(request.body.publicKey); const isValid = tweetnacl.box.publicKeyLength === publicKey.length; if (!isValid) { return reply.code(401).send({ error: 'Invalid public key' }); } const publicKeyHex = privacyKit.encodeHex(publicKey); log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`); const answer = await db.terminalAuthRequest.upsert({ where: { publicKey: publicKeyHex }, update: {}, create: { publicKey: publicKeyHex } }); if (answer.response && answer.responseAccountId) { const token = await tokenGenerator.new({ user: answer.responseAccountId!, extras: { session: answer.id } }); return reply.send({ state: 'authorized', token: token, response: answer.response }); } return reply.send({ state: 'requested' }); }); // Approve auth request typed.post('/v1/auth/response', { preHandler: app.authenticate, schema: { body: z.object({ response: z.string(), publicKey: z.string() }) } }, async (request, reply) => { log({ module: 'auth-response' }, `Auth response endpoint hit - user: ${request.user?.id || 'NO USER'}, publicKey: ${request.body.publicKey.substring(0, 20)}...`); const publicKey = privacyKit.decodeBase64(request.body.publicKey); const isValid = tweetnacl.box.publicKeyLength === publicKey.length; if (!isValid) { log({ module: 'auth-response' }, `Invalid public key length: ${publicKey.length}`); return reply.code(401).send({ error: 'Invalid public key' }); } const publicKeyHex = privacyKit.encodeHex(publicKey); log({ module: 'auth-response' }, `Looking for auth request with publicKey hex: ${publicKeyHex}`); const authRequest = await db.terminalAuthRequest.findUnique({ where: { publicKey: publicKeyHex } }); if (!authRequest) { log({ module: 'auth-response' }, `Auth request not found for publicKey: ${publicKeyHex}`); // Let's also check what auth requests exist const allRequests = await db.terminalAuthRequest.findMany({ take: 5, orderBy: { createdAt: 'desc' } }); log({ module: 'auth-response' }, `Recent auth requests in DB: ${JSON.stringify(allRequests.map(r => ({ id: r.id, publicKey: r.publicKey.substring(0, 20) + '...', hasResponse: !!r.response })))}`); return reply.code(404).send({ error: 'Request not found' }); } if (!authRequest.response) { await db.terminalAuthRequest.update({ where: { id: authRequest.id }, data: { response: request.body.response, responseAccountId: request.user.id } }); } return reply.send({ success: true }); }); // Account auth request typed.post('/v1/auth/account/request', { schema: { body: z.object({ publicKey: z.string(), }), response: { 200: z.union([z.object({ state: z.literal('requested'), }), z.object({ state: z.literal('authorized'), token: z.string(), response: z.string() })]), 401: z.object({ error: z.literal('Invalid public key') }) } } }, async (request, reply) => { const publicKey = privacyKit.decodeBase64(request.body.publicKey); const isValid = tweetnacl.box.publicKeyLength === publicKey.length; if (!isValid) { return reply.code(401).send({ error: 'Invalid public key' }); } const answer = await db.accountAuthRequest.upsert({ where: { publicKey: privacyKit.encodeHex(publicKey) }, update: {}, create: { publicKey: privacyKit.encodeHex(publicKey) } }); if (answer.response && answer.responseAccountId) { const token = await tokenGenerator.new({ user: answer.responseAccountId! }); return reply.send({ state: 'authorized', token: token, response: answer.response }); } return reply.send({ state: 'requested' }); }); // Approve account auth request typed.post('/v1/auth/account/response', { preHandler: app.authenticate, schema: { body: z.object({ response: z.string(), publicKey: z.string() }) } }, async (request, reply) => { const publicKey = privacyKit.decodeBase64(request.body.publicKey); const isValid = tweetnacl.box.publicKeyLength === publicKey.length; if (!isValid) { return reply.code(401).send({ error: 'Invalid public key' }); } const authRequest = await db.accountAuthRequest.findUnique({ where: { publicKey: privacyKit.encodeHex(publicKey) } }); if (!authRequest) { return reply.code(404).send({ error: 'Request not found' }); } if (!authRequest.response) { await db.accountAuthRequest.update({ where: { id: authRequest.id }, data: { response: request.body.response, responseAccountId: request.user.id } }); } return reply.send({ success: true }); }); // Sessions API typed.get('/v1/sessions', { preHandler: app.authenticate, }, async (request, reply) => { const userId = request.user.id; const sessions = await db.session.findMany({ where: { accountId: userId }, orderBy: { updatedAt: 'desc' }, take: 150, select: { id: true, seq: true, createdAt: true, updatedAt: true, metadata: true, metadataVersion: true, agentState: true, agentStateVersion: true, active: true, lastActiveAt: true, messages: { orderBy: { seq: 'desc' }, take: 1, select: { id: true, seq: true, content: true, localId: true, createdAt: true } } } }); return reply.send({ sessions: sessions.map((v) => { const lastMessage = v.messages[0]; const sessionUpdatedAt = v.updatedAt.getTime(); const lastMessageCreatedAt = lastMessage ? lastMessage.createdAt.getTime() : 0; return { id: v.id, seq: v.seq, createdAt: v.createdAt.getTime(), updatedAt: Math.max(sessionUpdatedAt, lastMessageCreatedAt), active: v.active, activeAt: v.lastActiveAt.getTime(), metadata: v.metadata, metadataVersion: v.metadataVersion, agentState: v.agentState, agentStateVersion: v.agentStateVersion, lastMessage: lastMessage ? { id: lastMessage.id, seq: lastMessage.seq, localId: lastMessage.localId, content: lastMessage.content, createdAt: lastMessageCreatedAt } : null }; }) }); }); // Create or load session by tag typed.post('/v1/sessions', { schema: { body: z.object({ tag: z.string(), metadata: z.string(), agentState: z.string().nullish() }) }, preHandler: app.authenticate }, async (request, reply) => { const userId = request.user.id; const { tag, metadata } = request.body; const session = await db.session.findFirst({ where: { accountId: userId, tag: tag } }); if (session) { logger.info({ module: 'session-create', sessionId: session.id, userId, tag }, `Found existing session: ${session.id} for tag ${tag}`); return reply.send({ session: { id: session.id, seq: session.seq, metadata: session.metadata, metadataVersion: session.metadataVersion, agentState: session.agentState, agentStateVersion: session.agentStateVersion, active: session.active, activeAt: session.lastActiveAt.getTime(), createdAt: session.createdAt.getTime(), updatedAt: session.updatedAt.getTime(), lastMessage: null } }); } else { // Resolve seq const updSeq = await allocateUserSeq(userId); // Create session logger.info({ module: 'session-create', userId, tag }, `Creating new session for user ${userId} with tag ${tag}`); const session = await db.session.create({ data: { accountId: userId, tag: tag, metadata: metadata } }); logger.info({ module: 'session-create', sessionId: session.id, userId }, `Session created: ${session.id}`); // Create update const updContent: PrismaJson.UpdateBody = { t: 'new-session', id: session.id, seq: session.seq, metadata: session.metadata, metadataVersion: session.metadataVersion, agentState: session.agentState, agentStateVersion: session.agentStateVersion, active: session.active, activeAt: session.lastActiveAt.getTime(), createdAt: session.createdAt.getTime(), updatedAt: session.updatedAt.getTime() }; // Emit update to connected sockets const updatePayload = { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() }; logger.info({ module: 'session-create', userId, sessionId: session.id, updateType: 'new-session', updatePayload: JSON.stringify(updatePayload) }, `Emitting new-session update to all user connections`); emitUpdateToInterestedClients({ event: 'update', userId, payload: updatePayload, recipientFilter: { type: 'all-user-authenticated-connections' } }); return reply.send({ session: { id: session.id, seq: session.seq, metadata: session.metadata, metadataVersion: session.metadataVersion, agentState: session.agentState, agentStateVersion: session.agentStateVersion, active: session.active, activeAt: session.lastActiveAt.getTime(), createdAt: session.createdAt.getTime(), updatedAt: session.updatedAt.getTime(), lastMessage: null } }); } }); // Push Token Registration API typed.post('/v1/push-tokens', { schema: { body: z.object({ token: z.string() }), response: { 200: z.object({ success: z.literal(true) }), 500: z.object({ error: z.literal('Failed to register push token') }) } }, preHandler: app.authenticate }, async (request, reply) => { const userId = request.user.id; const { token } = request.body; try { await db.accountPushToken.upsert({ where: { accountId_token: { accountId: userId, token: token } }, update: { updatedAt: new Date() }, create: { accountId: userId, token: token } }); return reply.send({ success: true }); } catch (error) { return reply.code(500).send({ error: 'Failed to register push token' }); } }); // Delete Push Token API typed.delete('/v1/push-tokens/:token', { schema: { params: z.object({ token: z.string() }), response: { 200: z.object({ success: z.literal(true) }), 500: z.object({ error: z.literal('Failed to delete push token') }) } }, preHandler: app.authenticate }, async (request, reply) => { const userId = request.user.id; const { token } = request.params; try { await db.accountPushToken.deleteMany({ where: { accountId: userId, token: token } }); return reply.send({ success: true }); } catch (error) { return reply.code(500).send({ error: 'Failed to delete push token' }); } }); // Get Push Tokens API typed.get('/v1/push-tokens', { preHandler: app.authenticate }, async (request, reply) => { const userId = request.user.id; try { const tokens = await db.accountPushToken.findMany({ where: { accountId: userId }, orderBy: { createdAt: 'desc' } }); return reply.send({ tokens: tokens.map(t => ({ id: t.id, token: t.token, createdAt: t.createdAt.getTime(), updatedAt: t.updatedAt.getTime() })) }); } catch (error) { return reply.code(500).send({ error: 'Failed to get push tokens' }); } }); // Get Account Settings API typed.get('/v1/account/settings', { preHandler: app.authenticate, schema: { response: { 200: z.object({ settings: z.string().nullable(), settingsVersion: z.number() }), 500: z.object({ error: z.literal('Failed to get account settings') }) } } }, async (request, reply) => { try { return reply.send({ settings: request.user.settings, settingsVersion: request.user.settingsVersion }); } catch (error) { return reply.code(500).send({ error: 'Failed to get account settings' }); } }); // Update Account Settings API typed.post('/v1/account/settings', { schema: { body: z.object({ settings: z.string().nullable(), expectedVersion: z.number().int().min(0) }), response: { 200: z.union([z.object({ success: z.literal(true), version: z.number() }), z.object({ success: z.literal(false), error: z.literal('version-mismatch'), currentVersion: z.number(), currentSettings: z.string().nullable() })]), 500: z.object({ success: z.literal(false), error: z.literal('Failed to update account settings') }) } }, preHandler: app.authenticate }, async (request, reply) => { const userId = request.user.id; const { settings, expectedVersion } = request.body; try { // Check current version if (request.user.settingsVersion !== expectedVersion) { return reply.code(200).send({ success: false, error: 'version-mismatch', currentVersion: request.user.settingsVersion, currentSettings: request.user.settings }); } // Update settings with version check const { count } = await db.account.updateMany({ where: { id: userId, settingsVersion: expectedVersion }, data: { settings: settings, settingsVersion: expectedVersion + 1, updatedAt: new Date() } }); if (count === 0) { // Re-fetch to get current version const account = await db.account.findUnique({ where: { id: userId } }); return reply.code(200).send({ success: false, error: 'version-mismatch', currentVersion: account?.settingsVersion || 0, currentSettings: account?.settings || null }); } // Generate update for connected clients const updSeq = await allocateUserSeq(userId); const updContent: PrismaJson.UpdateBody = { t: 'update-account', id: userId, settings: { value: settings, version: expectedVersion + 1 } }; // Send to all user connections emitUpdateToInterestedClients({ event: 'update', userId, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() }, recipientFilter: { type: 'all-user-authenticated-connections' } }); return reply.send({ success: true, version: expectedVersion + 1 }); } catch (error) { log({ module: 'api', level: 'error' }, `Failed to update account settings: ${error}`); return reply.code(500).send({ success: false, error: 'Failed to update account settings' }); } }); // Query Usage Reports API typed.post('/v1/usage/query', { schema: { body: z.object({ sessionId: z.string().nullish(), startTime: z.number().int().positive().nullish(), endTime: z.number().int().positive().nullish(), groupBy: z.enum(['hour', 'day']).nullish() }) }, preHandler: app.authenticate }, async (request, reply) => { const userId = request.user.id; const { sessionId, startTime, endTime, groupBy } = request.body; const actualGroupBy = groupBy || 'day'; try { // Build query conditions const where: { accountId: string; sessionId?: string | null; createdAt?: { gte?: Date; lte?: Date; }; } = { accountId: userId }; if (sessionId) { // Verify session belongs to user const session = await db.session.findFirst({ where: { id: sessionId, accountId: userId } }); if (!session) { return reply.code(404).send({ error: 'Session not found' }); } where.sessionId = sessionId; } if (startTime || endTime) { where.createdAt = {}; if (startTime) { where.createdAt.gte = new Date(startTime * 1000); } if (endTime) { where.createdAt.lte = new Date(endTime * 1000); } } // Fetch usage reports const reports = await db.usageReport.findMany({ where, orderBy: { createdAt: 'desc' } }); // Aggregate data by time period const aggregated = new Map; cost: Record; count: number; timestamp: number; }>(); for (const report of reports) { const data = report.data as PrismaJson.UsageReportData; const date = new Date(report.createdAt); // Calculate timestamp based on groupBy let timestamp: number; if (actualGroupBy === 'hour') { // Round down to hour const hourDate = new Date(date.getFullYear(), date.getMonth(), date.getDate(), date.getHours(), 0, 0, 0); timestamp = Math.floor(hourDate.getTime() / 1000); } else { // Round down to day const dayDate = new Date(date.getFullYear(), date.getMonth(), date.getDate(), 0, 0, 0, 0); timestamp = Math.floor(dayDate.getTime() / 1000); } const key = timestamp.toString(); if (!aggregated.has(key)) { aggregated.set(key, { tokens: {}, cost: {}, count: 0, timestamp }); } const agg = aggregated.get(key)!; agg.count++; // Aggregate tokens for (const [tokenKey, tokenValue] of Object.entries(data.tokens)) { if (typeof tokenValue === 'number') { agg.tokens[tokenKey] = (agg.tokens[tokenKey] || 0) + tokenValue; } } // Aggregate costs for (const [costKey, costValue] of Object.entries(data.cost)) { if (typeof costValue === 'number') { agg.cost[costKey] = (agg.cost[costKey] || 0) + costValue; } } } // Convert to array and sort by timestamp const result = Array.from(aggregated.values()) .map(data => ({ timestamp: data.timestamp, tokens: data.tokens, cost: data.cost, reportCount: data.count })) .sort((a, b) => a.timestamp - b.timestamp); return reply.send({ usage: result, groupBy: actualGroupBy, totalReports: reports.length }); } catch (error) { log({ module: 'api', level: 'error' }, `Failed to query usage reports: ${error}`); return reply.code(500).send({ error: 'Failed to query usage reports' }); } }); // Messages API typed.get('/v1/sessions/:sessionId/messages', { schema: { params: z.object({ sessionId: z.string() }) }, preHandler: app.authenticate }, async (request, reply) => { const userId = request.user.id; const { sessionId } = request.params; // Verify session belongs to user const session = await db.session.findFirst({ where: { id: sessionId, accountId: userId } }); if (!session) { return reply.code(404).send({ error: 'Session not found' }); } const messages = await db.sessionMessage.findMany({ where: { sessionId }, orderBy: { createdAt: 'desc' }, take: 150, select: { id: true, seq: true, localId: true, content: true, createdAt: true, updatedAt: true } }); return reply.send({ messages: messages.map((v) => ({ id: v.id, seq: v.seq, content: v.content, localId: v.localId, createdAt: v.createdAt.getTime(), updatedAt: v.updatedAt.getTime() })) }); }); // Machines // POST /v1/machines - Create machine or return existing typed.post('/v1/machines', { preHandler: app.authenticate, schema: { body: z.object({ id: z.string(), metadata: z.string(), // Encrypted metadata daemonState: z.string().optional() // Encrypted daemon state }) } }, async (request, reply) => { const userId = request.user.id; const { id, metadata, daemonState } = request.body; // Check if machine exists (like sessions do) const machine = await db.machine.findFirst({ where: { accountId: userId, id: id } }); if (machine) { // Machine exists - just return it logger.info({ module: 'machines', machineId: id, userId }, 'Found existing machine'); return reply.send({ machine: { id: machine.id, metadata: machine.metadata, metadataVersion: machine.metadataVersion, daemonState: machine.daemonState, daemonStateVersion: machine.daemonStateVersion, active: machine.active, activeAt: machine.lastActiveAt.getTime(), // Return as activeAt for API consistency createdAt: machine.createdAt.getTime(), updatedAt: machine.updatedAt.getTime() } }); } else { // Create new machine logger.info({ module: 'machines', machineId: id, userId }, 'Creating new machine'); const newMachine = await db.machine.create({ data: { id, accountId: userId, metadata, metadataVersion: 1, daemonState: daemonState || null, daemonStateVersion: daemonState ? 1 : 0 // active defaults to true in schema // lastActiveAt defaults to now() in schema } }); // Emit update for new machine const updSeq = await allocateUserSeq(userId); emitUpdateToInterestedClients({ event: 'update', userId, payload: { id: randomKeyNaked(), seq: updSeq, body: { t: 'update-machine', machineId: newMachine.id, metadata: { version: 1, value: metadata } }, createdAt: Date.now() } }); return reply.send({ machine: { id: newMachine.id, metadata: newMachine.metadata, metadataVersion: newMachine.metadataVersion, daemonState: newMachine.daemonState, daemonStateVersion: newMachine.daemonStateVersion, active: newMachine.active, activeAt: newMachine.lastActiveAt.getTime(), // Return as activeAt for API consistency createdAt: newMachine.createdAt.getTime(), updatedAt: newMachine.updatedAt.getTime() } }); } }); // Machines API typed.get('/v1/machines', { preHandler: app.authenticate, }, async (request, reply) => { const userId = request.user.id; const machines = await db.machine.findMany({ where: { accountId: userId }, orderBy: { lastActiveAt: 'desc' } }); return machines.map(m => ({ id: m.id, metadata: m.metadata, metadataVersion: m.metadataVersion, daemonState: m.daemonState, daemonStateVersion: m.daemonStateVersion, seq: m.seq, active: m.active, activeAt: m.lastActiveAt.getTime(), createdAt: m.createdAt.getTime(), updatedAt: m.updatedAt.getTime() })); }); // GET /v1/machines/:id - Get single machine by ID typed.get('/v1/machines/:id', { preHandler: app.authenticate, schema: { params: z.object({ id: z.string() }) } }, async (request, reply) => { const userId = request.user.id; const { id } = request.params; const machine = await db.machine.findFirst({ where: { accountId: userId, id: id } }); if (!machine) { return reply.code(404).send({ error: 'Machine not found' }); } return { machine: { id: machine.id, metadata: machine.metadata, metadataVersion: machine.metadataVersion, daemonState: machine.daemonState, daemonStateVersion: machine.daemonStateVersion, seq: machine.seq, active: machine.active, activeAt: machine.lastActiveAt.getTime(), createdAt: machine.createdAt.getTime(), updatedAt: machine.updatedAt.getTime() } }; }); // Combined logging endpoint (only when explicitly enabled) if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', { schema: { body: z.object({ timestamp: z.string(), level: z.string(), message: z.string(), messageRawObject: z.any().optional(), source: z.enum(['mobile', 'cli']), platform: z.string().optional() }) } }, async (request, reply) => { const { timestamp, level, message, source, platform } = request.body; // Log ONLY to separate remote logger (file only, no console) const logData = { source, platform, timestamp }; // Use the file-only logger if available const { fileConsolidatedLogger } = await import('@/utils/log'); if (!fileConsolidatedLogger) { // Should never happen since we check env var above, but be safe return reply.send({ success: true }); } switch (level.toLowerCase()) { case 'error': fileConsolidatedLogger.error(logData, message); break; case 'warn': case 'warning': fileConsolidatedLogger.warn(logData, message); break; case 'debug': fileConsolidatedLogger.debug(logData, message); break; default: fileConsolidatedLogger.info(logData, message); } return reply.send({ success: true }); }); } // Catch-all route for debugging 404s app.setNotFoundHandler((request, reply) => { log({ module: '404-handler' }, `404 - Method: ${request.method}, Path: ${request.url}, Headers: ${JSON.stringify(request.headers)}`); reply.code(404).send({ error: 'Not found', path: request.url, method: request.method }); }); // Start const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005; await app.listen({ port, host: '0.0.0.0' }); // Socket IO - Create after server is listening if (!app.server) { throw new Error('Fastify server not available'); } const io = new Server(app.server, { cors: { origin: "*", methods: ["GET", "POST", "OPTIONS"], credentials: true, allowedHeaders: ["*"] }, transports: ['websocket', 'polling'], pingTimeout: 45000, pingInterval: 15000, path: '/v1/updates', allowUpgrades: true, upgradeTimeout: 10000, connectTimeout: 20000, serveClient: false // Don't serve the client files }); // Track connections by scope type const userIdToClientConnections = new Map>(); // Track RPC listeners: Map> // Only session-scoped clients (CLI) register handlers, only user-scoped clients (mobile) call them const rpcListeners = new Map>(); io.on("connection", async (socket) => { log({ module: 'websocket' }, `New connection attempt from socket: ${socket.id}`); const token = socket.handshake.auth.token as string; const clientType = socket.handshake.auth.clientType as 'session-scoped' | 'user-scoped' | 'machine-scoped' | undefined; const sessionId = socket.handshake.auth.sessionId as string | undefined; const machineId = socket.handshake.auth.machineId as string | undefined; if (!token) { log({ module: 'websocket' }, `No token provided`); socket.emit('error', { message: 'Missing authentication token' }); socket.disconnect(); return; } // Validate session-scoped clients have sessionId if (clientType === 'session-scoped' && !sessionId) { log({ module: 'websocket' }, `Session-scoped client missing sessionId`); socket.emit('error', { message: 'Session ID required for session-scoped clients' }); socket.disconnect(); return; } // Validate machine-scoped clients have machineId if (clientType === 'machine-scoped' && !machineId) { log({ module: 'websocket' }, `Machine-scoped client missing machineId`); socket.emit('error', { message: 'Machine ID required for machine-scoped clients' }); socket.disconnect(); return; } const verified = await tokenVerifier.verify(token); if (!verified) { log({ module: 'websocket' }, `Invalid token provided`); socket.emit('error', { message: 'Invalid authentication token' }); socket.disconnect(); return; } const userId = verified.user as string; log({ module: 'websocket' }, `Token verified: ${userId}, clientType: ${clientType || 'user-scoped'}, sessionId: ${sessionId || 'none'}, machineId: ${machineId || 'none'}, socketId: ${socket.id}`); // Store connection based on type const metadata = { clientType: clientType || 'user-scoped', sessionId, machineId }; let connection: ClientConnection; if (metadata.clientType === 'session-scoped' && sessionId) { connection = { connectionType: 'session-scoped', socket, userId, sessionId }; } else if (metadata.clientType === 'machine-scoped' && machineId) { connection = { connectionType: 'machine-scoped', socket, userId, machineId }; } else { connection = { connectionType: 'user-scoped', socket, userId }; } if (!userIdToClientConnections.has(userId)) { userIdToClientConnections.set(userId, new Set()); } userIdToClientConnections.get(userId)!.add(connection); // Broadcast daemon online status if (connection.connectionType === 'machine-scoped') { // Broadcast daemon online emitUpdateToInterestedClients({ event: 'ephemeral', userId, payload: { type: 'machine-activity', id: machineId, active: true, activeAt: Date.now() }, recipientFilter: { type: 'user-scoped-only' } }); } // Lock const receiveMessageLock = new AsyncLock(); const receiveUsageLock = new AsyncLock(); socket.on('disconnect', () => { // Cleanup const connections = userIdToClientConnections.get(userId); if (connections) { connections.delete(connection); if (connections.size === 0) { userIdToClientConnections.delete(userId); } } // Clean up RPC listeners for this socket const userRpcMap = rpcListeners.get(userId); if (userRpcMap) { // Remove all RPC methods registered by this socket const methodsToRemove: string[] = []; for (const [method, registeredSocket] of userRpcMap.entries()) { if (registeredSocket === socket) { methodsToRemove.push(method); } } if (methodsToRemove.length > 0) { log({ module: 'websocket-rpc' }, `Cleaning up RPC methods on disconnect for socket ${socket.id}: ${methodsToRemove.join(', ')}`); methodsToRemove.forEach(method => userRpcMap.delete(method)); } if (userRpcMap.size === 0) { rpcListeners.delete(userId); log({ module: 'websocket-rpc' }, `All RPC listeners removed for user ${userId}`); } } log({ module: 'websocket' }, `User disconnected: ${userId}`); // Broadcast daemon offline status if (connection.connectionType === 'machine-scoped') { emitUpdateToInterestedClients({ event: 'ephemeral', userId, payload: { type: 'machine-activity', id: connection.machineId, active: false, activeAt: Date.now() }, recipientFilter: { type: 'user-scoped-only' } }); } }); socket.on('session-alive', async (data: { sid: string; time: number; thinking?: boolean; }) => { try { // Basic validation if (!data || typeof data.time !== 'number' || !data.sid) { return; } let t = data.time; if (t > Date.now()) { t = Date.now(); } if (t < Date.now() - 1000 * 60 * 10) { return; } const { sid, thinking } = data; // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } }); if (!session) { return; } // Update last active await db.session.update({ where: { id: sid }, data: { lastActiveAt: new Date(t), active: true } }); // Emit update emitUpdateToInterestedClients({ event: 'ephemeral', userId, payload: { type: 'activity', id: sid, active: true, activeAt: t, thinking: thinking || false }, recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`); } }); socket.on('machine-alive', async (data: { machineId: string; time: number; }) => { try { // Basic validation if (!data || typeof data.time !== 'number' || !data.machineId) { return; } let t = data.time; if (t > Date.now()) { t = Date.now(); } if (t < Date.now() - 1000 * 60 * 10) { return; } // Resolve machine const machine = await db.machine.findUnique({ where: { accountId_id: { accountId: userId, id: data.machineId } } }); if (!machine) { return; } // Update machine lastActiveAt in database const updatedMachine = await db.machine.update({ where: { accountId_id: { accountId: userId, id: data.machineId } }, data: { lastActiveAt: new Date(t), active: true } }) emitUpdateToInterestedClients({ event: 'ephemeral', userId, payload: { type: 'machine-activity', id: updatedMachine.id, active: true, activeAt: t, }, recipientFilter: { type: 'user-scoped-only' } }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`); } }); socket.on('session-end', async (data: { sid: string; time: number; }) => { try { const { sid, time } = data; let t = time; if (typeof t !== 'number') { return; } if (t > Date.now()) { t = Date.now(); } if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes return; } // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } }); if (!session) { return; } // Update last active at await db.session.update({ where: { id: sid }, data: { lastActiveAt: new Date(t), active: false } }); // Emit update to connected sockets emitUpdateToInterestedClients({ event: 'ephemeral', userId, payload: { type: 'activity', id: sid, active: false, activeAt: t, thinking: false }, recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-end: ${error}`); } }); socket.on('message', async (data: any) => { await receiveMessageLock.inLock(async () => { try { const { sid, message, localId } = data; log({ module: 'websocket' }, `Received message from socket ${socket.id}: sessionId=${sid}, messageLength=${message.length} bytes, connectionType=${connection.connectionType}, connectionSessionId=${connection.connectionType === 'session-scoped' ? connection.sessionId : 'N/A'}`); // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } }); if (!session) { return; } let useLocalId = typeof localId === 'string' ? localId : null; // Create encrypted message const msgContent: PrismaJson.SessionMessageContent = { t: 'encrypted', c: message }; // Resolve seq const updSeq = await allocateUserSeq(userId); const msgSeq = await allocateSessionSeq(sid); // Check if message already exists if (useLocalId) { const existing = await db.sessionMessage.findFirst({ where: { sessionId: sid, localId: useLocalId } }); if (existing) { return { msg: existing, update: null }; } } // Create message const msg = await db.sessionMessage.create({ data: { sessionId: sid, seq: msgSeq, content: msgContent, localId: useLocalId } }); // Create update const update: PrismaJson.UpdateBody = { t: 'new-message', sid: sid, message: { id: msg.id, seq: msg.seq, content: msgContent, localId: useLocalId, createdAt: msg.createdAt.getTime(), updatedAt: msg.updatedAt.getTime() } }; // Emit update to relevant clients emitUpdateToInterestedClients({ event: 'update', userId, payload: { id: randomKeyNaked(12), seq: updSeq, body: update, createdAt: Date.now() }, recipientFilter: { type: 'all-interested-in-session', sessionId: sid }, skipSenderConnection: connection }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in message handler: ${error}`); } }); }); socket.on('update-metadata', async (data: any, callback: (response: any) => void) => { try { const { sid, metadata, expectedVersion } = data; // Validate input if (!sid || typeof metadata !== 'string' || typeof expectedVersion !== 'number') { if (callback) { callback({ result: 'error' }); } return; } // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } }); if (!session) { return; } // Check version if (session.metadataVersion !== expectedVersion) { callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); return null; } // Update metadata const { count } = await db.session.updateMany({ where: { id: sid, metadataVersion: expectedVersion }, data: { metadata: metadata, metadataVersion: expectedVersion + 1 } }); if (count === 0) { callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); return null; } // Generate update const updSeq = await allocateUserSeq(userId); const updContent: PrismaJson.UpdateBody = { t: 'update-session', id: sid, metadata: { value: metadata, version: expectedVersion + 1 } }; emitUpdateToInterestedClients({ event: 'update', userId, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() }, recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback callback({ result: 'success', version: expectedVersion + 1, metadata: metadata }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in update-metadata: ${error}`); if (callback) { callback({ result: 'error' }); } } }); socket.on('update-state', async (data: any, callback: (response: any) => void) => { try { const { sid, agentState, expectedVersion } = data; // Validate input if (!sid || (typeof agentState !== 'string' && agentState !== null) || typeof expectedVersion !== 'number') { if (callback) { callback({ result: 'error' }); } return; } // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } }); if (!session) { callback({ result: 'error' }); return null; } // Check version if (session.agentStateVersion !== expectedVersion) { callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); return null; } // Update agent state const { count } = await db.session.updateMany({ where: { id: sid, agentStateVersion: expectedVersion }, data: { agentState: agentState, agentStateVersion: expectedVersion + 1 } }); if (count === 0) { callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); return null; } // Generate update const updSeq = await allocateUserSeq(userId); const updContent: PrismaJson.UpdateBody = { t: 'update-session', id: sid, agentState: { value: agentState, version: expectedVersion + 1 } }; // Emit update to connected sockets emitUpdateToInterestedClients({ event: 'update', userId, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() }, recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback callback({ result: 'success', version: expectedVersion + 1, agentState: agentState }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in update-state: ${error}`); if (callback) { callback({ result: 'error' }); } } }); // Machine metadata update with optimistic concurrency control socket.on('machine-update-metadata', async (data: any, callback: (response: any) => void) => { try { const { machineId, metadata, expectedVersion } = data; // Validate input if (!machineId || typeof metadata !== 'string' || typeof expectedVersion !== 'number') { if (callback) { callback({ result: 'error', message: 'Invalid parameters' }); } return; } // Resolve machine const machine = await db.machine.findFirst({ where: { accountId: userId, id: machineId } }); if (!machine) { if (callback) { callback({ result: 'error', message: 'Machine not found' }); } return; } // Check version if (machine.metadataVersion !== expectedVersion) { callback({ result: 'version-mismatch', version: machine.metadataVersion, metadata: machine.metadata }); return; } // Update metadata with atomic version check const { count } = await db.machine.updateMany({ where: { accountId: userId, id: machineId, metadataVersion: expectedVersion // Atomic CAS }, data: { metadata: metadata, metadataVersion: expectedVersion + 1 // NOT updating active or lastActiveAt here } }); if (count === 0) { // Re-fetch current version const current = await db.machine.findFirst({ where: { accountId: userId, id: machineId } }); callback({ result: 'version-mismatch', version: current?.metadataVersion || 0, metadata: current?.metadata }); return; } // Generate update const updSeq = await allocateUserSeq(userId); const updContent: PrismaJson.UpdateBody = { t: 'update-machine', machineId: machineId, metadata: { value: metadata, version: expectedVersion + 1 } }; // Emit to all connections emitUpdateToInterestedClients({ event: 'update', userId, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() }, recipientFilter: { type: 'all-user-authenticated-connections' } }); // Send success response with new version callback({ result: 'success', version: expectedVersion + 1, metadata: metadata }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in machine-update-metadata: ${error}`); if (callback) { callback({ result: 'error', message: 'Internal error' }); } } }); // Machine daemon state update with optimistic concurrency control socket.on('machine-update-state', async (data: any, callback: (response: any) => void) => { try { const { machineId, daemonState, expectedVersion } = data; // Validate input if (!machineId || typeof daemonState !== 'string' || typeof expectedVersion !== 'number') { if (callback) { callback({ result: 'error', message: 'Invalid parameters' }); } return; } // Resolve machine const machine = await db.machine.findFirst({ where: { accountId: userId, id: machineId } }); if (!machine) { if (callback) { callback({ result: 'error', message: 'Machine not found' }); } return; } // Check version if (machine.daemonStateVersion !== expectedVersion) { callback({ result: 'version-mismatch', version: machine.daemonStateVersion, daemonState: machine.daemonState }); return; } // Update daemon state with atomic version check const { count } = await db.machine.updateMany({ where: { accountId: userId, id: machineId, daemonStateVersion: expectedVersion // Atomic CAS }, data: { daemonState: daemonState, daemonStateVersion: expectedVersion + 1, active: true, lastActiveAt: new Date() } }); if (count === 0) { // Re-fetch current version const current = await db.machine.findFirst({ where: { accountId: userId, id: machineId } }); callback({ result: 'version-mismatch', version: current?.daemonStateVersion || 0, daemonState: current?.daemonState }); return; } // Generate update const updSeq = await allocateUserSeq(userId); const updContent: PrismaJson.UpdateBody = { t: 'update-machine', machineId: machineId, daemonState: { value: daemonState, version: expectedVersion + 1 } }; // Emit to all connections emitUpdateToInterestedClients({ event: 'update', userId, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() }, recipientFilter: { type: 'all-user-authenticated-connections' } }); // Send success response with new version callback({ result: 'success', version: expectedVersion + 1, daemonState: daemonState }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in machine-update-state: ${error}`); if (callback) { callback({ result: 'error', message: 'Internal error' }); } } }); // RPC register - Register this socket as a listener for an RPC method socket.on('rpc-register', async (data: any) => { try { const { method } = data; if (!method || typeof method !== 'string') { socket.emit('rpc-error', { type: 'register', error: 'Invalid method name' }); return; } // Get or create user's RPC map let userRpcMap = rpcListeners.get(userId); if (!userRpcMap) { userRpcMap = new Map(); rpcListeners.set(userId, userRpcMap); } // Check if method was already registered const previousSocket = userRpcMap.get(method); if (previousSocket && previousSocket !== socket) { log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`); } // Register this socket as the listener for this method userRpcMap.set(method, socket); socket.emit('rpc-registered', { method }); log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`); log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(userRpcMap.keys()).join(', ')}`); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in rpc-register: ${error}`); socket.emit('rpc-error', { type: 'register', error: 'Internal error' }); } }); // RPC unregister - Remove this socket as a listener for an RPC method socket.on('rpc-unregister', async (data: any) => { try { const { method } = data; if (!method || typeof method !== 'string') { socket.emit('rpc-error', { type: 'unregister', error: 'Invalid method name' }); return; } const userRpcMap = rpcListeners.get(userId); if (userRpcMap && userRpcMap.get(method) === socket) { userRpcMap.delete(method); log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`); if (userRpcMap.size === 0) { rpcListeners.delete(userId); log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`); } else { log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(userRpcMap.keys()).join(', ')}`); } } else { log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`); } socket.emit('rpc-unregistered', { method }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in rpc-unregister: ${error}`); socket.emit('rpc-error', { type: 'unregister', error: 'Internal error' }); } }); // RPC call - Call an RPC method on another socket of the same user socket.on('rpc-call', async (data: any, callback: (response: any) => void) => { try { const { method, params } = data; if (!method || typeof method !== 'string') { if (callback) { callback({ ok: false, error: 'Invalid parameters: method is required' }); } return; } // Find the RPC listener for this method within the same user const userRpcMap = rpcListeners.get(userId); if (!userRpcMap) { log({ module: 'websocket-rpc' }, `RPC call failed: No RPC methods registered for user ${userId}`); if (callback) { callback({ ok: false, error: 'No RPC methods registered' }); } return; } const targetSocket = userRpcMap.get(method); if (!targetSocket || !targetSocket.connected) { log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`); if (callback) { callback({ ok: false, error: 'RPC method not available' }); } return; } // Don't allow calling your own socket if (targetSocket === socket) { log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`); if (callback) { callback({ ok: false, error: 'Cannot call RPC on the same socket' }); } return; } // Log RPC call initiation const startTime = Date.now(); log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`); // Forward the RPC request to the target socket using emitWithAck try { const response = await targetSocket.timeout(30000).emitWithAck('rpc-request', { method, params }); const duration = Date.now() - startTime; log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`); // Forward the response back to the caller via callback if (callback) { callback({ ok: true, result: response }); } } catch (error) { const duration = Date.now() - startTime; const errorMsg = error instanceof Error ? error.message : 'RPC call failed'; log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`); // Timeout or error occurred if (callback) { callback({ ok: false, error: errorMsg }); } } } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`); if (callback) { callback({ ok: false, error: 'Internal error' }); } } }); socket.on('ping', async (callback: (response: any) => void) => { try { callback({}); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in ping: ${error}`); } }); // Usage reporting socket.on('usage-report', async (data: any, callback?: (response: any) => void) => { await receiveUsageLock.inLock(async () => { try { const { key, sessionId, tokens, cost } = data; // Validate required fields if (!key || typeof key !== 'string') { if (callback) { callback({ success: false, error: 'Invalid key' }); } return; } // Validate tokens and cost objects if (!tokens || typeof tokens !== 'object' || typeof tokens.total !== 'number') { if (callback) { callback({ success: false, error: 'Invalid tokens object - must include total' }); } return; } if (!cost || typeof cost !== 'object' || typeof cost.total !== 'number') { if (callback) { callback({ success: false, error: 'Invalid cost object - must include total' }); } return; } // Validate sessionId if provided if (sessionId && typeof sessionId !== 'string') { if (callback) { callback({ success: false, error: 'Invalid sessionId' }); } return; } try { // If sessionId provided, verify it belongs to the user if (sessionId) { const session = await db.session.findFirst({ where: { id: sessionId, accountId: userId } }); if (!session) { if (callback) { callback({ success: false, error: 'Session not found' }); } return; } } // Prepare usage data const usageData: PrismaJson.UsageReportData = { tokens, cost }; // Upsert the usage report const report = await db.usageReport.upsert({ where: { accountId_sessionId_key: { accountId: userId, sessionId: sessionId || null, key } }, update: { data: usageData, updatedAt: new Date() }, create: { accountId: userId, sessionId: sessionId || null, key, data: usageData } }); log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`); // Emit ephemeral update if sessionId is provided if (sessionId) { emitUpdateToInterestedClients({ event: 'ephemeral', userId, payload: { type: 'usage', id: sessionId, key, tokens: usageData.tokens, cost: usageData.cost, timestamp: Date.now() }, recipientFilter: { type: 'user-scoped-only' } }); } if (callback) { callback({ success: true, reportId: report.id, createdAt: report.createdAt.getTime(), updatedAt: report.updatedAt.getTime() }); } } catch (error) { log({ module: 'websocket', level: 'error' }, `Failed to save usage report: ${error}`); if (callback) { callback({ success: false, error: 'Failed to save usage report' }); } } } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in usage-report handler: ${error}`); if (callback) { callback({ success: false, error: 'Internal error' }); } } }); }); socket.emit('auth', { success: true, user: userId }); log({ module: 'websocket' }, `User connected: ${userId}`); }); // End log('API ready on port http://localhost:' + port); onShutdown('api', async () => { await app.close(); }); onShutdown('api', async () => { await io.close(); }); return { app, io }; }