diff --git a/sources/app/api.ts b/sources/app/api.ts index 941b8e5..c19f28e 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -8,6 +8,9 @@ import * as tweetnacl from "tweetnacl"; import { db } from "@/storage/db"; import { Account, Update } from "@prisma/client"; import { onShutdown } from "@/utils/shutdown"; +import { allocateSessionSeq, allocateUserSeq } from "@/services/seq"; +import { randomKey } from "@/utils/randomKey"; +import { randomKeyNaked } from "@/utils/randomKeyNaked"; // Connection metadata types interface SessionScopedConnection { @@ -916,114 +919,57 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> c: message }; - // Start transaction to ensure consistency - const result = await db.$transaction(async (tx) => { + // Resolve seq + const updSeq = await allocateUserSeq(userId); + const msgSeq = await allocateSessionSeq(sid); - // Get user for update (lock account first to prevent deadlocks) - const user = await tx.account.findUnique({ - where: { id: userId } + // Check if message already exists + if (useLocalId) { + const existing = await db.sessionMessage.findFirst({ + where: { sessionId: sid, localId: useLocalId } }); - - if (!user) { - throw new Error('User not found'); + if (existing) { + return { msg: existing, update: null }; } + } - // Verify session belongs to user and lock it - const session = await tx.session.findFirst({ - where: { - id: sid, - accountId: userId - } - }); - - if (!session) { - throw new Error('Session not found'); + // Create message + const msg = await db.sessionMessage.create({ + data: { + sessionId: sid, + seq: msgSeq, + content: msgContent, + localId: useLocalId } - - // Get next sequence numbers - const msgSeq = session.seq + 1; - const updSeq = user.seq + 1; - - if (useLocalId) { - const existing = await tx.sessionMessage.findFirst({ - where: { sessionId: sid, localId: useLocalId } - }); - if (existing) { - return { msg: existing, update: null }; - } - } - - // Create message - const msg = await tx.sessionMessage.create({ - data: { - sessionId: sid, - seq: msgSeq, - content: msgContent, - localId: useLocalId - } - }); - - // Create update - const updContent: PrismaJson.UpdateBody = { - t: 'new-message', - sid: sid, - message: { - id: msg.id, - seq: msg.seq, - content: msgContent, - localId: useLocalId, - createdAt: msg.createdAt.getTime(), - updatedAt: msg.updatedAt.getTime() - } - }; - - const update = await tx.update.create({ - data: { - accountId: userId, - seq: updSeq, - content: updContent - } - }); - - // Update sequences - await tx.session.update({ - where: { id: sid }, - data: { seq: msgSeq } - }); - - await tx.account.update({ - where: { id: userId }, - data: { seq: updSeq } - }); - - return { msg, update }; - }).catch((error) => { - if (error.message === 'Session not found') { - return null; - } - throw error; }); - // If no update, we're done - if (!result) { - return; - } + // Create update + const update: PrismaJson.UpdateBody = { + t: 'new-message', + sid: sid, + message: { + id: msg.id, + seq: msg.seq, + content: msgContent, + localId: useLocalId, + createdAt: msg.createdAt.getTime(), + updatedAt: msg.updatedAt.getTime() + } + }; // Emit update to relevant clients - if (result.update) { - emitUpdateToInterestedClients({ - event: 'update', - userId, - sessionId: sid, - payload: { - id: result.update.id, - seq: result.update.seq, - body: result.update.content, - createdAt: result.update.createdAt.getTime() - }, - skipSenderConnection: connection - }); - } + emitUpdateToInterestedClients({ + event: 'update', + userId, + sessionId: sid, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: update, + createdAt: Date.now() + }, + skipSenderConnection: connection + }); }); socket.on('update-metadata', async (data: any, callback: (response: any) => void) => { @@ -1037,88 +983,57 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - // Start transaction to ensure consistency - const result = await db.$transaction(async (tx) => { - // Get user for update (lock account first to prevent deadlocks) - const user = await tx.account.findUnique({ - where: { id: userId } - }); - if (!user) { - callback({ result: 'error' }); - return null; - } - - // Verify session belongs to user and lock it - const session = await tx.session.findFirst({ - where: { - id: sid, - accountId: userId - } - }); - if (!session) { - callback({ result: 'error' }); - return null; - } - - // Check version - if (session.metadataVersion !== expectedVersion) { - callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); - return null; - } - - // Get next sequence number - const updSeq = user.seq + 1; - const newMetadataVersion = session.metadataVersion + 1; - - // Update session metadata - await tx.session.update({ - where: { id: sid }, - data: { - metadata: metadata, - metadataVersion: newMetadataVersion - } - }); - - // Create update - const updContent: PrismaJson.UpdateBody = { - t: 'update-session', - id: sid, - metadata: { - value: metadata, - version: newMetadataVersion - } - }; - - const update = await tx.update.create({ - data: { - accountId: userId, - seq: updSeq, - content: updContent - } - }); - - // Update user sequence - await tx.account.update({ - where: { id: userId }, - data: { seq: updSeq } - }); - - return { update, newMetadataVersion }; + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } }); - if (!result) { + if (!session) { return; } - // Emit update to connected sockets + // Check version + if (session.metadataVersion !== expectedVersion) { + callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); + return null; + } + + // Update metadata + const { count } = await db.session.updateMany({ + where: { id: sid, metadataVersion: expectedVersion }, + data: { + metadata: metadata, + metadataVersion: expectedVersion + 1 + } + }); + if (count === 0) { + callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata }); + return null; + } + + // Generate update + const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-session', + id: sid, + metadata: { + value: metadata, + version: expectedVersion + 1 + } + }; emitUpdateToInterestedClients({ event: 'update', userId, sessionId: sid, - payload: result.update + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + } }); // Send success response with new version via callback - callback({ result: 'success', version: result.newMetadataVersion, metadata: metadata }); + callback({ result: 'success', version: expectedVersion + 1, metadata: metadata }); }); @@ -1133,93 +1048,63 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - // Start transaction to ensure consistency - const result = await db.$transaction(async (tx) => { - // Get user for update (lock account first to prevent deadlocks) - const user = await tx.account.findUnique({ - where: { id: userId } - }); - if (!user) { - callback({ result: 'error' }); - return null; - } - - // Verify session belongs to user and lock it - const session = await tx.session.findFirst({ - where: { - id: sid, - accountId: userId - } - }); - if (!session) { - callback({ result: 'error' }); - return null; - } - - // Check version - if (session.agentStateVersion !== expectedVersion) { - callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); - return null; - } - - // Get next sequence number - const updSeq = user.seq + 1; - const newAgentStateVersion = session.agentStateVersion + 1; - - // Update session agent state - await tx.session.update({ - where: { id: sid }, - data: { - agentState: agentState, - agentStateVersion: newAgentStateVersion - } - }); - - // Create update - const updContent: PrismaJson.UpdateBody = { - t: 'update-session', + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, - agentState: { - value: agentState, - version: newAgentStateVersion - } - }; - - const update = await tx.update.create({ - data: { - accountId: userId, - seq: updSeq, - content: updContent - } - }); - - // Update user sequence - await tx.account.update({ - where: { id: userId }, - data: { seq: updSeq } - }); - - return { update, newAgentStateVersion }; + accountId: userId + } }); - if (!result) { - return; + if (!session) { + callback({ result: 'error' }); + return null; } + // Check version + if (session.agentStateVersion !== expectedVersion) { + callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); + return null; + } + + // Update agent state + const { count } = await db.session.updateMany({ + where: { id: sid, agentStateVersion: expectedVersion }, + data: { + agentState: agentState, + agentStateVersion: expectedVersion + 1 + } + }); + if (count === 0) { + callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState }); + return null; + } + + // Generate update + const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-session', + id: sid, + agentState: { + value: agentState, + version: expectedVersion + 1 + } + }; + // Emit update to connected sockets emitUpdateToInterestedClients({ event: 'update', userId, sessionId: sid, payload: { - id: result.update.id, - seq: result.update.seq, - body: result.update.content, - createdAt: result.update.createdAt.getTime() + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() } }); // Send success response with new version via callback - callback({ result: 'success', version: result.newAgentStateVersion, agentState: agentState }); + callback({ result: 'success', version: expectedVersion + 1, agentState: agentState }); }); // RPC register - Register this socket as a listener for an RPC method diff --git a/sources/app/timeout.ts b/sources/app/timeout.ts index d0f36f1..7128673 100644 --- a/sources/app/timeout.ts +++ b/sources/app/timeout.ts @@ -1,11 +1,12 @@ import { pubsub } from "@/services/pubsub"; import { db } from "@/storage/db"; -import { backoff, delay } from "@/utils/delay"; +import { delay } from "@/utils/delay"; +import { forever } from "@/utils/forever"; +import { shutdownSignal } from "@/utils/shutdown"; export function startTimeout() { - backoff(async () => { + forever('session-timeout', async () => { while (true) { - // Find timed out sessions const sessions = await db.session.findMany({ where: { @@ -30,7 +31,7 @@ export function startTimeout() { } // Wait for 1 minute - await delay(1000 * 60); + await delay(1000 * 60, shutdownSignal); } }); } \ No newline at end of file diff --git a/sources/services/seq.ts b/sources/services/seq.ts new file mode 100644 index 0000000..fc4db20 --- /dev/null +++ b/sources/services/seq.ts @@ -0,0 +1,21 @@ +import { db } from "@/storage/db"; + +export async function allocateUserSeq(accountId: string) { + const user = await db.account.update({ + where: { id: accountId }, + select: { seq: true }, + data: { seq: { increment: 1 } } + }); + const seq = user.seq; + return seq; +} + +export async function allocateSessionSeq(sessionId: string) { + const session = await db.session.update({ + where: { id: sessionId }, + select: { seq: true }, + data: { seq: { increment: 1 } } + }); + const seq = session.seq; + return seq; +} \ No newline at end of file