feat: Add machine persistence to database

• Add Machine model to Prisma schema
• Create /v1/machines endpoints for CRUD operations
• Persist machine metadata and track active status
• Update socket handlers for machine-scoped connections
• Convert ephemeral machine status to database persistence

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Kirill Dubovitskiy 2025-08-12 03:30:23 -07:00
parent 5e06cc3947
commit a4bc4d34e8
3 changed files with 331 additions and 77 deletions

View File

@ -0,0 +1,23 @@
-- CreateTable
CREATE TABLE "Machine" (
"id" TEXT NOT NULL,
"accountId" TEXT NOT NULL,
"metadata" TEXT NOT NULL,
"metadataVersion" INTEGER NOT NULL DEFAULT 0,
"seq" INTEGER NOT NULL DEFAULT 0,
"active" BOOLEAN NOT NULL DEFAULT true,
"lastActiveAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "Machine_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "Machine_accountId_idx" ON "Machine"("accountId");
-- CreateIndex
CREATE UNIQUE INDEX "Machine_accountId_id_key" ON "Machine"("accountId", "id");
-- AddForeignKey
ALTER TABLE "Machine" ADD CONSTRAINT "Machine_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

View File

@ -30,6 +30,7 @@ model Account {
AccountPushToken AccountPushToken[]
TerminalAuthRequest TerminalAuthRequest[]
UsageReport UsageReport[]
Machine Machine[]
}
model TerminalAuthRequest {
@ -137,3 +138,23 @@ model UsageReport {
@@index([accountId])
@@index([sessionId])
}
//
// Machines
//
model Machine {
id String @id
accountId String
account Account @relation(fields: [accountId], references: [id])
metadata String // Encrypted - contains ALL machine info
metadataVersion Int @default(0)
seq Int @default(0)
active Boolean @default(true)
lastActiveAt DateTime @default(now())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([accountId, id])
@@index([accountId])
}

View File

@ -12,6 +12,35 @@ import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { AsyncLock } from "@/utils/lock";
// Session alive event types
type SessionAliveEvent =
| {
type: 'session-scoped';
sid: string;
time: number;
thinking: boolean;
mode: 'local' | 'remote';
}
| {
type: 'machine-scoped';
machineId: string;
time: number;
}
| {
// Legacy format (no type field) - defaults to session-scoped
sid: string;
time: number;
thinking: boolean;
mode?: 'local' | 'remote';
type?: undefined;
};
// Recipient filter types
type RecipientFilter =
| { type: 'all-interested-in-session'; sessionId: string }
| { type: 'user-scoped-only' }
| { type: 'all-user-authenticated-connections' };
// Connection metadata types
interface SessionScopedConnection {
connectionType: 'session-scoped';
@ -105,11 +134,17 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
// Send session update to all relevant connections
let emitUpdateToInterestedClients = ({ event, userId, sessionId, payload, skipSenderConnection }: {
let emitUpdateToInterestedClients = ({
event,
userId,
payload,
recipientFilter = { type: 'all-user-authenticated-connections' },
skipSenderConnection
}: {
event: string,
userId: string,
sessionId: string,
payload: any,
recipientFilter?: RecipientFilter,
skipSenderConnection?: ClientConnection
}) => {
const connections = userIdToClientConnections.get(userId);
@ -124,27 +159,33 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
continue;
}
// Send to all user-scoped connections - we already matched user
if (connection.connectionType === 'user-scoped') {
log({ module: 'websocket' }, `Sending ${event} to user-scoped connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
// Apply recipient filter
switch (recipientFilter.type) {
case 'all-interested-in-session':
// Send to session-scoped with matching session + all user-scoped
if (connection.connectionType === 'session-scoped') {
if (connection.sessionId !== recipientFilter.sessionId) {
continue; // Wrong session
}
} else if (connection.connectionType === 'machine-scoped') {
continue; // Machines don't need session updates
}
// user-scoped always gets it
break;
case 'user-scoped-only':
if (connection.connectionType !== 'user-scoped') {
continue;
}
break;
case 'all-user-authenticated-connections':
// Send to all connection types (default behavior)
break;
}
// Send to all session-scoped connections, only that match sessionId
if (connection.connectionType === 'session-scoped') {
const matches = connection.sessionId === sessionId;
log({ module: 'websocket' }, `Session-scoped connection ${connection.socket.id}: sessionId=${connection.sessionId}, messageSessionId=${sessionId}, matches=${matches}`);
if (matches) {
log({ module: 'websocket' }, `Sending ${event} to session-scoped connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
}
}
// Send to all machine-scoped connections - they get all user updates
if (connection.connectionType === 'machine-scoped') {
log({ module: 'websocket' }, `Sending ${event} to machine-scoped connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
}
log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
}
}
@ -445,13 +486,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: session.id,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
return reply.send({
@ -679,18 +720,18 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
};
// Get all user connections (not session-specific)
const connections = userIdToClientConnections.get(userId);
if (connections) {
for (const connection of connections) {
connection.socket.emit('update', {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
});
}
}
// Send to all user connections
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
return reply.send({
success: true,
@ -890,6 +931,86 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
});
// Machines API
typed.get('/v1/machines', {
preHandler: app.authenticate,
}, async (request, reply) => {
const userId = request.user.id;
const machines = await db.machine.findMany({
where: { accountId: userId },
orderBy: { lastActiveAt: 'desc' }
});
return machines.map(m => ({
id: m.id,
metadata: m.metadata,
metadataVersion: m.metadataVersion,
seq: m.seq,
active: m.active,
lastActiveAt: m.lastActiveAt.getTime(),
createdAt: m.createdAt.getTime(),
updatedAt: m.updatedAt.getTime()
}));
});
// POST /v1/machines - Create or update machine
typed.post('/v1/machines', {
preHandler: app.authenticate,
schema: {
body: z.object({
id: z.string(),
metadata: z.string() // Encrypted metadata
})
}
}, async (request, reply) => {
const userId = request.user.id;
const { id, metadata } = request.body;
const machine = await db.machine.upsert({
where: {
accountId_id: {
accountId: userId,
id
}
},
create: {
id,
accountId: userId,
metadata,
metadataVersion: 1
},
update: {
metadata,
metadataVersion: { increment: 1 },
active: true,
lastActiveAt: new Date()
}
});
// Emit update to all user connections
const updSeq = await allocateUserSeq(userId);
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(),
seq: updSeq,
body: {
t: 'update-machine',
id: machine.id,
metadata: {
version: machine.metadataVersion,
value: metadata
}
},
createdAt: Date.now()
}
});
return { success: true };
});
// Start
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005;
await app.listen({ port, host: '0.0.0.0' });
@ -999,13 +1120,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: '', // No specific session
payload: {
type: 'daemon-status',
machineId,
status: 'online',
timestamp: Date.now()
}
},
recipientFilter: { type: 'user-scoped-only' }
});
}
@ -1052,58 +1173,100 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: '', // No specific session
payload: {
type: 'daemon-status',
machineId: connection.machineId,
status: 'offline',
timestamp: Date.now()
}
},
recipientFilter: { type: 'user-scoped-only' }
});
}
});
socket.on('session-alive', async (data: any) => {
socket.on('session-alive', async (data: SessionAliveEvent) => {
try {
const { sid, time, thinking } = data;
let t = time;
if (typeof t !== 'number') {
// Basic validation
if (!data || typeof data.time !== 'number') {
return;
}
let t = data.time;
if (t > Date.now()) {
t = Date.now();
}
if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes
if (t < Date.now() - 1000 * 60 * 10) {
return;
}
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!session) {
return;
// Determine type (default to session-scoped for legacy)
const eventType = data.type || 'session-scoped';
// Validate but CONTINUE with warning
if (eventType === 'machine-scoped' && connection.connectionType !== 'machine-scoped') {
log({ module: 'websocket', level: 'warn' },
`Connection type mismatch: ${connection.connectionType} sending machine-scoped alive`);
// CONTINUE ANYWAY
}
if (eventType === 'session-scoped' && connection.connectionType === 'machine-scoped') {
log({ module: 'websocket', level: 'warn' },
`Connection type mismatch: ${connection.connectionType} sending session-scoped alive`);
// CONTINUE ANYWAY
}
// Update last active at
await db.session.update({
where: { id: sid },
data: { lastActiveAt: new Date(t), active: true }
});
// Emit update to connected sockets
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: sid,
payload: {
type: 'activity',
id: sid,
active: true,
activeAt: t,
thinking
// Handle based on type
if (eventType === 'machine-scoped' && 'machineId' in data) {
// Machine heartbeat - update database instead of ephemeral
const machineId = connection.connectionType === 'machine-scoped' ? connection.machineId : data.machineId;
// Update machine lastActiveAt in database
await db.machine.update({
where: {
accountId_id: {
accountId: userId,
id: machineId
}
},
data: {
lastActiveAt: new Date(t),
active: true
}
}).catch(() => {
// Machine might not exist yet, that's ok
});
} else if ('sid' in data) {
// Session heartbeat (legacy or explicit session-scoped)
const { sid, thinking } = data;
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!session) {
return;
}
});
// Update last active
await db.session.update({
where: { id: sid },
data: { lastActiveAt: new Date(t), active: true }
});
// Emit update
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
payload: {
type: 'activity',
id: sid,
active: true,
activeAt: t,
thinking: thinking || false
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
}
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`);
}
@ -1141,14 +1304,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: sid,
payload: {
type: 'activity',
id: sid,
active: false,
activeAt: t,
thinking: false
}
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in session-end: ${error}`);
@ -1219,13 +1382,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: update,
createdAt: Date.now()
},
recipientFilter: { type: 'all-interested-in-session', sessionId: sid },
skipSenderConnection: connection
});
} catch (error) {
@ -1286,13 +1449,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
},
recipientFilter: { type: 'all-interested-in-session', sessionId: sid }
});
// Send success response with new version via callback
@ -1363,13 +1526,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
},
recipientFilter: { type: 'all-interested-in-session', sessionId: sid }
});
// Send success response with new version via callback
@ -1382,6 +1545,53 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
// Update machine metadata through socket
socket.on('update-machine-metadata', async (data: { metadata: string }) => {
if (connection.connectionType !== 'machine-scoped') {
return; // Only machines can update their own metadata
}
const machineId = connection.machineId;
try {
const machine = await db.machine.update({
where: {
accountId_id: {
accountId: userId,
id: machineId
}
},
data: {
metadata: data.metadata,
metadataVersion: { increment: 1 }
}
});
// Emit to other connections
const updSeq = await allocateUserSeq(userId);
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(),
seq: updSeq,
body: {
t: 'update-machine',
id: machineId,
metadata: {
version: machine.metadataVersion,
value: data.metadata
}
},
createdAt: Date.now()
},
skipSenderConnection: connection
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error updating machine metadata: ${error}`);
}
});
// RPC register - Register this socket as a listener for an RPC method
socket.on('rpc-register', async (data: any) => {
try {
@ -1644,7 +1854,6 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId,
payload: {
type: 'usage',
id: sessionId,
@ -1652,7 +1861,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
tokens: usageData.tokens,
cost: usageData.cost,
timestamp: Date.now()
}
},
recipientFilter: { type: 'user-scoped-only' }
});
}