ref: remove transactions

This commit is contained in:
Steve Korshakov 2025-07-26 02:00:25 -07:00
parent ae95f70372
commit 6c0428c9d3
3 changed files with 154 additions and 247 deletions

View File

@ -8,6 +8,9 @@ import * as tweetnacl from "tweetnacl";
import { db } from "@/storage/db";
import { Account, Update } from "@prisma/client";
import { onShutdown } from "@/utils/shutdown";
import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
import { randomKey } from "@/utils/randomKey";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
// Connection metadata types
interface SessionScopedConnection {
@ -916,114 +919,57 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
c: message
};
// Start transaction to ensure consistency
const result = await db.$transaction(async (tx) => {
// Resolve seq
const updSeq = await allocateUserSeq(userId);
const msgSeq = await allocateSessionSeq(sid);
// Get user for update (lock account first to prevent deadlocks)
const user = await tx.account.findUnique({
where: { id: userId }
// Check if message already exists
if (useLocalId) {
const existing = await db.sessionMessage.findFirst({
where: { sessionId: sid, localId: useLocalId }
});
if (!user) {
throw new Error('User not found');
if (existing) {
return { msg: existing, update: null };
}
}
// Verify session belongs to user and lock it
const session = await tx.session.findFirst({
where: {
id: sid,
accountId: userId
}
});
if (!session) {
throw new Error('Session not found');
// Create message
const msg = await db.sessionMessage.create({
data: {
sessionId: sid,
seq: msgSeq,
content: msgContent,
localId: useLocalId
}
// Get next sequence numbers
const msgSeq = session.seq + 1;
const updSeq = user.seq + 1;
if (useLocalId) {
const existing = await tx.sessionMessage.findFirst({
where: { sessionId: sid, localId: useLocalId }
});
if (existing) {
return { msg: existing, update: null };
}
}
// Create message
const msg = await tx.sessionMessage.create({
data: {
sessionId: sid,
seq: msgSeq,
content: msgContent,
localId: useLocalId
}
});
// Create update
const updContent: PrismaJson.UpdateBody = {
t: 'new-message',
sid: sid,
message: {
id: msg.id,
seq: msg.seq,
content: msgContent,
localId: useLocalId,
createdAt: msg.createdAt.getTime(),
updatedAt: msg.updatedAt.getTime()
}
};
const update = await tx.update.create({
data: {
accountId: userId,
seq: updSeq,
content: updContent
}
});
// Update sequences
await tx.session.update({
where: { id: sid },
data: { seq: msgSeq }
});
await tx.account.update({
where: { id: userId },
data: { seq: updSeq }
});
return { msg, update };
}).catch((error) => {
if (error.message === 'Session not found') {
return null;
}
throw error;
});
// If no update, we're done
if (!result) {
return;
}
// Create update
const update: PrismaJson.UpdateBody = {
t: 'new-message',
sid: sid,
message: {
id: msg.id,
seq: msg.seq,
content: msgContent,
localId: useLocalId,
createdAt: msg.createdAt.getTime(),
updatedAt: msg.updatedAt.getTime()
}
};
// Emit update to relevant clients
if (result.update) {
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: result.update.id,
seq: result.update.seq,
body: result.update.content,
createdAt: result.update.createdAt.getTime()
},
skipSenderConnection: connection
});
}
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: update,
createdAt: Date.now()
},
skipSenderConnection: connection
});
});
socket.on('update-metadata', async (data: any, callback: (response: any) => void) => {
@ -1037,88 +983,57 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return;
}
// Start transaction to ensure consistency
const result = await db.$transaction(async (tx) => {
// Get user for update (lock account first to prevent deadlocks)
const user = await tx.account.findUnique({
where: { id: userId }
});
if (!user) {
callback({ result: 'error' });
return null;
}
// Verify session belongs to user and lock it
const session = await tx.session.findFirst({
where: {
id: sid,
accountId: userId
}
});
if (!session) {
callback({ result: 'error' });
return null;
}
// Check version
if (session.metadataVersion !== expectedVersion) {
callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata });
return null;
}
// Get next sequence number
const updSeq = user.seq + 1;
const newMetadataVersion = session.metadataVersion + 1;
// Update session metadata
await tx.session.update({
where: { id: sid },
data: {
metadata: metadata,
metadataVersion: newMetadataVersion
}
});
// Create update
const updContent: PrismaJson.UpdateBody = {
t: 'update-session',
id: sid,
metadata: {
value: metadata,
version: newMetadataVersion
}
};
const update = await tx.update.create({
data: {
accountId: userId,
seq: updSeq,
content: updContent
}
});
// Update user sequence
await tx.account.update({
where: { id: userId },
data: { seq: updSeq }
});
return { update, newMetadataVersion };
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!result) {
if (!session) {
return;
}
// Emit update to connected sockets
// Check version
if (session.metadataVersion !== expectedVersion) {
callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata });
return null;
}
// Update metadata
const { count } = await db.session.updateMany({
where: { id: sid, metadataVersion: expectedVersion },
data: {
metadata: metadata,
metadataVersion: expectedVersion + 1
}
});
if (count === 0) {
callback({ result: 'version-mismatch', version: session.metadataVersion, metadata: session.metadata });
return null;
}
// Generate update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-session',
id: sid,
metadata: {
value: metadata,
version: expectedVersion + 1
}
};
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: result.update
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
});
// Send success response with new version via callback
callback({ result: 'success', version: result.newMetadataVersion, metadata: metadata });
callback({ result: 'success', version: expectedVersion + 1, metadata: metadata });
});
@ -1133,93 +1048,63 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return;
}
// Start transaction to ensure consistency
const result = await db.$transaction(async (tx) => {
// Get user for update (lock account first to prevent deadlocks)
const user = await tx.account.findUnique({
where: { id: userId }
});
if (!user) {
callback({ result: 'error' });
return null;
}
// Verify session belongs to user and lock it
const session = await tx.session.findFirst({
where: {
id: sid,
accountId: userId
}
});
if (!session) {
callback({ result: 'error' });
return null;
}
// Check version
if (session.agentStateVersion !== expectedVersion) {
callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState });
return null;
}
// Get next sequence number
const updSeq = user.seq + 1;
const newAgentStateVersion = session.agentStateVersion + 1;
// Update session agent state
await tx.session.update({
where: { id: sid },
data: {
agentState: agentState,
agentStateVersion: newAgentStateVersion
}
});
// Create update
const updContent: PrismaJson.UpdateBody = {
t: 'update-session',
// Resolve session
const session = await db.session.findUnique({
where: {
id: sid,
agentState: {
value: agentState,
version: newAgentStateVersion
}
};
const update = await tx.update.create({
data: {
accountId: userId,
seq: updSeq,
content: updContent
}
});
// Update user sequence
await tx.account.update({
where: { id: userId },
data: { seq: updSeq }
});
return { update, newAgentStateVersion };
accountId: userId
}
});
if (!result) {
return;
if (!session) {
callback({ result: 'error' });
return null;
}
// Check version
if (session.agentStateVersion !== expectedVersion) {
callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState });
return null;
}
// Update agent state
const { count } = await db.session.updateMany({
where: { id: sid, agentStateVersion: expectedVersion },
data: {
agentState: agentState,
agentStateVersion: expectedVersion + 1
}
});
if (count === 0) {
callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState });
return null;
}
// Generate update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-session',
id: sid,
agentState: {
value: agentState,
version: expectedVersion + 1
}
};
// Emit update to connected sockets
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: result.update.id,
seq: result.update.seq,
body: result.update.content,
createdAt: result.update.createdAt.getTime()
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
});
// Send success response with new version via callback
callback({ result: 'success', version: result.newAgentStateVersion, agentState: agentState });
callback({ result: 'success', version: expectedVersion + 1, agentState: agentState });
});
// RPC register - Register this socket as a listener for an RPC method

View File

@ -1,11 +1,12 @@
import { pubsub } from "@/services/pubsub";
import { db } from "@/storage/db";
import { backoff, delay } from "@/utils/delay";
import { delay } from "@/utils/delay";
import { forever } from "@/utils/forever";
import { shutdownSignal } from "@/utils/shutdown";
export function startTimeout() {
backoff(async () => {
forever('session-timeout', async () => {
while (true) {
// Find timed out sessions
const sessions = await db.session.findMany({
where: {
@ -30,7 +31,7 @@ export function startTimeout() {
}
// Wait for 1 minute
await delay(1000 * 60);
await delay(1000 * 60, shutdownSignal);
}
});
}

21
sources/services/seq.ts Normal file
View File

@ -0,0 +1,21 @@
import { db } from "@/storage/db";
export async function allocateUserSeq(accountId: string) {
const user = await db.account.update({
where: { id: accountId },
select: { seq: true },
data: { seq: { increment: 1 } }
});
const seq = user.seq;
return seq;
}
export async function allocateSessionSeq(sessionId: string) {
const session = await db.session.update({
where: { id: sessionId },
select: { seq: true },
data: { seq: { increment: 1 } }
});
const seq = session.seq;
return seq;
}