From e2854faaa8690e53ca060475b3112567c1b3cc94 Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Mon, 14 Jul 2025 19:21:39 -0700 Subject: [PATCH] feat: add update state --- sources/app/api.ts | 86 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index 242fcac..906bb5a 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -672,10 +672,94 @@ export async function startApi() { pubsub.emit('update', userId, result.update); // Send success response with new version via callback - callback({ success: true, metadataVersion: result.newMetadataVersion, metadata: metadata }); + callback({ result: 'success', version: result.newMetadataVersion, metadata: metadata }); }); + socket.on('update-state', async (data: any, callback: (response: any) => void) => { + const { sid, agentState, expectedVersion } = data; + + // Validate input + if (!sid || (typeof agentState !== 'string' && agentState !== null) || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error' }); + } + return; + } + + // Start transaction to ensure consistency + const result = await db.$transaction(async (tx) => { + // Verify session belongs to user and lock it + const session = await tx.session.findFirst({ + where: { + id: sid, + accountId: userId + } + }); + const user = await tx.account.findUnique({ + where: { id: userId } + }); + if (!user || !session) { + callback({ result: 'error' }); + return null; + } + + // Check version + if (session.agentStateVersion !== expectedVersion) { + callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); + return null; + } + + // Get next sequence number + const updSeq = user.seq + 1; + const newAgentStateVersion = session.agentStateVersion + 1; + + // Update session agent state + await tx.session.update({ + where: { id: sid }, + data: { + agentState: agentState, + agentStateVersion: newAgentStateVersion + } + }); + + // Create update + const updContent: PrismaJson.UpdateBody = { + t: 'update-session', + id: sid, + agentState: { + value: agentState, + version: newAgentStateVersion + } + }; + + const update = await tx.update.create({ + data: { + accountId: userId, + seq: updSeq, + content: updContent + } + }); + + // Update user sequence + await tx.account.update({ + where: { id: userId }, + data: { seq: updSeq } + }); + + return { update, newAgentStateVersion }; + }); + if (!result) { + return; + } + + // Emit update to connected sockets + pubsub.emit('update', userId, result.update); + + // Send success response with new version via callback + callback({ result: 'success', version: result.newAgentStateVersion, agentState: agentState }); + }); + socket.emit('auth', { success: true, user: userId }); log({ module: 'websocket' }, `User connected: ${userId}`); });