feat: daemon kinda functional e2e

This commit is contained in:
Kirill Dubovitskiy 2025-08-18 00:05:37 -07:00
parent d03240061d
commit 62a2280268
4 changed files with 154 additions and 28 deletions

View File

@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Machine" ADD COLUMN "daemonState" TEXT,
ADD COLUMN "daemonStateVersion" INTEGER NOT NULL DEFAULT 0;

View File

@ -144,16 +144,18 @@ model UsageReport {
//
model Machine {
id String @id
accountId String
account Account @relation(fields: [accountId], references: [id])
metadata String // Encrypted - contains ALL machine info
metadataVersion Int @default(0)
seq Int @default(0)
active Boolean @default(true)
lastActiveAt DateTime @default(now())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
id String @id
accountId String
account Account @relation(fields: [accountId], references: [id])
metadata String // Encrypted - contains static machine info
metadataVersion Int @default(0)
daemonState String? // Encrypted - contains dynamic daemon state
daemonStateVersion Int @default(0)
seq Int @default(0)
active Boolean @default(true)
lastActiveAt DateTime @default(now())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([accountId, id])
@@index([accountId])

View File

@ -948,12 +948,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
schema: {
body: z.object({
id: z.string(),
metadata: z.string() // Encrypted metadata
metadata: z.string(), // Encrypted metadata
daemonState: z.string().optional() // Encrypted daemon state
})
}
}, async (request, reply) => {
const userId = request.user.id;
const { id, metadata } = request.body;
const { id, metadata, daemonState } = request.body;
// Check if machine exists (like sessions do)
const machine = await db.machine.findFirst({
@ -971,8 +972,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
id: machine.id,
metadata: machine.metadata,
metadataVersion: machine.metadataVersion,
daemonState: machine.daemonState,
daemonStateVersion: machine.daemonStateVersion,
active: machine.active,
lastActiveAt: machine.lastActiveAt.getTime(),
activeAt: machine.lastActiveAt.getTime(), // Return as activeAt for API consistency
createdAt: machine.createdAt.getTime(),
updatedAt: machine.updatedAt.getTime()
}
@ -986,7 +989,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
id,
accountId: userId,
metadata,
metadataVersion: 1
metadataVersion: 1,
daemonState: daemonState || null,
daemonStateVersion: daemonState ? 1 : 0
// active defaults to true in schema
// lastActiveAt defaults to now() in schema
}
@ -1002,7 +1007,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
seq: updSeq,
body: {
t: 'update-machine',
id: newMachine.id,
machineId: newMachine.id,
metadata: {
version: 1,
value: metadata
@ -1016,9 +1021,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
machine: {
id: newMachine.id,
metadata: newMachine.metadata,
metadataVersion: 1,
metadataVersion: newMachine.metadataVersion,
daemonState: newMachine.daemonState,
daemonStateVersion: newMachine.daemonStateVersion,
active: newMachine.active,
lastActiveAt: newMachine.lastActiveAt.getTime(),
activeAt: newMachine.lastActiveAt.getTime(), // Return as activeAt for API consistency
createdAt: newMachine.createdAt.getTime(),
updatedAt: newMachine.updatedAt.getTime()
}
@ -1042,6 +1049,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
id: m.id,
metadata: m.metadata,
metadataVersion: m.metadataVersion,
daemonState: m.daemonState,
daemonStateVersion: m.daemonStateVersion,
seq: m.seq,
active: m.active,
activeAt: m.lastActiveAt.getTime(),
@ -1078,6 +1087,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
id: machine.id,
metadata: machine.metadata,
metadataVersion: machine.metadataVersion,
daemonState: machine.daemonState,
daemonStateVersion: machine.daemonStateVersion,
seq: machine.seq,
active: machine.active,
activeAt: machine.lastActiveAt.getTime(),
@ -1253,10 +1264,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
event: 'ephemeral',
userId,
payload: {
type: 'daemon-status',
machineId,
status: 'online',
timestamp: Date.now()
type: 'machine-activity',
id: machineId,
active: true,
activeAt: Date.now()
},
recipientFilter: { type: 'user-scoped-only' }
});
@ -1306,10 +1317,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
event: 'ephemeral',
userId,
payload: {
type: 'daemon-status',
machineId: connection.machineId,
status: 'offline',
timestamp: Date.now()
type: 'machine-activity',
id: connection.machineId,
active: false,
activeAt: Date.now()
},
recipientFilter: { type: 'user-scoped-only' }
});
@ -1422,7 +1433,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
type: 'machine-activity',
id: updatedMachine.id,
active: true,
lastActiveAt: t,
activeAt: t,
},
recipientFilter: { type: 'user-scoped-only' }
});
@ -1778,7 +1789,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-machine',
id: machineId,
machineId: machineId,
metadata: {
value: metadata,
version: expectedVersion + 1
@ -1812,6 +1823,112 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
// Machine daemon state update with optimistic concurrency control
socket.on('machine-update-state', async (data: any, callback: (response: any) => void) => {
try {
const { machineId, daemonState, expectedVersion } = data;
// Validate input
if (!machineId || typeof daemonState !== 'string' || typeof expectedVersion !== 'number') {
if (callback) {
callback({ result: 'error', message: 'Invalid parameters' });
}
return;
}
// Resolve machine
const machine = await db.machine.findFirst({
where: {
accountId: userId,
id: machineId
}
});
if (!machine) {
if (callback) {
callback({ result: 'error', message: 'Machine not found' });
}
return;
}
// Check version
if (machine.daemonStateVersion !== expectedVersion) {
callback({
result: 'version-mismatch',
version: machine.daemonStateVersion,
daemonState: machine.daemonState
});
return;
}
// Update daemon state with atomic version check
const { count } = await db.machine.updateMany({
where: {
accountId: userId,
id: machineId,
daemonStateVersion: expectedVersion // Atomic CAS
},
data: {
daemonState: daemonState,
daemonStateVersion: expectedVersion + 1,
active: true,
lastActiveAt: new Date()
}
});
if (count === 0) {
// Re-fetch current version
const current = await db.machine.findFirst({
where: {
accountId: userId,
id: machineId
}
});
callback({
result: 'version-mismatch',
version: current?.daemonStateVersion || 0,
daemonState: current?.daemonState
});
return;
}
// Generate update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-machine',
machineId: machineId,
daemonState: {
value: daemonState,
version: expectedVersion + 1
}
};
// Emit to all connections
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
// Send success response with new version
callback({
result: 'success',
version: expectedVersion + 1,
daemonState: daemonState
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in machine-update-state: ${error}`);
if (callback) {
callback({ result: 'error', message: 'Internal error' });
}
}
});
// RPC register - Register this socket as a listener for an RPC method
socket.on('rpc-register', async (data: any) => {
try {

View File

@ -62,11 +62,15 @@ declare global {
} | null | undefined;
} | {
t: 'update-machine';
id: string;
machineId: string;
metadata?: {
value: string;
version: number;
};
daemonState?: {
value: string;
version: number;
};
activeAt?: number;
};
}