refactor: prepare server for machine sync refactoring

- Clean up machine API endpoints formatting
- Update machine-alive to use ephemeral events instead of updates
- Prepare types for separated metadata and daemonState
- Fix activeAt field name consistency in machine responses

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Kirill Dubovitskiy 2025-08-17 18:32:31 -07:00
parent 6b1a3c3e82
commit d03240061d
3 changed files with 285 additions and 172 deletions

View File

@ -13,7 +13,7 @@
"migrate": "dotenv -e .env.dev -- prisma migrate dev", "migrate": "dotenv -e .env.dev -- prisma migrate dev",
"generate": "prisma generate", "generate": "prisma generate",
"postinstall": "prisma generate", "postinstall": "prisma generate",
"db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v postgres-data:/var/lib/postgresql/data -p 5432:5432 postgres", "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v $(pwd)/.pgdata:/var/lib/postgresql/data -p 5432:5432 postgres",
"redis": "docker run -d -p 6379:6379 redis" "redis": "docker run -d -p 6379:6379 redis"
}, },
"devDependencies": { "devDependencies": {

View File

@ -14,10 +14,10 @@ import { AsyncLock } from "@/utils/lock";
// Recipient filter types // Recipient filter types
type RecipientFilter = type RecipientFilter =
| { type: 'all-interested-in-session'; sessionId: string } | { type: 'all-interested-in-session'; sessionId: string }
| { type: 'user-scoped-only' } | { type: 'user-scoped-only' }
| { type: 'all-user-authenticated-connections' }; | { type: 'all-user-authenticated-connections' };
// Connection metadata types // Connection metadata types
interface SessionScopedConnection { interface SessionScopedConnection {
@ -108,7 +108,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`); log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`);
return reply.code(401).send({ error: 'User not found' }); return reply.code(401).send({ error: 'User not found' });
} }
log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`); log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`);
request.user = user; request.user = user;
@ -118,12 +118,12 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}); });
// Send session update to all relevant connections // Send session update to all relevant connections
let emitUpdateToInterestedClients = ({ let emitUpdateToInterestedClients = ({
event, event,
userId, userId,
payload, payload,
recipientFilter = { type: 'all-user-authenticated-connections' }, recipientFilter = { type: 'all-user-authenticated-connections' },
skipSenderConnection skipSenderConnection
}: { }: {
event: string, event: string,
userId: string, userId: string,
@ -156,13 +156,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
} }
// user-scoped always gets it // user-scoped always gets it
break; break;
case 'user-scoped-only': case 'user-scoped-only':
if (connection.connectionType !== 'user-scoped') { if (connection.connectionType !== 'user-scoped') {
continue; continue;
} }
break; break;
case 'all-user-authenticated-connections': case 'all-user-authenticated-connections':
// Send to all connection types (default behavior) // Send to all connection types (default behavior)
break; break;
@ -232,7 +232,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const publicKeyHex = privacyKit.encodeHex(publicKey); const publicKeyHex = privacyKit.encodeHex(publicKey);
log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`); log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`);
const answer = await db.terminalAuthRequest.upsert({ const answer = await db.terminalAuthRequest.upsert({
where: { publicKey: publicKeyHex }, where: { publicKey: publicKeyHex },
update: {}, update: {},
@ -310,11 +310,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
// Check if OpenAI API key is configured on server // Check if OpenAI API key is configured on server
const OPENAI_API_KEY = process.env.OPENAI_API_KEY; const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
if (!OPENAI_API_KEY) { if (!OPENAI_API_KEY) {
return reply.code(500).send({ return reply.code(500).send({
error: 'OpenAI API key not configured on server' error: 'OpenAI API key not configured on server'
}); });
} }
// Generate ephemeral token from OpenAI // Generate ephemeral token from OpenAI
const response = await fetch('https://api.openai.com/v1/realtime/sessions', { const response = await fetch('https://api.openai.com/v1/realtime/sessions', {
method: 'POST', method: 'POST',
@ -327,11 +327,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
voice: 'verse', voice: 'verse',
}), }),
}); });
if (!response.ok) { if (!response.ok) {
throw new Error(`OpenAI API error: ${response.status}`); throw new Error(`OpenAI API error: ${response.status}`);
} }
const data = await response.json() as { const data = await response.json() as {
client_secret: { client_secret: {
value: string; value: string;
@ -339,14 +339,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}; };
id: string; id: string;
}; };
return reply.send({ return reply.send({
token: data.client_secret.value token: data.client_secret.value
}); });
} catch (error) { } catch (error) {
log({ module: 'openai', level: 'error' }, 'Failed to generate ephemeral token', error); log({ module: 'openai', level: 'error' }, 'Failed to generate ephemeral token', error);
return reply.code(500).send({ return reply.code(500).send({
error: 'Failed to generate ephemeral token' error: 'Failed to generate ephemeral token'
}); });
} }
}); });
@ -391,7 +391,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const lastMessage = v.messages[0]; const lastMessage = v.messages[0];
const sessionUpdatedAt = v.updatedAt.getTime(); const sessionUpdatedAt = v.updatedAt.getTime();
const lastMessageCreatedAt = lastMessage ? lastMessage.createdAt.getTime() : 0; const lastMessageCreatedAt = lastMessage ? lastMessage.createdAt.getTime() : 0;
return { return {
id: v.id, id: v.id,
seq: v.seq, seq: v.seq,
@ -490,9 +490,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
body: updContent, body: updContent,
createdAt: Date.now() createdAt: Date.now()
}; };
logger.info({ logger.info({
module: 'session-create', module: 'session-create',
userId, userId,
sessionId: session.id, sessionId: session.id,
updateType: 'new-session', updateType: 'new-session',
updatePayload: JSON.stringify(updatePayload) updatePayload: JSON.stringify(updatePayload)
@ -940,30 +940,9 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}); });
}); });
// Machines API // Machines
typed.get('/v1/machines', {
preHandler: app.authenticate,
}, async (request, reply) => {
const userId = request.user.id;
const machines = await db.machine.findMany({
where: { accountId: userId },
orderBy: { lastActiveAt: 'desc' }
});
return machines.map(m => ({
id: m.id,
metadata: m.metadata,
metadataVersion: m.metadataVersion,
seq: m.seq,
active: m.active,
lastActiveAt: m.lastActiveAt.getTime(),
createdAt: m.createdAt.getTime(),
updatedAt: m.updatedAt.getTime()
}));
});
// POST /v1/machines - Create or update machine // POST /v1/machines - Create machine or return existing
typed.post('/v1/machines', { typed.post('/v1/machines', {
preHandler: app.authenticate, preHandler: app.authenticate,
schema: { schema: {
@ -975,67 +954,139 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}, async (request, reply) => { }, async (request, reply) => {
const userId = request.user.id; const userId = request.user.id;
const { id, metadata } = request.body; const { id, metadata } = request.body;
logger.info({ module: 'machines', machineId: id, userId, hasMetadata: !!metadata }, 'Creating/updating machine'); // Check if machine exists (like sessions do)
const machine = await db.machine.findFirst({
try {
const machine = await db.machine.upsert({
where: { where: {
accountId_id: {
accountId: userId,
id
}
},
create: {
id,
accountId: userId, accountId: userId,
metadata, id: id
metadataVersion: 1
},
update: {
metadata,
metadataVersion: { increment: 1 },
active: true,
lastActiveAt: new Date()
} }
}); });
// Emit update to all user connections if (machine) {
const updSeq = await allocateUserSeq(userId); // Machine exists - just return it
emitUpdateToInterestedClients({ logger.info({ module: 'machines', machineId: id, userId }, 'Found existing machine');
event: 'update', return reply.send({
userId, machine: {
payload: {
id: randomKeyNaked(),
seq: updSeq,
body: {
t: 'update-machine',
id: machine.id, id: machine.id,
metadata: { metadata: machine.metadata,
version: machine.metadataVersion, metadataVersion: machine.metadataVersion,
value: metadata active: machine.active,
} lastActiveAt: machine.lastActiveAt.getTime(),
}, createdAt: machine.createdAt.getTime(),
createdAt: Date.now() updatedAt: machine.updatedAt.getTime()
} }
}); });
} else {
return { success: true }; // Create new machine
} catch (error) { logger.info({ module: 'machines', machineId: id, userId }, 'Creating new machine');
logger.error({
module: 'machines', const newMachine = await db.machine.create({
machineId: id, data: {
userId, id,
error: error instanceof Error ? error.message : String(error), accountId: userId,
errorStack: error instanceof Error ? error.stack : undefined metadata,
}, 'Failed to create/update machine'); metadataVersion: 1
return reply.code(500).send({ // active defaults to true in schema
error: 'Failed to create/update machine', // lastActiveAt defaults to now() in schema
details: error instanceof Error ? error.message : String(error) }
});
// Emit update for new machine
const updSeq = await allocateUserSeq(userId);
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(),
seq: updSeq,
body: {
t: 'update-machine',
id: newMachine.id,
metadata: {
version: 1,
value: metadata
}
},
createdAt: Date.now()
}
});
return reply.send({
machine: {
id: newMachine.id,
metadata: newMachine.metadata,
metadataVersion: 1,
active: newMachine.active,
lastActiveAt: newMachine.lastActiveAt.getTime(),
createdAt: newMachine.createdAt.getTime(),
updatedAt: newMachine.updatedAt.getTime()
}
}); });
} }
}); });
// Machines API
typed.get('/v1/machines', {
preHandler: app.authenticate,
}, async (request, reply) => {
const userId = request.user.id;
const machines = await db.machine.findMany({
where: { accountId: userId },
orderBy: { lastActiveAt: 'desc' }
});
return machines.map(m => ({
id: m.id,
metadata: m.metadata,
metadataVersion: m.metadataVersion,
seq: m.seq,
active: m.active,
activeAt: m.lastActiveAt.getTime(),
createdAt: m.createdAt.getTime(),
updatedAt: m.updatedAt.getTime()
}));
});
// GET /v1/machines/:id - Get single machine by ID
typed.get('/v1/machines/:id', {
preHandler: app.authenticate,
schema: {
params: z.object({
id: z.string()
})
}
}, async (request, reply) => {
const userId = request.user.id;
const { id } = request.params;
const machine = await db.machine.findFirst({
where: {
accountId: userId,
id: id
}
});
if (!machine) {
return reply.code(404).send({ error: 'Machine not found' });
}
return {
machine: {
id: machine.id,
metadata: machine.metadata,
metadataVersion: machine.metadataVersion,
seq: machine.seq,
active: machine.active,
activeAt: machine.lastActiveAt.getTime(),
createdAt: machine.createdAt.getTime(),
updatedAt: machine.updatedAt.getTime()
}
};
});
// Combined logging endpoint (only when explicitly enabled) // Combined logging endpoint (only when explicitly enabled)
if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) {
typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', { typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', {
@ -1051,22 +1102,22 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
} }
}, async (request, reply) => { }, async (request, reply) => {
const { timestamp, level, message, source, platform } = request.body; const { timestamp, level, message, source, platform } = request.body;
// Log ONLY to separate remote logger (file only, no console) // Log ONLY to separate remote logger (file only, no console)
const logData = { const logData = {
source, source,
platform, platform,
timestamp timestamp
}; };
// Use the file-only logger if available // Use the file-only logger if available
const { fileConsolidatedLogger } = await import('@/utils/log'); const { fileConsolidatedLogger } = await import('@/utils/log');
if (!fileConsolidatedLogger) { if (!fileConsolidatedLogger) {
// Should never happen since we check env var above, but be safe // Should never happen since we check env var above, but be safe
return reply.send({ success: true }); return reply.send({ success: true });
} }
switch (level.toLowerCase()) { switch (level.toLowerCase()) {
case 'error': case 'error':
fileConsolidatedLogger.error(logData, message); fileConsolidatedLogger.error(logData, message);
@ -1081,7 +1132,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
default: default:
fileConsolidatedLogger.info(logData, message); fileConsolidatedLogger.info(logData, message);
} }
return reply.send({ success: true }); return reply.send({ success: true });
}); });
} }
@ -1146,7 +1197,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
socket.disconnect(); socket.disconnect();
return; return;
} }
// Validate machine-scoped clients have machineId // Validate machine-scoped clients have machineId
if (clientType === 'machine-scoped' && !machineId) { if (clientType === 'machine-scoped' && !machineId) {
log({ module: 'websocket' }, `Machine-scoped client missing machineId`); log({ module: 'websocket' }, `Machine-scoped client missing machineId`);
@ -1194,7 +1245,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
userIdToClientConnections.set(userId, new Set()); userIdToClientConnections.set(userId, new Set());
} }
userIdToClientConnections.get(userId)!.add(connection); userIdToClientConnections.get(userId)!.add(connection);
// Broadcast daemon online status // Broadcast daemon online status
if (connection.connectionType === 'machine-scoped') { if (connection.connectionType === 'machine-scoped') {
// Broadcast daemon online // Broadcast daemon online
@ -1248,7 +1299,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
} }
log({ module: 'websocket' }, `User disconnected: ${userId}`); log({ module: 'websocket' }, `User disconnected: ${userId}`);
// Broadcast daemon offline status // Broadcast daemon offline status
if (connection.connectionType === 'machine-scoped') { if (connection.connectionType === 'machine-scoped') {
emitUpdateToInterestedClients({ emitUpdateToInterestedClients({
@ -1285,7 +1336,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
} }
const { sid, thinking } = data; const { sid, thinking } = data;
// Resolve session // Resolve session
const session = await db.session.findUnique({ const session = await db.session.findUnique({
where: { id: sid, accountId: userId } where: { id: sid, accountId: userId }
@ -1336,49 +1387,45 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return; return;
} }
const machineId = data.machineId; // Resolve machine
const machine = await db.machine.findUnique({
// Update machine lastActiveAt in database
const machine = await db.machine.update({
where: { where: {
accountId_id: { accountId_id: {
accountId: userId, accountId: userId,
id: machineId id: data.machineId
}
}
});
if (!machine) {
return;
}
// Update machine lastActiveAt in database
const updatedMachine = await db.machine.update({
where: {
accountId_id: {
accountId: userId,
id: data.machineId
} }
}, },
data: { data: {
lastActiveAt: new Date(t), lastActiveAt: new Date(t),
active: true active: true
} }
}).catch(() => { })
// Machine might not exist yet, that's ok
return null;
});
// If machine was updated, emit update to all user connections emitUpdateToInterestedClients({
if (machine) { event: 'ephemeral',
const updSeq = await allocateUserSeq(userId); userId,
emitUpdateToInterestedClients({ payload: {
event: 'update', type: 'machine-activity',
userId, id: updatedMachine.id,
payload: { active: true,
id: randomKeyNaked(12), lastActiveAt: t,
seq: updSeq, },
body: { recipientFilter: { type: 'user-scoped-only' }
t: 'update-machine', });
id: machine.id,
metadata: machine.metadata ? {
version: machine.metadataVersion,
value: machine.metadata
} : undefined,
active: true,
lastActiveAt: t
},
createdAt: Date.now()
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
}
} catch (error) { } catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`); log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`);
} }
@ -1660,50 +1707,108 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
} }
}); });
// Update machine through socket // Machine metadata update with optimistic concurrency control
socket.on('update-machine', async (data: { metadata: string }) => { socket.on('machine-update-metadata', async (data: any, callback: (response: any) => void) => {
if (connection.connectionType !== 'machine-scoped') {
return; // Only machines can update their own metadata
}
const machineId = connection.machineId;
try { try {
const machine = await db.machine.update({ const { machineId, metadata, expectedVersion } = data;
// Validate input
if (!machineId || typeof metadata !== 'string' || typeof expectedVersion !== 'number') {
if (callback) {
callback({ result: 'error', message: 'Invalid parameters' });
}
return;
}
// Resolve machine
const machine = await db.machine.findFirst({
where: { where: {
accountId_id: { accountId: userId,
id: machineId
}
});
if (!machine) {
if (callback) {
callback({ result: 'error', message: 'Machine not found' });
}
return;
}
// Check version
if (machine.metadataVersion !== expectedVersion) {
callback({
result: 'version-mismatch',
version: machine.metadataVersion,
metadata: machine.metadata
});
return;
}
// Update metadata with atomic version check
const { count } = await db.machine.updateMany({
where: {
accountId: userId,
id: machineId,
metadataVersion: expectedVersion // Atomic CAS
},
data: {
metadata: metadata,
metadataVersion: expectedVersion + 1
// NOT updating active or lastActiveAt here
}
});
if (count === 0) {
// Re-fetch current version
const current = await db.machine.findFirst({
where: {
accountId: userId, accountId: userId,
id: machineId id: machineId
} }
}, });
data: { callback({
metadata: data.metadata, result: 'version-mismatch',
metadataVersion: { increment: 1 } version: current?.metadataVersion || 0,
} metadata: current?.metadata
}); });
return;
// Emit to other connections }
// Generate update
const updSeq = await allocateUserSeq(userId); const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-machine',
id: machineId,
metadata: {
value: metadata,
version: expectedVersion + 1
}
};
// Emit to all connections
emitUpdateToInterestedClients({ emitUpdateToInterestedClients({
event: 'update', event: 'update',
userId, userId,
payload: { payload: {
id: randomKeyNaked(), id: randomKeyNaked(12),
seq: updSeq, seq: updSeq,
body: { body: updContent,
t: 'update-machine',
id: machineId,
metadata: {
version: machine.metadataVersion,
value: data.metadata
}
},
createdAt: Date.now() createdAt: Date.now()
}, },
skipSenderConnection: connection recipientFilter: { type: 'all-user-authenticated-connections' }
});
// Send success response with new version
callback({
result: 'success',
version: expectedVersion + 1,
metadata: metadata
}); });
} catch (error) { } catch (error) {
log({ module: 'websocket', level: 'error' }, `Error updating machine metadata: ${error}`); log({ module: 'websocket', level: 'error' }, `Error in machine-update-metadata: ${error}`);
if (callback) {
callback({ result: 'error', message: 'Internal error' });
}
} }
}); });

View File

@ -60,6 +60,14 @@ declare global {
value: string | null; value: string | null;
version: number; version: number;
} | null | undefined; } | null | undefined;
} | {
t: 'update-machine';
id: string;
metadata?: {
value: string;
version: number;
};
activeAt?: number;
}; };
} }
} }