From a4bc4d34e811e015e0b7b7967c288ff501c2a5b0 Mon Sep 17 00:00:00 2001 From: Kirill Dubovitskiy Date: Tue, 12 Aug 2025 03:30:23 -0700 Subject: [PATCH] feat: Add machine persistence to database MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add Machine model to Prisma schema • Create /v1/machines endpoints for CRUD operations • Persist machine metadata and track active status • Update socket handlers for machine-scoped connections • Convert ephemeral machine status to database persistence 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../migration.sql | 23 ++ prisma/schema.prisma | 21 + sources/app/api.ts | 364 ++++++++++++++---- 3 files changed, 331 insertions(+), 77 deletions(-) create mode 100644 prisma/migrations/20250812092041_add_machine_model/migration.sql diff --git a/prisma/migrations/20250812092041_add_machine_model/migration.sql b/prisma/migrations/20250812092041_add_machine_model/migration.sql new file mode 100644 index 0000000..1e06047 --- /dev/null +++ b/prisma/migrations/20250812092041_add_machine_model/migration.sql @@ -0,0 +1,23 @@ +-- CreateTable +CREATE TABLE "Machine" ( + "id" TEXT NOT NULL, + "accountId" TEXT NOT NULL, + "metadata" TEXT NOT NULL, + "metadataVersion" INTEGER NOT NULL DEFAULT 0, + "seq" INTEGER NOT NULL DEFAULT 0, + "active" BOOLEAN NOT NULL DEFAULT true, + "lastActiveAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Machine_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "Machine_accountId_idx" ON "Machine"("accountId"); + +-- CreateIndex +CREATE UNIQUE INDEX "Machine_accountId_id_key" ON "Machine"("accountId", "id"); + +-- AddForeignKey +ALTER TABLE "Machine" ADD CONSTRAINT "Machine_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 535b078..d494f7f 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -30,6 +30,7 @@ model Account { AccountPushToken AccountPushToken[] TerminalAuthRequest TerminalAuthRequest[] UsageReport UsageReport[] + Machine Machine[] } model TerminalAuthRequest { @@ -137,3 +138,23 @@ model UsageReport { @@index([accountId]) @@index([sessionId]) } + +// +// Machines +// + +model Machine { + id String @id + accountId String + account Account @relation(fields: [accountId], references: [id]) + metadata String // Encrypted - contains ALL machine info + metadataVersion Int @default(0) + seq Int @default(0) + active Boolean @default(true) + lastActiveAt DateTime @default(now()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([accountId, id]) + @@index([accountId]) +} diff --git a/sources/app/api.ts b/sources/app/api.ts index 6892a26..29b5d2b 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -12,6 +12,35 @@ import { allocateSessionSeq, allocateUserSeq } from "@/services/seq"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { AsyncLock } from "@/utils/lock"; +// Session alive event types +type SessionAliveEvent = + | { + type: 'session-scoped'; + sid: string; + time: number; + thinking: boolean; + mode: 'local' | 'remote'; + } + | { + type: 'machine-scoped'; + machineId: string; + time: number; + } + | { + // Legacy format (no type field) - defaults to session-scoped + sid: string; + time: number; + thinking: boolean; + mode?: 'local' | 'remote'; + type?: undefined; + }; + +// Recipient filter types +type RecipientFilter = + | { type: 'all-interested-in-session'; sessionId: string } + | { type: 'user-scoped-only' } + | { type: 'all-user-authenticated-connections' }; + // Connection metadata types interface SessionScopedConnection { connectionType: 'session-scoped'; @@ -105,11 +134,17 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); // Send session update to all relevant connections - let emitUpdateToInterestedClients = ({ event, userId, sessionId, payload, skipSenderConnection }: { + let emitUpdateToInterestedClients = ({ + event, + userId, + payload, + recipientFilter = { type: 'all-user-authenticated-connections' }, + skipSenderConnection + }: { event: string, userId: string, - sessionId: string, payload: any, + recipientFilter?: RecipientFilter, skipSenderConnection?: ClientConnection }) => { const connections = userIdToClientConnections.get(userId); @@ -124,27 +159,33 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> continue; } - // Send to all user-scoped connections - we already matched user - if (connection.connectionType === 'user-scoped') { - log({ module: 'websocket' }, `Sending ${event} to user-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); + // Apply recipient filter + switch (recipientFilter.type) { + case 'all-interested-in-session': + // Send to session-scoped with matching session + all user-scoped + if (connection.connectionType === 'session-scoped') { + if (connection.sessionId !== recipientFilter.sessionId) { + continue; // Wrong session + } + } else if (connection.connectionType === 'machine-scoped') { + continue; // Machines don't need session updates + } + // user-scoped always gets it + break; + + case 'user-scoped-only': + if (connection.connectionType !== 'user-scoped') { + continue; + } + break; + + case 'all-user-authenticated-connections': + // Send to all connection types (default behavior) + break; } - // Send to all session-scoped connections, only that match sessionId - if (connection.connectionType === 'session-scoped') { - const matches = connection.sessionId === sessionId; - log({ module: 'websocket' }, `Session-scoped connection ${connection.socket.id}: sessionId=${connection.sessionId}, messageSessionId=${sessionId}, matches=${matches}`); - if (matches) { - log({ module: 'websocket' }, `Sending ${event} to session-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); - } - } - - // Send to all machine-scoped connections - they get all user updates - if (connection.connectionType === 'machine-scoped') { - log({ module: 'websocket' }, `Sending ${event} to machine-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); - } + log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`); + connection.socket.emit(event, payload); } } @@ -445,13 +486,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: session.id, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-user-authenticated-connections' } }); return reply.send({ @@ -679,18 +720,18 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }; - // Get all user connections (not session-specific) - const connections = userIdToClientConnections.get(userId); - if (connections) { - for (const connection of connections) { - connection.socket.emit('update', { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }); - } - } + // Send to all user connections + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); return reply.send({ success: true, @@ -890,6 +931,86 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); }); + // Machines API + typed.get('/v1/machines', { + preHandler: app.authenticate, + }, async (request, reply) => { + const userId = request.user.id; + + const machines = await db.machine.findMany({ + where: { accountId: userId }, + orderBy: { lastActiveAt: 'desc' } + }); + + return machines.map(m => ({ + id: m.id, + metadata: m.metadata, + metadataVersion: m.metadataVersion, + seq: m.seq, + active: m.active, + lastActiveAt: m.lastActiveAt.getTime(), + createdAt: m.createdAt.getTime(), + updatedAt: m.updatedAt.getTime() + })); + }); + + // POST /v1/machines - Create or update machine + typed.post('/v1/machines', { + preHandler: app.authenticate, + schema: { + body: z.object({ + id: z.string(), + metadata: z.string() // Encrypted metadata + }) + } + }, async (request, reply) => { + const userId = request.user.id; + const { id, metadata } = request.body; + + const machine = await db.machine.upsert({ + where: { + accountId_id: { + accountId: userId, + id + } + }, + create: { + id, + accountId: userId, + metadata, + metadataVersion: 1 + }, + update: { + metadata, + metadataVersion: { increment: 1 }, + active: true, + lastActiveAt: new Date() + } + }); + + // Emit update to all user connections + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(), + seq: updSeq, + body: { + t: 'update-machine', + id: machine.id, + metadata: { + version: machine.metadataVersion, + value: metadata + } + }, + createdAt: Date.now() + } + }); + + return { success: true }; + }); + // Start const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005; await app.listen({ port, host: '0.0.0.0' }); @@ -999,13 +1120,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: '', // No specific session payload: { type: 'daemon-status', machineId, status: 'online', timestamp: Date.now() - } + }, + recipientFilter: { type: 'user-scoped-only' } }); } @@ -1052,58 +1173,100 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: '', // No specific session payload: { type: 'daemon-status', machineId: connection.machineId, status: 'offline', timestamp: Date.now() - } + }, + recipientFilter: { type: 'user-scoped-only' } }); } }); - socket.on('session-alive', async (data: any) => { + socket.on('session-alive', async (data: SessionAliveEvent) => { try { - const { sid, time, thinking } = data; - let t = time; - if (typeof t !== 'number') { + // Basic validation + if (!data || typeof data.time !== 'number') { return; } + + let t = data.time; if (t > Date.now()) { t = Date.now(); } - if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes + if (t < Date.now() - 1000 * 60 * 10) { return; } - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { - return; + // Determine type (default to session-scoped for legacy) + const eventType = data.type || 'session-scoped'; + + // Validate but CONTINUE with warning + if (eventType === 'machine-scoped' && connection.connectionType !== 'machine-scoped') { + log({ module: 'websocket', level: 'warn' }, + `Connection type mismatch: ${connection.connectionType} sending machine-scoped alive`); + // CONTINUE ANYWAY + } + if (eventType === 'session-scoped' && connection.connectionType === 'machine-scoped') { + log({ module: 'websocket', level: 'warn' }, + `Connection type mismatch: ${connection.connectionType} sending session-scoped alive`); + // CONTINUE ANYWAY } - // Update last active at - await db.session.update({ - where: { id: sid }, - data: { lastActiveAt: new Date(t), active: true } - }); - - // Emit update to connected sockets - emitUpdateToInterestedClients({ - event: 'ephemeral', - userId, - sessionId: sid, - payload: { - type: 'activity', - id: sid, - active: true, - activeAt: t, - thinking + // Handle based on type + if (eventType === 'machine-scoped' && 'machineId' in data) { + // Machine heartbeat - update database instead of ephemeral + const machineId = connection.connectionType === 'machine-scoped' ? connection.machineId : data.machineId; + + // Update machine lastActiveAt in database + await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: machineId + } + }, + data: { + lastActiveAt: new Date(t), + active: true + } + }).catch(() => { + // Machine might not exist yet, that's ok + }); + + } else if ('sid' in data) { + // Session heartbeat (legacy or explicit session-scoped) + const { sid, thinking } = data; + + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } + }); + if (!session) { + return; } - }); + + // Update last active + await db.session.update({ + where: { id: sid }, + data: { lastActiveAt: new Date(t), active: true } + }); + + // Emit update + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + payload: { + type: 'activity', + id: sid, + active: true, + activeAt: t, + thinking: thinking || false + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + } } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`); } @@ -1141,14 +1304,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: sid, payload: { type: 'activity', id: sid, active: false, activeAt: t, thinking: false - } + }, + recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-end: ${error}`); @@ -1219,13 +1382,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: update, createdAt: Date.now() }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid }, skipSenderConnection: connection }); } catch (error) { @@ -1286,13 +1449,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback @@ -1363,13 +1526,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback @@ -1382,6 +1545,53 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); + // Update machine metadata through socket + socket.on('update-machine-metadata', async (data: { metadata: string }) => { + if (connection.connectionType !== 'machine-scoped') { + return; // Only machines can update their own metadata + } + + const machineId = connection.machineId; + + try { + const machine = await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: machineId + } + }, + data: { + metadata: data.metadata, + metadataVersion: { increment: 1 } + } + }); + + // Emit to other connections + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(), + seq: updSeq, + body: { + t: 'update-machine', + id: machineId, + metadata: { + version: machine.metadataVersion, + value: data.metadata + } + }, + createdAt: Date.now() + }, + skipSenderConnection: connection + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error updating machine metadata: ${error}`); + } + }); + // RPC register - Register this socket as a listener for an RPC method socket.on('rpc-register', async (data: any) => { try { @@ -1644,7 +1854,6 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId, payload: { type: 'usage', id: sessionId, @@ -1652,7 +1861,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> tokens: usageData.tokens, cost: usageData.cost, timestamp: Date.now() - } + }, + recipientFilter: { type: 'user-scoped-only' } }); }