diff --git a/package.json b/package.json index 6beb0ac..078e89a 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "migrate": "dotenv -e .env.dev -- prisma migrate dev", "generate": "prisma generate", "postinstall": "prisma generate", - "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v postgres-data:/var/lib/postgresql/data -p 5432:5432 postgres", + "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v $(pwd)/.pgdata:/var/lib/postgresql/data -p 5432:5432 postgres", "redis": "docker run -d -p 6379:6379 redis" }, "devDependencies": { diff --git a/sources/app/api.ts b/sources/app/api.ts index 518573f..9747fee 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -14,10 +14,10 @@ import { AsyncLock } from "@/utils/lock"; // Recipient filter types -type RecipientFilter = - | { type: 'all-interested-in-session'; sessionId: string } - | { type: 'user-scoped-only' } - | { type: 'all-user-authenticated-connections' }; +type RecipientFilter = + | { type: 'all-interested-in-session'; sessionId: string } + | { type: 'user-scoped-only' } + | { type: 'all-user-authenticated-connections' }; // Connection metadata types interface SessionScopedConnection { @@ -108,7 +108,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`); return reply.code(401).send({ error: 'User not found' }); } - + log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`); request.user = user; @@ -118,12 +118,12 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); // Send session update to all relevant connections - let emitUpdateToInterestedClients = ({ - event, - userId, - payload, + let emitUpdateToInterestedClients = ({ + event, + userId, + payload, recipientFilter = { type: 'all-user-authenticated-connections' }, - skipSenderConnection + skipSenderConnection }: { event: string, userId: string, @@ -156,13 +156,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } // user-scoped always gets it break; - + case 'user-scoped-only': if (connection.connectionType !== 'user-scoped') { continue; } break; - + case 'all-user-authenticated-connections': // Send to all connection types (default behavior) break; @@ -232,7 +232,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const publicKeyHex = privacyKit.encodeHex(publicKey); log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`); - + const answer = await db.terminalAuthRequest.upsert({ where: { publicKey: publicKeyHex }, update: {}, @@ -310,11 +310,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> // Check if OpenAI API key is configured on server const OPENAI_API_KEY = process.env.OPENAI_API_KEY; if (!OPENAI_API_KEY) { - return reply.code(500).send({ - error: 'OpenAI API key not configured on server' + return reply.code(500).send({ + error: 'OpenAI API key not configured on server' }); } - + // Generate ephemeral token from OpenAI const response = await fetch('https://api.openai.com/v1/realtime/sessions', { method: 'POST', @@ -327,11 +327,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> voice: 'verse', }), }); - + if (!response.ok) { throw new Error(`OpenAI API error: ${response.status}`); } - + const data = await response.json() as { client_secret: { value: string; @@ -339,14 +339,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }; id: string; }; - + return reply.send({ token: data.client_secret.value }); } catch (error) { log({ module: 'openai', level: 'error' }, 'Failed to generate ephemeral token', error); - return reply.code(500).send({ - error: 'Failed to generate ephemeral token' + return reply.code(500).send({ + error: 'Failed to generate ephemeral token' }); } }); @@ -391,7 +391,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const lastMessage = v.messages[0]; const sessionUpdatedAt = v.updatedAt.getTime(); const lastMessageCreatedAt = lastMessage ? lastMessage.createdAt.getTime() : 0; - + return { id: v.id, seq: v.seq, @@ -490,9 +490,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> body: updContent, createdAt: Date.now() }; - logger.info({ - module: 'session-create', - userId, + logger.info({ + module: 'session-create', + userId, sessionId: session.id, updateType: 'new-session', updatePayload: JSON.stringify(updatePayload) @@ -940,30 +940,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); }); - // Machines API - typed.get('/v1/machines', { - preHandler: app.authenticate, - }, async (request, reply) => { - const userId = request.user.id; - - const machines = await db.machine.findMany({ - where: { accountId: userId }, - orderBy: { lastActiveAt: 'desc' } - }); - - return machines.map(m => ({ - id: m.id, - metadata: m.metadata, - metadataVersion: m.metadataVersion, - seq: m.seq, - active: m.active, - lastActiveAt: m.lastActiveAt.getTime(), - createdAt: m.createdAt.getTime(), - updatedAt: m.updatedAt.getTime() - })); - }); + // Machines - // POST /v1/machines - Create or update machine + // POST /v1/machines - Create machine or return existing typed.post('/v1/machines', { preHandler: app.authenticate, schema: { @@ -975,67 +954,139 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }, async (request, reply) => { const userId = request.user.id; const { id, metadata } = request.body; - - logger.info({ module: 'machines', machineId: id, userId, hasMetadata: !!metadata }, 'Creating/updating machine'); - - try { - const machine = await db.machine.upsert({ + + // Check if machine exists (like sessions do) + const machine = await db.machine.findFirst({ where: { - accountId_id: { - accountId: userId, - id - } - }, - create: { - id, accountId: userId, - metadata, - metadataVersion: 1 - }, - update: { - metadata, - metadataVersion: { increment: 1 }, - active: true, - lastActiveAt: new Date() + id: id } - }); - - // Emit update to all user connections - const updSeq = await allocateUserSeq(userId); - emitUpdateToInterestedClients({ - event: 'update', - userId, - payload: { - id: randomKeyNaked(), - seq: updSeq, - body: { - t: 'update-machine', + }); + + if (machine) { + // Machine exists - just return it + logger.info({ module: 'machines', machineId: id, userId }, 'Found existing machine'); + return reply.send({ + machine: { id: machine.id, - metadata: { - version: machine.metadataVersion, - value: metadata - } - }, - createdAt: Date.now() - } + metadata: machine.metadata, + metadataVersion: machine.metadataVersion, + active: machine.active, + lastActiveAt: machine.lastActiveAt.getTime(), + createdAt: machine.createdAt.getTime(), + updatedAt: machine.updatedAt.getTime() + } }); - - return { success: true }; - } catch (error) { - logger.error({ - module: 'machines', - machineId: id, - userId, - error: error instanceof Error ? error.message : String(error), - errorStack: error instanceof Error ? error.stack : undefined - }, 'Failed to create/update machine'); - return reply.code(500).send({ - error: 'Failed to create/update machine', - details: error instanceof Error ? error.message : String(error) + } else { + // Create new machine + logger.info({ module: 'machines', machineId: id, userId }, 'Creating new machine'); + + const newMachine = await db.machine.create({ + data: { + id, + accountId: userId, + metadata, + metadataVersion: 1 + // active defaults to true in schema + // lastActiveAt defaults to now() in schema + } + }); + + // Emit update for new machine + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(), + seq: updSeq, + body: { + t: 'update-machine', + id: newMachine.id, + metadata: { + version: 1, + value: metadata + } + }, + createdAt: Date.now() + } + }); + + return reply.send({ + machine: { + id: newMachine.id, + metadata: newMachine.metadata, + metadataVersion: 1, + active: newMachine.active, + lastActiveAt: newMachine.lastActiveAt.getTime(), + createdAt: newMachine.createdAt.getTime(), + updatedAt: newMachine.updatedAt.getTime() + } }); } }); + + // Machines API + typed.get('/v1/machines', { + preHandler: app.authenticate, + }, async (request, reply) => { + const userId = request.user.id; + + const machines = await db.machine.findMany({ + where: { accountId: userId }, + orderBy: { lastActiveAt: 'desc' } + }); + + return machines.map(m => ({ + id: m.id, + metadata: m.metadata, + metadataVersion: m.metadataVersion, + seq: m.seq, + active: m.active, + activeAt: m.lastActiveAt.getTime(), + createdAt: m.createdAt.getTime(), + updatedAt: m.updatedAt.getTime() + })); + }); + + // GET /v1/machines/:id - Get single machine by ID + typed.get('/v1/machines/:id', { + preHandler: app.authenticate, + schema: { + params: z.object({ + id: z.string() + }) + } + }, async (request, reply) => { + const userId = request.user.id; + const { id } = request.params; + + const machine = await db.machine.findFirst({ + where: { + accountId: userId, + id: id + } + }); + + if (!machine) { + return reply.code(404).send({ error: 'Machine not found' }); + } + + return { + machine: { + id: machine.id, + metadata: machine.metadata, + metadataVersion: machine.metadataVersion, + seq: machine.seq, + active: machine.active, + activeAt: machine.lastActiveAt.getTime(), + createdAt: machine.createdAt.getTime(), + updatedAt: machine.updatedAt.getTime() + } + }; + }); + // Combined logging endpoint (only when explicitly enabled) if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', { @@ -1051,22 +1102,22 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }, async (request, reply) => { const { timestamp, level, message, source, platform } = request.body; - + // Log ONLY to separate remote logger (file only, no console) const logData = { source, platform, timestamp }; - + // Use the file-only logger if available const { fileConsolidatedLogger } = await import('@/utils/log'); - + if (!fileConsolidatedLogger) { // Should never happen since we check env var above, but be safe return reply.send({ success: true }); } - + switch (level.toLowerCase()) { case 'error': fileConsolidatedLogger.error(logData, message); @@ -1081,7 +1132,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> default: fileConsolidatedLogger.info(logData, message); } - + return reply.send({ success: true }); }); } @@ -1146,7 +1197,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> socket.disconnect(); return; } - + // Validate machine-scoped clients have machineId if (clientType === 'machine-scoped' && !machineId) { log({ module: 'websocket' }, `Machine-scoped client missing machineId`); @@ -1194,7 +1245,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> userIdToClientConnections.set(userId, new Set()); } userIdToClientConnections.get(userId)!.add(connection); - + // Broadcast daemon online status if (connection.connectionType === 'machine-scoped') { // Broadcast daemon online @@ -1248,7 +1299,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } log({ module: 'websocket' }, `User disconnected: ${userId}`); - + // Broadcast daemon offline status if (connection.connectionType === 'machine-scoped') { emitUpdateToInterestedClients({ @@ -1285,7 +1336,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } const { sid, thinking } = data; - + // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } @@ -1336,49 +1387,45 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - const machineId = data.machineId; - - // Update machine lastActiveAt in database - const machine = await db.machine.update({ + // Resolve machine + const machine = await db.machine.findUnique({ where: { accountId_id: { accountId: userId, - id: machineId + id: data.machineId + } + } + }); + + if (!machine) { + return; + } + + // Update machine lastActiveAt in database + const updatedMachine = await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: data.machineId } }, data: { lastActiveAt: new Date(t), active: true } - }).catch(() => { - // Machine might not exist yet, that's ok - return null; - }); + }) - // If machine was updated, emit update to all user connections - if (machine) { - const updSeq = await allocateUserSeq(userId); - emitUpdateToInterestedClients({ - event: 'update', - userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: { - t: 'update-machine', - id: machine.id, - metadata: machine.metadata ? { - version: machine.metadataVersion, - value: machine.metadata - } : undefined, - active: true, - lastActiveAt: t - }, - createdAt: Date.now() - }, - recipientFilter: { type: 'all-user-authenticated-connections' } - }); - } + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + payload: { + type: 'machine-activity', + id: updatedMachine.id, + active: true, + lastActiveAt: t, + }, + recipientFilter: { type: 'user-scoped-only' } + }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`); } @@ -1660,50 +1707,108 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - // Update machine through socket - socket.on('update-machine', async (data: { metadata: string }) => { - if (connection.connectionType !== 'machine-scoped') { - return; // Only machines can update their own metadata - } - - const machineId = connection.machineId; - + // Machine metadata update with optimistic concurrency control + socket.on('machine-update-metadata', async (data: any, callback: (response: any) => void) => { try { - const machine = await db.machine.update({ + const { machineId, metadata, expectedVersion } = data; + + // Validate input + if (!machineId || typeof metadata !== 'string' || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error', message: 'Invalid parameters' }); + } + return; + } + + // Resolve machine + const machine = await db.machine.findFirst({ where: { - accountId_id: { + accountId: userId, + id: machineId + } + }); + if (!machine) { + if (callback) { + callback({ result: 'error', message: 'Machine not found' }); + } + return; + } + + // Check version + if (machine.metadataVersion !== expectedVersion) { + callback({ + result: 'version-mismatch', + version: machine.metadataVersion, + metadata: machine.metadata + }); + return; + } + + // Update metadata with atomic version check + const { count } = await db.machine.updateMany({ + where: { + accountId: userId, + id: machineId, + metadataVersion: expectedVersion // Atomic CAS + }, + data: { + metadata: metadata, + metadataVersion: expectedVersion + 1 + // NOT updating active or lastActiveAt here + } + }); + + if (count === 0) { + // Re-fetch current version + const current = await db.machine.findFirst({ + where: { accountId: userId, id: machineId } - }, - data: { - metadata: data.metadata, - metadataVersion: { increment: 1 } - } - }); - - // Emit to other connections + }); + callback({ + result: 'version-mismatch', + version: current?.metadataVersion || 0, + metadata: current?.metadata + }); + return; + } + + // Generate update const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-machine', + id: machineId, + metadata: { + value: metadata, + version: expectedVersion + 1 + } + }; + + // Emit to all connections emitUpdateToInterestedClients({ event: 'update', userId, payload: { - id: randomKeyNaked(), + id: randomKeyNaked(12), seq: updSeq, - body: { - t: 'update-machine', - id: machineId, - metadata: { - version: machine.metadataVersion, - value: data.metadata - } - }, + body: updContent, createdAt: Date.now() }, - skipSenderConnection: connection + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + + // Send success response with new version + callback({ + result: 'success', + version: expectedVersion + 1, + metadata: metadata }); } catch (error) { - log({ module: 'websocket', level: 'error' }, `Error updating machine metadata: ${error}`); + log({ module: 'websocket', level: 'error' }, `Error in machine-update-metadata: ${error}`); + if (callback) { + callback({ result: 'error', message: 'Internal error' }); + } } }); diff --git a/sources/storage/types.ts b/sources/storage/types.ts index eb12f66..30998c3 100644 --- a/sources/storage/types.ts +++ b/sources/storage/types.ts @@ -60,6 +60,14 @@ declare global { value: string | null; version: number; } | null | undefined; + } | { + t: 'update-machine'; + id: string; + metadata?: { + value: string; + version: number; + }; + activeAt?: number; }; } }