From a4bc4d34e811e015e0b7b7967c288ff501c2a5b0 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Tue, 12 Aug 2025 03:30:23 -0700 Subject: [PATCH 1/9] feat: Add machine persistence to database MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add Machine model to Prisma schema • Create /v1/machines endpoints for CRUD operations • Persist machine metadata and track active status • Update socket handlers for machine-scoped connections • Convert ephemeral machine status to database persistence 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../migration.sql | 23 ++ prisma/schema.prisma | 21 + sources/app/api.ts | 364 ++++++++++++++---- 3 files changed, 331 insertions(+), 77 deletions(-) create mode 100644 prisma/migrations/20250812092041_add_machine_model/migration.sql diff --git a/prisma/migrations/20250812092041_add_machine_model/migration.sql b/prisma/migrations/20250812092041_add_machine_model/migration.sql new file mode 100644 index 0000000..1e06047 --- /dev/null +++ b/prisma/migrations/20250812092041_add_machine_model/migration.sql @@ -0,0 +1,23 @@ +-- CreateTable +CREATE TABLE "Machine" ( + "id" TEXT NOT NULL, + "accountId" TEXT NOT NULL, + "metadata" TEXT NOT NULL, + "metadataVersion" INTEGER NOT NULL DEFAULT 0, + "seq" INTEGER NOT NULL DEFAULT 0, + "active" BOOLEAN NOT NULL DEFAULT true, + "lastActiveAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Machine_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "Machine_accountId_idx" ON "Machine"("accountId"); + +-- CreateIndex +CREATE UNIQUE INDEX "Machine_accountId_id_key" ON "Machine"("accountId", "id"); + +-- AddForeignKey +ALTER TABLE "Machine" ADD CONSTRAINT "Machine_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 535b078..d494f7f 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -30,6 +30,7 @@ model Account { AccountPushToken AccountPushToken[] TerminalAuthRequest TerminalAuthRequest[] UsageReport UsageReport[] + Machine Machine[] } model TerminalAuthRequest { @@ -137,3 +138,23 @@ model UsageReport { @@index([accountId]) @@index([sessionId]) } + +// +// Machines +// + +model Machine { + id String @id + accountId String + account Account @relation(fields: [accountId], references: [id]) + metadata String // Encrypted - contains ALL machine info + metadataVersion Int @default(0) + seq Int @default(0) + active Boolean @default(true) + lastActiveAt DateTime @default(now()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([accountId, id]) + @@index([accountId]) +} diff --git a/sources/app/api.ts b/sources/app/api.ts index 6892a26..29b5d2b 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -12,6 +12,35 @@ import { allocateSessionSeq, allocateUserSeq } from "@/services/seq"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { AsyncLock } from "@/utils/lock"; +// Session alive event types +type SessionAliveEvent = + | { + type: 'session-scoped'; + sid: string; + time: number; + thinking: boolean; + mode: 'local' | 'remote'; + } + | { + type: 'machine-scoped'; + machineId: string; + time: number; + } + | { + // Legacy format (no type field) - defaults to session-scoped + sid: string; + time: number; + thinking: boolean; + mode?: 'local' | 'remote'; + type?: undefined; + }; + +// Recipient filter types +type RecipientFilter = + | { type: 'all-interested-in-session'; sessionId: string } + | { type: 'user-scoped-only' } + | { type: 'all-user-authenticated-connections' }; + // Connection metadata types interface SessionScopedConnection { connectionType: 'session-scoped'; @@ -105,11 +134,17 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); // Send session update to all relevant connections - let emitUpdateToInterestedClients = ({ event, userId, sessionId, payload, skipSenderConnection }: { + let emitUpdateToInterestedClients = ({ + event, + userId, + payload, + recipientFilter = { type: 'all-user-authenticated-connections' }, + skipSenderConnection + }: { event: string, userId: string, - sessionId: string, payload: any, + recipientFilter?: RecipientFilter, skipSenderConnection?: ClientConnection }) => { const connections = userIdToClientConnections.get(userId); @@ -124,27 +159,33 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> continue; } - // Send to all user-scoped connections - we already matched user - if (connection.connectionType === 'user-scoped') { - log({ module: 'websocket' }, `Sending ${event} to user-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); + // Apply recipient filter + switch (recipientFilter.type) { + case 'all-interested-in-session': + // Send to session-scoped with matching session + all user-scoped + if (connection.connectionType === 'session-scoped') { + if (connection.sessionId !== recipientFilter.sessionId) { + continue; // Wrong session + } + } else if (connection.connectionType === 'machine-scoped') { + continue; // Machines don't need session updates + } + // user-scoped always gets it + break; + + case 'user-scoped-only': + if (connection.connectionType !== 'user-scoped') { + continue; + } + break; + + case 'all-user-authenticated-connections': + // Send to all connection types (default behavior) + break; } - // Send to all session-scoped connections, only that match sessionId - if (connection.connectionType === 'session-scoped') { - const matches = connection.sessionId === sessionId; - log({ module: 'websocket' }, `Session-scoped connection ${connection.socket.id}: sessionId=${connection.sessionId}, messageSessionId=${sessionId}, matches=${matches}`); - if (matches) { - log({ module: 'websocket' }, `Sending ${event} to session-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); - } - } - - // Send to all machine-scoped connections - they get all user updates - if (connection.connectionType === 'machine-scoped') { - log({ module: 'websocket' }, `Sending ${event} to machine-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); - } + log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`); + connection.socket.emit(event, payload); } } @@ -445,13 +486,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: session.id, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-user-authenticated-connections' } }); return reply.send({ @@ -679,18 +720,18 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }; - // Get all user connections (not session-specific) - const connections = userIdToClientConnections.get(userId); - if (connections) { - for (const connection of connections) { - connection.socket.emit('update', { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }); - } - } + // Send to all user connections + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); return reply.send({ success: true, @@ -890,6 +931,86 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); }); + // Machines API + typed.get('/v1/machines', { + preHandler: app.authenticate, + }, async (request, reply) => { + const userId = request.user.id; + + const machines = await db.machine.findMany({ + where: { accountId: userId }, + orderBy: { lastActiveAt: 'desc' } + }); + + return machines.map(m => ({ + id: m.id, + metadata: m.metadata, + metadataVersion: m.metadataVersion, + seq: m.seq, + active: m.active, + lastActiveAt: m.lastActiveAt.getTime(), + createdAt: m.createdAt.getTime(), + updatedAt: m.updatedAt.getTime() + })); + }); + + // POST /v1/machines - Create or update machine + typed.post('/v1/machines', { + preHandler: app.authenticate, + schema: { + body: z.object({ + id: z.string(), + metadata: z.string() // Encrypted metadata + }) + } + }, async (request, reply) => { + const userId = request.user.id; + const { id, metadata } = request.body; + + const machine = await db.machine.upsert({ + where: { + accountId_id: { + accountId: userId, + id + } + }, + create: { + id, + accountId: userId, + metadata, + metadataVersion: 1 + }, + update: { + metadata, + metadataVersion: { increment: 1 }, + active: true, + lastActiveAt: new Date() + } + }); + + // Emit update to all user connections + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(), + seq: updSeq, + body: { + t: 'update-machine', + id: machine.id, + metadata: { + version: machine.metadataVersion, + value: metadata + } + }, + createdAt: Date.now() + } + }); + + return { success: true }; + }); + // Start const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005; await app.listen({ port, host: '0.0.0.0' }); @@ -999,13 +1120,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: '', // No specific session payload: { type: 'daemon-status', machineId, status: 'online', timestamp: Date.now() - } + }, + recipientFilter: { type: 'user-scoped-only' } }); } @@ -1052,58 +1173,100 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: '', // No specific session payload: { type: 'daemon-status', machineId: connection.machineId, status: 'offline', timestamp: Date.now() - } + }, + recipientFilter: { type: 'user-scoped-only' } }); } }); - socket.on('session-alive', async (data: any) => { + socket.on('session-alive', async (data: SessionAliveEvent) => { try { - const { sid, time, thinking } = data; - let t = time; - if (typeof t !== 'number') { + // Basic validation + if (!data || typeof data.time !== 'number') { return; } + + let t = data.time; if (t > Date.now()) { t = Date.now(); } - if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes + if (t < Date.now() - 1000 * 60 * 10) { return; } - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { - return; + // Determine type (default to session-scoped for legacy) + const eventType = data.type || 'session-scoped'; + + // Validate but CONTINUE with warning + if (eventType === 'machine-scoped' && connection.connectionType !== 'machine-scoped') { + log({ module: 'websocket', level: 'warn' }, + `Connection type mismatch: ${connection.connectionType} sending machine-scoped alive`); + // CONTINUE ANYWAY + } + if (eventType === 'session-scoped' && connection.connectionType === 'machine-scoped') { + log({ module: 'websocket', level: 'warn' }, + `Connection type mismatch: ${connection.connectionType} sending session-scoped alive`); + // CONTINUE ANYWAY } - // Update last active at - await db.session.update({ - where: { id: sid }, - data: { lastActiveAt: new Date(t), active: true } - }); - - // Emit update to connected sockets - emitUpdateToInterestedClients({ - event: 'ephemeral', - userId, - sessionId: sid, - payload: { - type: 'activity', - id: sid, - active: true, - activeAt: t, - thinking + // Handle based on type + if (eventType === 'machine-scoped' && 'machineId' in data) { + // Machine heartbeat - update database instead of ephemeral + const machineId = connection.connectionType === 'machine-scoped' ? connection.machineId : data.machineId; + + // Update machine lastActiveAt in database + await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: machineId + } + }, + data: { + lastActiveAt: new Date(t), + active: true + } + }).catch(() => { + // Machine might not exist yet, that's ok + }); + + } else if ('sid' in data) { + // Session heartbeat (legacy or explicit session-scoped) + const { sid, thinking } = data; + + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } + }); + if (!session) { + return; } - }); + + // Update last active + await db.session.update({ + where: { id: sid }, + data: { lastActiveAt: new Date(t), active: true } + }); + + // Emit update + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + payload: { + type: 'activity', + id: sid, + active: true, + activeAt: t, + thinking: thinking || false + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + } } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`); } @@ -1141,14 +1304,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: sid, payload: { type: 'activity', id: sid, active: false, activeAt: t, thinking: false - } + }, + recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-end: ${error}`); @@ -1219,13 +1382,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: update, createdAt: Date.now() }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid }, skipSenderConnection: connection }); } catch (error) { @@ -1286,13 +1449,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback @@ -1363,13 +1526,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback @@ -1382,6 +1545,53 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); + // Update machine metadata through socket + socket.on('update-machine-metadata', async (data: { metadata: string }) => { + if (connection.connectionType !== 'machine-scoped') { + return; // Only machines can update their own metadata + } + + const machineId = connection.machineId; + + try { + const machine = await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: machineId + } + }, + data: { + metadata: data.metadata, + metadataVersion: { increment: 1 } + } + }); + + // Emit to other connections + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(), + seq: updSeq, + body: { + t: 'update-machine', + id: machineId, + metadata: { + version: machine.metadataVersion, + value: data.metadata + } + }, + createdAt: Date.now() + }, + skipSenderConnection: connection + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error updating machine metadata: ${error}`); + } + }); + // RPC register - Register this socket as a listener for an RPC method socket.on('rpc-register', async (data: any) => { try { @@ -1644,7 +1854,6 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId, payload: { type: 'usage', id: sessionId, @@ -1652,7 +1861,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> tokens: usageData.tokens, cost: usageData.cost, timestamp: Date.now() - } + }, + recipientFilter: { type: 'user-scoped-only' } }); } From 597d1d262a82ae77c0be22f3292008790c4bce19 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Tue, 12 Aug 2025 21:51:18 -0700 Subject: [PATCH 2/9] refactor: separate session-alive and machine-alive socket events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove SessionAliveEvent type and simplify event handling - session-alive now only handles session heartbeats (requires sid) - Add new machine-alive event for daemon heartbeats (requires machineId) - Remove type field and coupling between session and machine events - Add proper TypeScript types instead of using 'any' 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- sources/app/api.ts | 165 +++++++++++++++++++++------------------------ 1 file changed, 76 insertions(+), 89 deletions(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index 29b5d2b..2852422 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -12,28 +12,6 @@ import { allocateSessionSeq, allocateUserSeq } from "@/services/seq"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { AsyncLock } from "@/utils/lock"; -// Session alive event types -type SessionAliveEvent = - | { - type: 'session-scoped'; - sid: string; - time: number; - thinking: boolean; - mode: 'local' | 'remote'; - } - | { - type: 'machine-scoped'; - machineId: string; - time: number; - } - | { - // Legacy format (no type field) - defaults to session-scoped - sid: string; - time: number; - thinking: boolean; - mode?: 'local' | 'remote'; - type?: undefined; - }; // Recipient filter types type RecipientFilter = @@ -1184,10 +1162,15 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - socket.on('session-alive', async (data: SessionAliveEvent) => { + socket.on('session-alive', async (data: { + sid: string; + time: number; + thinking?: boolean; + mode?: 'local' | 'remote'; + }) => { try { // Basic validation - if (!data || typeof data.time !== 'number') { + if (!data || typeof data.time !== 'number' || !data.sid) { return; } @@ -1199,80 +1182,84 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - // Determine type (default to session-scoped for legacy) - const eventType = data.type || 'session-scoped'; + const { sid, thinking } = data; - // Validate but CONTINUE with warning - if (eventType === 'machine-scoped' && connection.connectionType !== 'machine-scoped') { - log({ module: 'websocket', level: 'warn' }, - `Connection type mismatch: ${connection.connectionType} sending machine-scoped alive`); - // CONTINUE ANYWAY - } - if (eventType === 'session-scoped' && connection.connectionType === 'machine-scoped') { - log({ module: 'websocket', level: 'warn' }, - `Connection type mismatch: ${connection.connectionType} sending session-scoped alive`); - // CONTINUE ANYWAY + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } + }); + if (!session) { + return; } - // Handle based on type - if (eventType === 'machine-scoped' && 'machineId' in data) { - // Machine heartbeat - update database instead of ephemeral - const machineId = connection.connectionType === 'machine-scoped' ? connection.machineId : data.machineId; - - // Update machine lastActiveAt in database - await db.machine.update({ - where: { - accountId_id: { - accountId: userId, - id: machineId - } - }, - data: { - lastActiveAt: new Date(t), - active: true - } - }).catch(() => { - // Machine might not exist yet, that's ok - }); - - } else if ('sid' in data) { - // Session heartbeat (legacy or explicit session-scoped) - const { sid, thinking } = data; - - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { - return; - } + // Update last active + await db.session.update({ + where: { id: sid }, + data: { lastActiveAt: new Date(t), active: true } + }); - // Update last active - await db.session.update({ - where: { id: sid }, - data: { lastActiveAt: new Date(t), active: true } - }); - - // Emit update - emitUpdateToInterestedClients({ - event: 'ephemeral', - userId, - payload: { - type: 'activity', - id: sid, - active: true, - activeAt: t, - thinking: thinking || false - }, - recipientFilter: { type: 'all-user-authenticated-connections' } - }); - } + // Emit update + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + payload: { + type: 'activity', + id: sid, + active: true, + activeAt: t, + thinking: thinking || false + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`); } }); - socket.on('session-end', async (data: any) => { + socket.on('machine-alive', async (data: { + machineId: string; + time: number; + }) => { + try { + // Basic validation + if (!data || typeof data.time !== 'number' || !data.machineId) { + return; + } + + let t = data.time; + if (t > Date.now()) { + t = Date.now(); + } + if (t < Date.now() - 1000 * 60 * 10) { + return; + } + + const machineId = data.machineId; + + // Update machine lastActiveAt in database + await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: machineId + } + }, + data: { + lastActiveAt: new Date(t), + active: true + } + }).catch(() => { + // Machine might not exist yet, that's ok + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`); + } + }); + + socket.on('session-end', async (data: { + sid: string; + time: number; + }) => { try { const { sid, time } = data; let t = time; From 4212d2725e8dec42172ebbab501820bf12182360 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Wed, 13 Aug 2025 00:29:08 -0700 Subject: [PATCH 3/9] chore: remove unused mode parameter from session-alive handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The mode parameter is not used by the server, only tracked client-side. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- sources/app/api.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index 2852422..bf335fb 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -1166,7 +1166,6 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> sid: string; time: number; thinking?: boolean; - mode?: 'local' | 'remote'; }) => { try { // Basic validation From 4006d63232a55e3fa9c31eefe071ef9b39136f61 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Wed, 13 Aug 2025 01:18:50 -0700 Subject: [PATCH 4/9] refactor: rename update-machine-metadata to update-machine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Consistent naming with the update body type (t: 'update-machine') - Shorter and cleaner event name - Matches the pattern used elsewhere in the codebase 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- sources/app/api.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index bf335fb..a3ed823 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -1531,8 +1531,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - // Update machine metadata through socket - socket.on('update-machine-metadata', async (data: { metadata: string }) => { + // Update machine through socket + socket.on('update-machine', async (data: { metadata: string }) => { if (connection.connectionType !== 'machine-scoped') { return; // Only machines can update their own metadata } From 3a804f24c8975fe8ddf33736c8d5eee8ab0c735c Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Fri, 15 Aug 2025 19:13:45 -0700 Subject: [PATCH 5/9] devx: dangerous logging endpoint --- .env.example => .env.dev | 3 + CLAUDE.md | 95 ++++++++++++++++++++++++++- package.json | 6 +- sources/app/api.ts | 135 ++++++++++++++++++++++++++++++++++----- sources/utils/log.ts | 72 +++++++++++++++++++-- 5 files changed, 284 insertions(+), 27 deletions(-) rename .env.example => .env.dev (50%) diff --git a/.env.example b/.env.dev similarity index 50% rename from .env.example rename to .env.dev index 9277a2e..0d31f60 100644 --- a/.env.example +++ b/.env.dev @@ -1,3 +1,6 @@ DATABASE_URL=postgresql://postgres:postgres@localhost:5432/handy HANDY_MASTER_SECRET=your-super-secret-key-for-local-development PORT=3005 + +# Uncomment to enable centralized logging for AI debugging (creates .logs directory) +DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true diff --git a/CLAUDE.md b/CLAUDE.md index fb2e7b6..d95a9a4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -175,4 +175,97 @@ The project includes a multi-stage Dockerfile: 3. ALWAYS prefer editing an existing file to creating a new one 4. NEVER proactively create documentation files (*.md) or README files unless explicitly requested 5. Use 4 spaces for tabs (not 2 spaces) -6. Use yarn instead of npm for all package management \ No newline at end of file +6. Use yarn instead of npm for all package management + +## Debugging Notes + +### Remote Logging Setup +- Use `DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true` env var to enable +- Server logs to `.logs/` directory with timestamped files (format: `MM-DD-HH-MM-SS.log`) +- Mobile and CLI send logs to `/logs-combined-from-cli-and-mobile-for-simple-ai-debugging` endpoint + +### Common Issues & Tells + +#### Socket/Connection Issues +- **Tell**: "Sending update to user-scoped connection" but mobile not updating +- **Tell**: Multiple "User disconnected" messages indicate socket instability +- **Tell**: "Response from the Engine was empty" = Prisma database connection lost + +#### Auth Flow Debugging +- CLI must hit `/v1/auth/request` to create auth request +- Mobile scans QR and hits `/v1/auth/response` to approve +- **Tell**: 404 on `/v1/auth/response` = server likely restarted/crashed +- **Tell**: "Auth failed - user not found" = token issue or user doesn't exist + +#### Session Creation Flow +- Sessions created via POST `/v1/sessions` with tag-based deduplication +- Server emits "new-session" update to all user connections +- **Tell**: Sessions created but not showing = mobile app not processing updates +- **Tell**: "pathname /" in mobile logs = app stuck at root screen + +#### Environment Variables +- CLI: Use `yarn dev:local-server` (NOT `yarn dev`) to load `.env.dev-local-server` +- Server: Use `yarn dev` to start with proper env files +- **Tell**: Wrong server URL = check `HAPPY_SERVER_URL` env var +- **Tell**: Wrong home dir = check `HAPPY_HOME_DIR` (should be `~/.happy-dev` for local) + +### Quick Diagnostic Commands + +#### IMPORTANT: Always Start Debugging With These +```bash +# 1. CHECK CURRENT TIME - Logs use local time, know what's current! +date + +# 2. CHECK LATEST LOG FILES - Server creates new logs on restart +ls -la .logs/*.log | tail -5 + +# 3. VERIFY YOU'RE LOOKING AT CURRENT LOGS +# Server logs are named: MM-DD-HH-MM-SS.log (month-day-hour-min-sec) +# If current time is 13:45 and latest log is 08-15-10-57-02.log from 10:57, +# that log started 3 hours ago but may still be active! +tail -1 .logs/[LATEST_LOG_FILE] # Check last entry timestamp +``` + +#### Common Debugging Patterns +```bash +# Check server logs for errors +tail -100 .logs/*.log | grep -E "(error|Error|ERROR|failed|Failed)" + +# Monitor session creation +tail -f .logs/*.log | grep -E "(new-session|Session created)" + +# Check active connections +tail -100 .logs/*.log | grep -E "(Token verified|User connected|User disconnected)" + +# See what endpoints are being hit +tail -100 .logs/*.log | grep "incoming request" + +# Debug socket real-time updates +tail -500 .logs/*.log | grep -A 2 -B 2 "new-session" | tail -30 +tail -200 .logs/*.log | grep -E "(websocket|Socket.*connected|Sending update)" | tail -30 + +# Track socket events from mobile client +tail -300 .logs/*.log | grep "remote-log.*mobile" | grep -E "(SyncSocket|handleUpdate)" | tail -20 + +# Monitor session creation flow end-to-end +tail -500 .logs/*.log | grep "session-create" | tail -20 +tail -500 .logs/*.log | grep "cmed556s4002bvb2020igg8jf" -A 3 -B 3 # Replace with actual session ID + +# Check auth flow for sessions API +tail -300 .logs/*.log | grep "auth-decorator.*sessions" | tail -10 + +# Debug machine registration and online status +tail -500 .logs/*.log | grep -E "(machine-alive|machine-register|update-machine)" | tail -20 +tail -500 .logs/*.log | grep "GET /v1/machines" | tail -10 +tail -500 .logs/*.log | grep "POST /v1/machines" | tail -10 + +# Check what mobile app is seeing +tail -500 .logs/*.log | grep "📊 Storage" | tail -20 +tail -500 .logs/*.log | grep "applySessions.*active" | tail -10 +``` + +#### Time Format Reference +- **CLI logs**: `[HH:MM:SS.mmm]` in local time (e.g., `[13:45:23.738]`) +- **Server logs**: Include both `time` (Unix ms) and `localTime` (HH:MM:ss.mmm) +- **Mobile logs**: Sent with `timestamp` in UTC, converted to `localTime` on server +- **All consolidated logs**: Have `localTime` field for easy correlation \ No newline at end of file diff --git a/package.json b/package.json index 4441978..6beb0ac 100644 --- a/package.json +++ b/package.json @@ -8,12 +8,12 @@ "scripts": { "build": "tsc --noEmit", "start": "tsx ./sources/main.ts", - "dev": "tsx --env-file=.env --env-file=.env.example ./sources/main.ts", + "dev": "lsof -ti tcp:3005 | xargs kill -9 && tsx --env-file=.env --env-file=.env.dev ./sources/main.ts", "test": "vitest run", - "migrate": "dotenv -e .env.example -- prisma migrate dev", + "migrate": "dotenv -e .env.dev -- prisma migrate dev", "generate": "prisma generate", "postinstall": "prisma generate", - "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v $(pwd)/.pgdata:/var/lib/postgresql/data -p 5432:5432 postgres", + "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v postgres-data:/var/lib/postgresql/data -p 5432:5432 postgres", "redis": "docker run -d -p 6379:6379 redis" }, "devDependencies": { diff --git a/sources/app/api.ts b/sources/app/api.ts index a3ed823..9bdee22 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -1,5 +1,5 @@ import fastify, { FastifyInstance } from "fastify"; -import { log } from "@/utils/log"; +import { log, logger } from "@/utils/log"; import { serializerCompiler, validatorCompiler, ZodTypeProvider } from "fastify-type-provider-zod"; import { Server, Socket } from "socket.io"; import { z } from "zod"; @@ -86,13 +86,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> app.decorate('authenticate', async function (request: any, reply: any) { try { const authHeader = request.headers.authorization; + log({ module: 'auth-decorator' }, `Auth check - path: ${request.url}, has header: ${!!authHeader}, header start: ${authHeader?.substring(0, 50)}...`); if (!authHeader || !authHeader.startsWith('Bearer ')) { + log({ module: 'auth-decorator' }, `Auth failed - missing or invalid header`); return reply.code(401).send({ error: 'Missing authorization header' }); } const token = authHeader.substring(7); const verified = await tokenVerifier.verify(token); if (!verified) { + log({ module: 'auth-decorator' }, `Auth failed - invalid token`); return reply.code(401).send({ error: 'Invalid token' }); } @@ -102,8 +105,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); if (!user) { + log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`); return reply.code(401).send({ error: 'User not found' }); } + + log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`); request.user = user; } catch (error) { @@ -224,10 +230,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return reply.code(401).send({ error: 'Invalid public key' }); } + const publicKeyHex = privacyKit.encodeHex(publicKey); + log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`); + const answer = await db.terminalAuthRequest.upsert({ - where: { publicKey: privacyKit.encodeHex(publicKey) }, + where: { publicKey: publicKeyHex }, update: {}, - create: { publicKey: privacyKit.encodeHex(publicKey) } + create: { publicKey: publicKeyHex } }); if (answer.response && answer.responseAccountId) { @@ -252,15 +261,26 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }) } }, async (request, reply) => { + log({ module: 'auth-response' }, `Auth response endpoint hit - user: ${request.user?.id || 'NO USER'}, publicKey: ${request.body.publicKey.substring(0, 20)}...`); const publicKey = privacyKit.decodeBase64(request.body.publicKey); const isValid = tweetnacl.box.publicKeyLength === publicKey.length; if (!isValid) { + log({ module: 'auth-response' }, `Invalid public key length: ${publicKey.length}`); return reply.code(401).send({ error: 'Invalid public key' }); } + const publicKeyHex = privacyKit.encodeHex(publicKey); + log({ module: 'auth-response' }, `Looking for auth request with publicKey hex: ${publicKeyHex}`); const authRequest = await db.terminalAuthRequest.findUnique({ - where: { publicKey: privacyKit.encodeHex(publicKey) } + where: { publicKey: publicKeyHex } }); if (!authRequest) { + log({ module: 'auth-response' }, `Auth request not found for publicKey: ${publicKeyHex}`); + // Let's also check what auth requests exist + const allRequests = await db.terminalAuthRequest.findMany({ + take: 5, + orderBy: { createdAt: 'desc' } + }); + log({ module: 'auth-response' }, `Recent auth requests in DB: ${JSON.stringify(allRequests.map(r => ({ id: r.id, publicKey: r.publicKey.substring(0, 20) + '...', hasResponse: !!r.response })))}`); return reply.code(404).send({ error: 'Request not found' }); } if (!authRequest.response) { @@ -416,6 +436,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); if (session) { + logger.info({ module: 'session-create', sessionId: session.id, userId, tag }, `Found existing session: ${session.id} for tag ${tag}`); return reply.send({ session: { id: session.id, @@ -437,6 +458,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const updSeq = await allocateUserSeq(userId); // Create session + logger.info({ module: 'session-create', userId, tag }, `Creating new session for user ${userId} with tag ${tag}`); const session = await db.session.create({ data: { accountId: userId, @@ -444,6 +466,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> metadata: metadata } }); + logger.info({ module: 'session-create', sessionId: session.id, userId }, `Session created: ${session.id}`); // Create update const updContent: PrismaJson.UpdateBody = { @@ -461,15 +484,23 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }; // Emit update to connected sockets + const updatePayload = { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }; + logger.info({ + module: 'session-create', + userId, + sessionId: session.id, + updateType: 'new-session', + updatePayload: JSON.stringify(updatePayload) + }, `Emitting new-session update to all user connections`); emitUpdateToInterestedClients({ event: 'update', userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }, + payload: updatePayload, recipientFilter: { type: 'all-user-authenticated-connections' } }); @@ -945,7 +976,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const userId = request.user.id; const { id, metadata } = request.body; - const machine = await db.machine.upsert({ + logger.info({ module: 'machines', machineId: id, userId, hasMetadata: !!metadata }, 'Creating/updating machine'); + + try { + const machine = await db.machine.upsert({ where: { accountId_id: { accountId: userId, @@ -964,9 +998,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> active: true, lastActiveAt: new Date() } - }); - - // Emit update to all user connections + }); + + // Emit update to all user connections const updSeq = await allocateUserSeq(userId); emitUpdateToInterestedClients({ event: 'update', @@ -984,9 +1018,78 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }, createdAt: Date.now() } + }); + + return { success: true }; + } catch (error) { + logger.error({ + module: 'machines', + machineId: id, + userId, + error: error instanceof Error ? error.message : String(error), + errorStack: error instanceof Error ? error.stack : undefined + }, 'Failed to create/update machine'); + return reply.code(500).send({ + error: 'Failed to create/update machine', + details: error instanceof Error ? error.message : String(error) + }); + } + }); + + // Combined logging endpoint (only when explicitly enabled) + if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { + typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', { + schema: { + body: z.object({ + timestamp: z.string(), + level: z.string(), + message: z.string(), + messageRawObject: z.any().optional(), + source: z.enum(['mobile', 'cli']), + platform: z.string().optional() + }) + } + }, async (request, reply) => { + const { timestamp, level, message, source, platform } = request.body; + + // Log ONLY to separate remote logger (file only, no console) + const logData = { + source, + platform, + timestamp + }; + + // Use the file-only logger if available + const { fileConsolidatedLogger } = await import('@/utils/log'); + + if (!fileConsolidatedLogger) { + // Should never happen since we check env var above, but be safe + return reply.send({ success: true }); + } + + switch (level.toLowerCase()) { + case 'error': + fileConsolidatedLogger.error(logData, message); + break; + case 'warn': + case 'warning': + fileConsolidatedLogger.warn(logData, message); + break; + case 'debug': + fileConsolidatedLogger.debug(logData, message); + break; + default: + fileConsolidatedLogger.info(logData, message); + } + + return reply.send({ success: true }); }); - - return { success: true }; + } + + // Catch-all route for debugging 404s + app.setNotFoundHandler((request, reply) => { + log({ module: '404-handler' }, `404 - Method: ${request.method}, Path: ${request.url}, Headers: ${JSON.stringify(request.headers)}`); + reply.code(404).send({ error: 'Not found', path: request.url, method: request.method }); }); // Start diff --git a/sources/utils/log.ts b/sources/utils/log.ts index a2d9cc5..d07402a 100644 --- a/sources/utils/log.ts +++ b/sources/utils/log.ts @@ -2,17 +2,37 @@ import pino from 'pino'; import { mkdirSync } from 'fs'; import { join } from 'path'; -const isDebug = process.env.DEBUG === 'true' || process.env.NODE_ENV === 'development'; -const logsDir = join(process.cwd(), '.logs'); +// Single log file name created once at startup +let consolidatedLogFile: string | undefined; -if (isDebug) { +if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { + const logsDir = join(process.cwd(), '.logs'); try { mkdirSync(logsDir, { recursive: true }); + // Create filename once at startup + const now = new Date(); + const month = String(now.getMonth() + 1).padStart(2, '0'); + const day = String(now.getDate()).padStart(2, '0'); + const hour = String(now.getHours()).padStart(2, '0'); + const min = String(now.getMinutes()).padStart(2, '0'); + const sec = String(now.getSeconds()).padStart(2, '0'); + consolidatedLogFile = join(logsDir, `${month}-${day}-${hour}-${min}-${sec}.log`); + console.log(`[PINO] Remote debugging logs enabled - writing to ${consolidatedLogFile}`); } catch (error) { console.error('Failed to create logs directory:', error); } } +// Format time as HH:MM:ss.mmm in local time +function formatLocalTime(timestamp?: number) { + const date = timestamp ? new Date(timestamp) : new Date(); + const hours = String(date.getHours()).padStart(2, '0'); + const mins = String(date.getMinutes()).padStart(2, '0'); + const secs = String(date.getSeconds()).padStart(2, '0'); + const ms = String(date.getMilliseconds()).padStart(3, '0'); + return `${hours}:${mins}:${secs}.${ms}`; +} + const transports: any[] = []; transports.push({ @@ -21,28 +41,66 @@ transports.push({ colorize: true, translateTime: 'HH:MM:ss.l', ignore: 'pid,hostname', - messageFormat: '{levelLabel} [{time}] {msg}', + messageFormat: '{levelLabel} {msg} | [{time}]', errorLikeObjectKeys: ['err', 'error'], }, }); -if (isDebug) { +if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING && consolidatedLogFile) { transports.push({ target: 'pino/file', options: { - destination: join(logsDir, `server-${new Date().toISOString().split('T')[0]}.log`), + destination: consolidatedLogFile, mkdir: true, + messageFormat: '{levelLabel} {msg} | [server time: {time}]', }, }); } +// Main server logger with local time formatting export const logger = pino({ - level: isDebug ? 'debug' : 'info', + level: 'debug', transport: { targets: transports, }, + formatters: { + log: (object: any) => { + // Add localTime to every log entry + return { + ...object, + localTime: formatLocalTime(typeof object.time === 'number' ? object.time : undefined), + }; + } + }, + timestamp: () => `,"time":${Date.now()},"localTime":"${formatLocalTime()}"`, }); +// Optional file-only logger for remote logs from CLI/mobile +export const fileConsolidatedLogger = process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING && consolidatedLogFile ? + pino({ + level: 'debug', + transport: { + targets: [{ + target: 'pino/file', + options: { + destination: consolidatedLogFile, + mkdir: true, + }, + }], + }, + formatters: { + log: (object: any) => { + // Add localTime to every log entry + // Note: source property already exists from CLI/mobile logs + return { + ...object, + localTime: formatLocalTime(typeof object.time === 'number' ? object.time : undefined), + }; + } + }, + timestamp: () => `,"time":${Date.now()},"localTime":"${formatLocalTime()}"`, + }) : undefined; + export function log(src: any, ...args: any[]) { logger.info(src, ...args); } From 6b1a3c3e824a557f23e4b50a788d448aa89c31e1 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Sat, 16 Aug 2025 07:12:11 -0700 Subject: [PATCH 6/9] wip: emit machine updates on heartbeat for real-time status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Machine-alive handler now properly emits update events to all user connections - Ensures mobile clients receive real-time machine status updates - Fixed null handling when machine doesn't exist in database Note: Last test was not able to spawn new session - webhook callback timing issue 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- sources/app/api.ts | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index 9bdee22..518573f 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -1339,7 +1339,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const machineId = data.machineId; // Update machine lastActiveAt in database - await db.machine.update({ + const machine = await db.machine.update({ where: { accountId_id: { accountId: userId, @@ -1352,7 +1352,33 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }).catch(() => { // Machine might not exist yet, that's ok + return null; }); + + // If machine was updated, emit update to all user connections + if (machine) { + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: { + t: 'update-machine', + id: machine.id, + metadata: machine.metadata ? { + version: machine.metadataVersion, + value: machine.metadata + } : undefined, + active: true, + lastActiveAt: t + }, + createdAt: Date.now() + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + } } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`); } From d03240061d5aed805aef984108346eb1d6bb04f5 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Sun, 17 Aug 2025 18:32:31 -0700 Subject: [PATCH 7/9] refactor: prepare server for machine sync refactoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Clean up machine API endpoints formatting - Update machine-alive to use ephemeral events instead of updates - Prepare types for separated metadata and daemonState - Fix activeAt field name consistency in machine responses 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- package.json | 2 +- sources/app/api.ts | 447 ++++++++++++++++++++++++--------------- sources/storage/types.ts | 8 + 3 files changed, 285 insertions(+), 172 deletions(-) diff --git a/package.json b/package.json index 6beb0ac..078e89a 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "migrate": "dotenv -e .env.dev -- prisma migrate dev", "generate": "prisma generate", "postinstall": "prisma generate", - "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v postgres-data:/var/lib/postgresql/data -p 5432:5432 postgres", + "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v $(pwd)/.pgdata:/var/lib/postgresql/data -p 5432:5432 postgres", "redis": "docker run -d -p 6379:6379 redis" }, "devDependencies": { diff --git a/sources/app/api.ts b/sources/app/api.ts index 518573f..9747fee 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -14,10 +14,10 @@ import { AsyncLock } from "@/utils/lock"; // Recipient filter types -type RecipientFilter = - | { type: 'all-interested-in-session'; sessionId: string } - | { type: 'user-scoped-only' } - | { type: 'all-user-authenticated-connections' }; +type RecipientFilter = + | { type: 'all-interested-in-session'; sessionId: string } + | { type: 'user-scoped-only' } + | { type: 'all-user-authenticated-connections' }; // Connection metadata types interface SessionScopedConnection { @@ -108,7 +108,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`); return reply.code(401).send({ error: 'User not found' }); } - + log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`); request.user = user; @@ -118,12 +118,12 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); // Send session update to all relevant connections - let emitUpdateToInterestedClients = ({ - event, - userId, - payload, + let emitUpdateToInterestedClients = ({ + event, + userId, + payload, recipientFilter = { type: 'all-user-authenticated-connections' }, - skipSenderConnection + skipSenderConnection }: { event: string, userId: string, @@ -156,13 +156,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } // user-scoped always gets it break; - + case 'user-scoped-only': if (connection.connectionType !== 'user-scoped') { continue; } break; - + case 'all-user-authenticated-connections': // Send to all connection types (default behavior) break; @@ -232,7 +232,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const publicKeyHex = privacyKit.encodeHex(publicKey); log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`); - + const answer = await db.terminalAuthRequest.upsert({ where: { publicKey: publicKeyHex }, update: {}, @@ -310,11 +310,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> // Check if OpenAI API key is configured on server const OPENAI_API_KEY = process.env.OPENAI_API_KEY; if (!OPENAI_API_KEY) { - return reply.code(500).send({ - error: 'OpenAI API key not configured on server' + return reply.code(500).send({ + error: 'OpenAI API key not configured on server' }); } - + // Generate ephemeral token from OpenAI const response = await fetch('https://api.openai.com/v1/realtime/sessions', { method: 'POST', @@ -327,11 +327,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> voice: 'verse', }), }); - + if (!response.ok) { throw new Error(`OpenAI API error: ${response.status}`); } - + const data = await response.json() as { client_secret: { value: string; @@ -339,14 +339,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }; id: string; }; - + return reply.send({ token: data.client_secret.value }); } catch (error) { log({ module: 'openai', level: 'error' }, 'Failed to generate ephemeral token', error); - return reply.code(500).send({ - error: 'Failed to generate ephemeral token' + return reply.code(500).send({ + error: 'Failed to generate ephemeral token' }); } }); @@ -391,7 +391,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const lastMessage = v.messages[0]; const sessionUpdatedAt = v.updatedAt.getTime(); const lastMessageCreatedAt = lastMessage ? lastMessage.createdAt.getTime() : 0; - + return { id: v.id, seq: v.seq, @@ -490,9 +490,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> body: updContent, createdAt: Date.now() }; - logger.info({ - module: 'session-create', - userId, + logger.info({ + module: 'session-create', + userId, sessionId: session.id, updateType: 'new-session', updatePayload: JSON.stringify(updatePayload) @@ -940,30 +940,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); }); - // Machines API - typed.get('/v1/machines', { - preHandler: app.authenticate, - }, async (request, reply) => { - const userId = request.user.id; - - const machines = await db.machine.findMany({ - where: { accountId: userId }, - orderBy: { lastActiveAt: 'desc' } - }); - - return machines.map(m => ({ - id: m.id, - metadata: m.metadata, - metadataVersion: m.metadataVersion, - seq: m.seq, - active: m.active, - lastActiveAt: m.lastActiveAt.getTime(), - createdAt: m.createdAt.getTime(), - updatedAt: m.updatedAt.getTime() - })); - }); + // Machines - // POST /v1/machines - Create or update machine + // POST /v1/machines - Create machine or return existing typed.post('/v1/machines', { preHandler: app.authenticate, schema: { @@ -975,67 +954,139 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }, async (request, reply) => { const userId = request.user.id; const { id, metadata } = request.body; - - logger.info({ module: 'machines', machineId: id, userId, hasMetadata: !!metadata }, 'Creating/updating machine'); - - try { - const machine = await db.machine.upsert({ + + // Check if machine exists (like sessions do) + const machine = await db.machine.findFirst({ where: { - accountId_id: { - accountId: userId, - id - } - }, - create: { - id, accountId: userId, - metadata, - metadataVersion: 1 - }, - update: { - metadata, - metadataVersion: { increment: 1 }, - active: true, - lastActiveAt: new Date() + id: id } - }); - - // Emit update to all user connections - const updSeq = await allocateUserSeq(userId); - emitUpdateToInterestedClients({ - event: 'update', - userId, - payload: { - id: randomKeyNaked(), - seq: updSeq, - body: { - t: 'update-machine', + }); + + if (machine) { + // Machine exists - just return it + logger.info({ module: 'machines', machineId: id, userId }, 'Found existing machine'); + return reply.send({ + machine: { id: machine.id, - metadata: { - version: machine.metadataVersion, - value: metadata - } - }, - createdAt: Date.now() - } + metadata: machine.metadata, + metadataVersion: machine.metadataVersion, + active: machine.active, + lastActiveAt: machine.lastActiveAt.getTime(), + createdAt: machine.createdAt.getTime(), + updatedAt: machine.updatedAt.getTime() + } }); - - return { success: true }; - } catch (error) { - logger.error({ - module: 'machines', - machineId: id, - userId, - error: error instanceof Error ? error.message : String(error), - errorStack: error instanceof Error ? error.stack : undefined - }, 'Failed to create/update machine'); - return reply.code(500).send({ - error: 'Failed to create/update machine', - details: error instanceof Error ? error.message : String(error) + } else { + // Create new machine + logger.info({ module: 'machines', machineId: id, userId }, 'Creating new machine'); + + const newMachine = await db.machine.create({ + data: { + id, + accountId: userId, + metadata, + metadataVersion: 1 + // active defaults to true in schema + // lastActiveAt defaults to now() in schema + } + }); + + // Emit update for new machine + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(), + seq: updSeq, + body: { + t: 'update-machine', + id: newMachine.id, + metadata: { + version: 1, + value: metadata + } + }, + createdAt: Date.now() + } + }); + + return reply.send({ + machine: { + id: newMachine.id, + metadata: newMachine.metadata, + metadataVersion: 1, + active: newMachine.active, + lastActiveAt: newMachine.lastActiveAt.getTime(), + createdAt: newMachine.createdAt.getTime(), + updatedAt: newMachine.updatedAt.getTime() + } }); } }); + + // Machines API + typed.get('/v1/machines', { + preHandler: app.authenticate, + }, async (request, reply) => { + const userId = request.user.id; + + const machines = await db.machine.findMany({ + where: { accountId: userId }, + orderBy: { lastActiveAt: 'desc' } + }); + + return machines.map(m => ({ + id: m.id, + metadata: m.metadata, + metadataVersion: m.metadataVersion, + seq: m.seq, + active: m.active, + activeAt: m.lastActiveAt.getTime(), + createdAt: m.createdAt.getTime(), + updatedAt: m.updatedAt.getTime() + })); + }); + + // GET /v1/machines/:id - Get single machine by ID + typed.get('/v1/machines/:id', { + preHandler: app.authenticate, + schema: { + params: z.object({ + id: z.string() + }) + } + }, async (request, reply) => { + const userId = request.user.id; + const { id } = request.params; + + const machine = await db.machine.findFirst({ + where: { + accountId: userId, + id: id + } + }); + + if (!machine) { + return reply.code(404).send({ error: 'Machine not found' }); + } + + return { + machine: { + id: machine.id, + metadata: machine.metadata, + metadataVersion: machine.metadataVersion, + seq: machine.seq, + active: machine.active, + activeAt: machine.lastActiveAt.getTime(), + createdAt: machine.createdAt.getTime(), + updatedAt: machine.updatedAt.getTime() + } + }; + }); + // Combined logging endpoint (only when explicitly enabled) if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', { @@ -1051,22 +1102,22 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }, async (request, reply) => { const { timestamp, level, message, source, platform } = request.body; - + // Log ONLY to separate remote logger (file only, no console) const logData = { source, platform, timestamp }; - + // Use the file-only logger if available const { fileConsolidatedLogger } = await import('@/utils/log'); - + if (!fileConsolidatedLogger) { // Should never happen since we check env var above, but be safe return reply.send({ success: true }); } - + switch (level.toLowerCase()) { case 'error': fileConsolidatedLogger.error(logData, message); @@ -1081,7 +1132,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> default: fileConsolidatedLogger.info(logData, message); } - + return reply.send({ success: true }); }); } @@ -1146,7 +1197,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> socket.disconnect(); return; } - + // Validate machine-scoped clients have machineId if (clientType === 'machine-scoped' && !machineId) { log({ module: 'websocket' }, `Machine-scoped client missing machineId`); @@ -1194,7 +1245,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> userIdToClientConnections.set(userId, new Set()); } userIdToClientConnections.get(userId)!.add(connection); - + // Broadcast daemon online status if (connection.connectionType === 'machine-scoped') { // Broadcast daemon online @@ -1248,7 +1299,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } log({ module: 'websocket' }, `User disconnected: ${userId}`); - + // Broadcast daemon offline status if (connection.connectionType === 'machine-scoped') { emitUpdateToInterestedClients({ @@ -1285,7 +1336,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } const { sid, thinking } = data; - + // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } @@ -1336,49 +1387,45 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - const machineId = data.machineId; - - // Update machine lastActiveAt in database - const machine = await db.machine.update({ + // Resolve machine + const machine = await db.machine.findUnique({ where: { accountId_id: { accountId: userId, - id: machineId + id: data.machineId + } + } + }); + + if (!machine) { + return; + } + + // Update machine lastActiveAt in database + const updatedMachine = await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: data.machineId } }, data: { lastActiveAt: new Date(t), active: true } - }).catch(() => { - // Machine might not exist yet, that's ok - return null; - }); + }) - // If machine was updated, emit update to all user connections - if (machine) { - const updSeq = await allocateUserSeq(userId); - emitUpdateToInterestedClients({ - event: 'update', - userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: { - t: 'update-machine', - id: machine.id, - metadata: machine.metadata ? { - version: machine.metadataVersion, - value: machine.metadata - } : undefined, - active: true, - lastActiveAt: t - }, - createdAt: Date.now() - }, - recipientFilter: { type: 'all-user-authenticated-connections' } - }); - } + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + payload: { + type: 'machine-activity', + id: updatedMachine.id, + active: true, + lastActiveAt: t, + }, + recipientFilter: { type: 'user-scoped-only' } + }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`); } @@ -1660,50 +1707,108 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - // Update machine through socket - socket.on('update-machine', async (data: { metadata: string }) => { - if (connection.connectionType !== 'machine-scoped') { - return; // Only machines can update their own metadata - } - - const machineId = connection.machineId; - + // Machine metadata update with optimistic concurrency control + socket.on('machine-update-metadata', async (data: any, callback: (response: any) => void) => { try { - const machine = await db.machine.update({ + const { machineId, metadata, expectedVersion } = data; + + // Validate input + if (!machineId || typeof metadata !== 'string' || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error', message: 'Invalid parameters' }); + } + return; + } + + // Resolve machine + const machine = await db.machine.findFirst({ where: { - accountId_id: { + accountId: userId, + id: machineId + } + }); + if (!machine) { + if (callback) { + callback({ result: 'error', message: 'Machine not found' }); + } + return; + } + + // Check version + if (machine.metadataVersion !== expectedVersion) { + callback({ + result: 'version-mismatch', + version: machine.metadataVersion, + metadata: machine.metadata + }); + return; + } + + // Update metadata with atomic version check + const { count } = await db.machine.updateMany({ + where: { + accountId: userId, + id: machineId, + metadataVersion: expectedVersion // Atomic CAS + }, + data: { + metadata: metadata, + metadataVersion: expectedVersion + 1 + // NOT updating active or lastActiveAt here + } + }); + + if (count === 0) { + // Re-fetch current version + const current = await db.machine.findFirst({ + where: { accountId: userId, id: machineId } - }, - data: { - metadata: data.metadata, - metadataVersion: { increment: 1 } - } - }); - - // Emit to other connections + }); + callback({ + result: 'version-mismatch', + version: current?.metadataVersion || 0, + metadata: current?.metadata + }); + return; + } + + // Generate update const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-machine', + id: machineId, + metadata: { + value: metadata, + version: expectedVersion + 1 + } + }; + + // Emit to all connections emitUpdateToInterestedClients({ event: 'update', userId, payload: { - id: randomKeyNaked(), + id: randomKeyNaked(12), seq: updSeq, - body: { - t: 'update-machine', - id: machineId, - metadata: { - version: machine.metadataVersion, - value: data.metadata - } - }, + body: updContent, createdAt: Date.now() }, - skipSenderConnection: connection + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + + // Send success response with new version + callback({ + result: 'success', + version: expectedVersion + 1, + metadata: metadata }); } catch (error) { - log({ module: 'websocket', level: 'error' }, `Error updating machine metadata: ${error}`); + log({ module: 'websocket', level: 'error' }, `Error in machine-update-metadata: ${error}`); + if (callback) { + callback({ result: 'error', message: 'Internal error' }); + } } }); diff --git a/sources/storage/types.ts b/sources/storage/types.ts index eb12f66..30998c3 100644 --- a/sources/storage/types.ts +++ b/sources/storage/types.ts @@ -60,6 +60,14 @@ declare global { value: string | null; version: number; } | null | undefined; + } | { + t: 'update-machine'; + id: string; + metadata?: { + value: string; + version: number; + }; + activeAt?: number; }; } } From 62a2280268e89fbbde0c6c716040b9e815e67ca7 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Mon, 18 Aug 2025 00:05:37 -0700 Subject: [PATCH 8/9] feat: daemon kinda functional e2e --- .../migration.sql | 3 + prisma/schema.prisma | 22 +-- sources/app/api.ts | 151 ++++++++++++++++-- sources/storage/types.ts | 6 +- 4 files changed, 154 insertions(+), 28 deletions(-) create mode 100644 prisma/migrations/20250818044258_add_daemon_state_to_machine/migration.sql diff --git a/prisma/migrations/20250818044258_add_daemon_state_to_machine/migration.sql b/prisma/migrations/20250818044258_add_daemon_state_to_machine/migration.sql new file mode 100644 index 0000000..ff78ad3 --- /dev/null +++ b/prisma/migrations/20250818044258_add_daemon_state_to_machine/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "Machine" ADD COLUMN "daemonState" TEXT, +ADD COLUMN "daemonStateVersion" INTEGER NOT NULL DEFAULT 0; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index d494f7f..a37a1b3 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -144,16 +144,18 @@ model UsageReport { // model Machine { - id String @id - accountId String - account Account @relation(fields: [accountId], references: [id]) - metadata String // Encrypted - contains ALL machine info - metadataVersion Int @default(0) - seq Int @default(0) - active Boolean @default(true) - lastActiveAt DateTime @default(now()) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + id String @id + accountId String + account Account @relation(fields: [accountId], references: [id]) + metadata String // Encrypted - contains static machine info + metadataVersion Int @default(0) + daemonState String? // Encrypted - contains dynamic daemon state + daemonStateVersion Int @default(0) + seq Int @default(0) + active Boolean @default(true) + lastActiveAt DateTime @default(now()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt @@unique([accountId, id]) @@index([accountId]) diff --git a/sources/app/api.ts b/sources/app/api.ts index 9747fee..a1b7a8b 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -948,12 +948,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> schema: { body: z.object({ id: z.string(), - metadata: z.string() // Encrypted metadata + metadata: z.string(), // Encrypted metadata + daemonState: z.string().optional() // Encrypted daemon state }) } }, async (request, reply) => { const userId = request.user.id; - const { id, metadata } = request.body; + const { id, metadata, daemonState } = request.body; // Check if machine exists (like sessions do) const machine = await db.machine.findFirst({ @@ -971,8 +972,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> id: machine.id, metadata: machine.metadata, metadataVersion: machine.metadataVersion, + daemonState: machine.daemonState, + daemonStateVersion: machine.daemonStateVersion, active: machine.active, - lastActiveAt: machine.lastActiveAt.getTime(), + activeAt: machine.lastActiveAt.getTime(), // Return as activeAt for API consistency createdAt: machine.createdAt.getTime(), updatedAt: machine.updatedAt.getTime() } @@ -986,7 +989,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> id, accountId: userId, metadata, - metadataVersion: 1 + metadataVersion: 1, + daemonState: daemonState || null, + daemonStateVersion: daemonState ? 1 : 0 // active defaults to true in schema // lastActiveAt defaults to now() in schema } @@ -1002,7 +1007,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> seq: updSeq, body: { t: 'update-machine', - id: newMachine.id, + machineId: newMachine.id, metadata: { version: 1, value: metadata @@ -1016,9 +1021,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> machine: { id: newMachine.id, metadata: newMachine.metadata, - metadataVersion: 1, + metadataVersion: newMachine.metadataVersion, + daemonState: newMachine.daemonState, + daemonStateVersion: newMachine.daemonStateVersion, active: newMachine.active, - lastActiveAt: newMachine.lastActiveAt.getTime(), + activeAt: newMachine.lastActiveAt.getTime(), // Return as activeAt for API consistency createdAt: newMachine.createdAt.getTime(), updatedAt: newMachine.updatedAt.getTime() } @@ -1042,6 +1049,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> id: m.id, metadata: m.metadata, metadataVersion: m.metadataVersion, + daemonState: m.daemonState, + daemonStateVersion: m.daemonStateVersion, seq: m.seq, active: m.active, activeAt: m.lastActiveAt.getTime(), @@ -1078,6 +1087,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> id: machine.id, metadata: machine.metadata, metadataVersion: machine.metadataVersion, + daemonState: machine.daemonState, + daemonStateVersion: machine.daemonStateVersion, seq: machine.seq, active: machine.active, activeAt: machine.lastActiveAt.getTime(), @@ -1253,10 +1264,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> event: 'ephemeral', userId, payload: { - type: 'daemon-status', - machineId, - status: 'online', - timestamp: Date.now() + type: 'machine-activity', + id: machineId, + active: true, + activeAt: Date.now() }, recipientFilter: { type: 'user-scoped-only' } }); @@ -1306,10 +1317,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> event: 'ephemeral', userId, payload: { - type: 'daemon-status', - machineId: connection.machineId, - status: 'offline', - timestamp: Date.now() + type: 'machine-activity', + id: connection.machineId, + active: false, + activeAt: Date.now() }, recipientFilter: { type: 'user-scoped-only' } }); @@ -1422,7 +1433,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> type: 'machine-activity', id: updatedMachine.id, active: true, - lastActiveAt: t, + activeAt: t, }, recipientFilter: { type: 'user-scoped-only' } }); @@ -1778,7 +1789,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const updSeq = await allocateUserSeq(userId); const updContent: PrismaJson.UpdateBody = { t: 'update-machine', - id: machineId, + machineId: machineId, metadata: { value: metadata, version: expectedVersion + 1 @@ -1812,6 +1823,112 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); + // Machine daemon state update with optimistic concurrency control + socket.on('machine-update-state', async (data: any, callback: (response: any) => void) => { + try { + const { machineId, daemonState, expectedVersion } = data; + + // Validate input + if (!machineId || typeof daemonState !== 'string' || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error', message: 'Invalid parameters' }); + } + return; + } + + // Resolve machine + const machine = await db.machine.findFirst({ + where: { + accountId: userId, + id: machineId + } + }); + if (!machine) { + if (callback) { + callback({ result: 'error', message: 'Machine not found' }); + } + return; + } + + // Check version + if (machine.daemonStateVersion !== expectedVersion) { + callback({ + result: 'version-mismatch', + version: machine.daemonStateVersion, + daemonState: machine.daemonState + }); + return; + } + + // Update daemon state with atomic version check + const { count } = await db.machine.updateMany({ + where: { + accountId: userId, + id: machineId, + daemonStateVersion: expectedVersion // Atomic CAS + }, + data: { + daemonState: daemonState, + daemonStateVersion: expectedVersion + 1, + active: true, + lastActiveAt: new Date() + } + }); + + if (count === 0) { + // Re-fetch current version + const current = await db.machine.findFirst({ + where: { + accountId: userId, + id: machineId + } + }); + callback({ + result: 'version-mismatch', + version: current?.daemonStateVersion || 0, + daemonState: current?.daemonState + }); + return; + } + + // Generate update + const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-machine', + machineId: machineId, + daemonState: { + value: daemonState, + version: expectedVersion + 1 + } + }; + + // Emit to all connections + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + + // Send success response with new version + callback({ + result: 'success', + version: expectedVersion + 1, + daemonState: daemonState + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in machine-update-state: ${error}`); + if (callback) { + callback({ result: 'error', message: 'Internal error' }); + } + } + }); + // RPC register - Register this socket as a listener for an RPC method socket.on('rpc-register', async (data: any) => { try { diff --git a/sources/storage/types.ts b/sources/storage/types.ts index 30998c3..a564f2c 100644 --- a/sources/storage/types.ts +++ b/sources/storage/types.ts @@ -62,11 +62,15 @@ declare global { } | null | undefined; } | { t: 'update-machine'; - id: string; + machineId: string; metadata?: { value: string; version: number; }; + daemonState?: { + value: string; + version: number; + }; activeAt?: number; }; } From 6f1aefc05693c63f8aed2d6c020f694f20405eff Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Sat, 16 Aug 2025 10:13:44 -0700 Subject: [PATCH 9/9] feat: add app-to-app authentication --- .../migration.sql | 17 +++ prisma/schema.prisma | 11 ++ sources/app/api.ts | 107 ++++++++++-------- 3 files changed, 89 insertions(+), 46 deletions(-) create mode 100644 prisma/migrations/20250816171155_add_app_to_app_authentication/migration.sql diff --git a/prisma/migrations/20250816171155_add_app_to_app_authentication/migration.sql b/prisma/migrations/20250816171155_add_app_to_app_authentication/migration.sql new file mode 100644 index 0000000..14e7cd7 --- /dev/null +++ b/prisma/migrations/20250816171155_add_app_to_app_authentication/migration.sql @@ -0,0 +1,17 @@ +-- CreateTable +CREATE TABLE "AccountAuthRequest" ( + "id" TEXT NOT NULL, + "publicKey" TEXT NOT NULL, + "response" TEXT, + "responseAccountId" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "AccountAuthRequest_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "AccountAuthRequest_publicKey_key" ON "AccountAuthRequest"("publicKey"); + +-- AddForeignKey +ALTER TABLE "AccountAuthRequest" ADD CONSTRAINT "AccountAuthRequest_responseAccountId_fkey" FOREIGN KEY ("responseAccountId") REFERENCES "Account"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index a37a1b3..692a10e 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -29,6 +29,7 @@ model Account { Session Session[] AccountPushToken AccountPushToken[] TerminalAuthRequest TerminalAuthRequest[] + AccountAuthRequest AccountAuthRequest[] UsageReport UsageReport[] Machine Machine[] } @@ -43,6 +44,16 @@ model TerminalAuthRequest { updatedAt DateTime @updatedAt } +model AccountAuthRequest { + id String @id @default(cuid()) + publicKey String @unique + response String? + responseAccountId String? + responseAccount Account? @relation(fields: [responseAccountId], references: [id]) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt +} + model AccountPushToken { id String @id @default(cuid()) accountId String diff --git a/sources/app/api.ts b/sources/app/api.ts index a1b7a8b..a5abadf 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -292,63 +292,78 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return reply.send({ success: true }); }); - // OpenAI Realtime ephemeral token generation - typed.post('/v1/openai/realtime-token', { - preHandler: app.authenticate, + // Account auth request + typed.post('/v1/auth/account/request', { schema: { + body: z.object({ + publicKey: z.string(), + }), response: { - 200: z.object({ - token: z.string() - }), - 500: z.object({ - error: z.string() + 200: z.union([z.object({ + state: z.literal('requested'), + }), z.object({ + state: z.literal('authorized'), + token: z.string(), + response: z.string() + })]), + 401: z.object({ + error: z.literal('Invalid public key') }) } } }, async (request, reply) => { - try { - // Check if OpenAI API key is configured on server - const OPENAI_API_KEY = process.env.OPENAI_API_KEY; - if (!OPENAI_API_KEY) { - return reply.code(500).send({ - error: 'OpenAI API key not configured on server' - }); - } + const publicKey = privacyKit.decodeBase64(request.body.publicKey); + const isValid = tweetnacl.box.publicKeyLength === publicKey.length; + if (!isValid) { + return reply.code(401).send({ error: 'Invalid public key' }); + } - // Generate ephemeral token from OpenAI - const response = await fetch('https://api.openai.com/v1/realtime/sessions', { - method: 'POST', - headers: { - 'Authorization': `Bearer ${OPENAI_API_KEY}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - model: 'gpt-4o-realtime-preview-2024-12-17', - voice: 'verse', - }), - }); - - if (!response.ok) { - throw new Error(`OpenAI API error: ${response.status}`); - } - - const data = await response.json() as { - client_secret: { - value: string; - expires_at: number; - }; - id: string; - }; + const answer = await db.accountAuthRequest.upsert({ + where: { publicKey: privacyKit.encodeHex(publicKey) }, + update: {}, + create: { publicKey: privacyKit.encodeHex(publicKey) } + }); + if (answer.response && answer.responseAccountId) { + const token = await tokenGenerator.new({ user: answer.responseAccountId! }); return reply.send({ - token: data.client_secret.value - }); - } catch (error) { - log({ module: 'openai', level: 'error' }, 'Failed to generate ephemeral token', error); - return reply.code(500).send({ - error: 'Failed to generate ephemeral token' + state: 'authorized', + token: token, + response: answer.response }); } + + return reply.send({ state: 'requested' }); + }); + + // Approve account auth request + typed.post('/v1/auth/account/response', { + preHandler: app.authenticate, + schema: { + body: z.object({ + response: z.string(), + publicKey: z.string() + }) + } + }, async (request, reply) => { + const publicKey = privacyKit.decodeBase64(request.body.publicKey); + const isValid = tweetnacl.box.publicKeyLength === publicKey.length; + if (!isValid) { + return reply.code(401).send({ error: 'Invalid public key' }); + } + const authRequest = await db.accountAuthRequest.findUnique({ + where: { publicKey: privacyKit.encodeHex(publicKey) } + }); + if (!authRequest) { + return reply.code(404).send({ error: 'Request not found' }); + } + if (!authRequest.response) { + await db.accountAuthRequest.update({ + where: { id: authRequest.id }, + data: { response: request.body.response, responseAccountId: request.user.id } + }); + } + return reply.send({ success: true }); }); // Sessions API