fix: trying to fix message ordering

This commit is contained in:
Steve Korshakov 2025-07-28 23:53:28 -07:00
parent 506b7a41ba
commit 1065d22de7

View File

@ -10,6 +10,7 @@ import { Account } from "@prisma/client";
import { onShutdown } from "@/utils/shutdown";
import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { AsyncLock } from "@/utils/lock";
// Connection metadata types
interface SessionScopedConnection {
@ -896,6 +897,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
userIdToClientConnections.get(userId)!.add(connection);
// Lock
const receiveMessageLock = new AsyncLock();
const receiveUsageLock = new AsyncLock();
socket.on('disconnect', () => {
// Cleanup
const connections = userIdToClientConnections.get(userId);
@ -1024,80 +1029,82 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
socket.on('message', async (data: any) => {
try {
const { sid, message, localId } = data;
await receiveMessageLock.inLock(async () => {
try {
const { sid, message, localId } = data;
log({ module: 'websocket' }, `Received message from socket ${socket.id}: sessionId=${sid}, messageLength=${message.length} bytes, connectionType=${connection.connectionType}, connectionSessionId=${connection.connectionType === 'session-scoped' ? connection.sessionId : 'N/A'}`);
log({ module: 'websocket' }, `Received message from socket ${socket.id}: sessionId=${sid}, messageLength=${message.length} bytes, connectionType=${connection.connectionType}, connectionSessionId=${connection.connectionType === 'session-scoped' ? connection.sessionId : 'N/A'}`);
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!session) {
return;
}
let useLocalId = typeof localId === 'string' ? localId : null;
// Create encrypted message
const msgContent: PrismaJson.SessionMessageContent = {
t: 'encrypted',
c: message
};
// Resolve seq
const updSeq = await allocateUserSeq(userId);
const msgSeq = await allocateSessionSeq(sid);
// Check if message already exists
if (useLocalId) {
const existing = await db.sessionMessage.findFirst({
where: { sessionId: sid, localId: useLocalId }
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (existing) {
return { msg: existing, update: null };
if (!session) {
return;
}
}
let useLocalId = typeof localId === 'string' ? localId : null;
// Create message
const msg = await db.sessionMessage.create({
data: {
// Create encrypted message
const msgContent: PrismaJson.SessionMessageContent = {
t: 'encrypted',
c: message
};
// Resolve seq
const updSeq = await allocateUserSeq(userId);
const msgSeq = await allocateSessionSeq(sid);
// Check if message already exists
if (useLocalId) {
const existing = await db.sessionMessage.findFirst({
where: { sessionId: sid, localId: useLocalId }
});
if (existing) {
return { msg: existing, update: null };
}
}
// Create message
const msg = await db.sessionMessage.create({
data: {
sessionId: sid,
seq: msgSeq,
content: msgContent,
localId: useLocalId
}
});
// Create update
const update: PrismaJson.UpdateBody = {
t: 'new-message',
sid: sid,
message: {
id: msg.id,
seq: msg.seq,
content: msgContent,
localId: useLocalId,
createdAt: msg.createdAt.getTime(),
updatedAt: msg.updatedAt.getTime()
}
};
// Emit update to relevant clients
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
seq: msgSeq,
content: msgContent,
localId: useLocalId
}
});
// Create update
const update: PrismaJson.UpdateBody = {
t: 'new-message',
sid: sid,
message: {
id: msg.id,
seq: msg.seq,
content: msgContent,
localId: useLocalId,
createdAt: msg.createdAt.getTime(),
updatedAt: msg.updatedAt.getTime()
}
};
// Emit update to relevant clients
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: update,
createdAt: Date.now()
},
skipSenderConnection: connection
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in message handler: ${error}`);
}
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: update,
createdAt: Date.now()
},
skipSenderConnection: connection
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in message handler: ${error}`);
}
});
});
socket.on('update-metadata', async (data: any, callback: (response: any) => void) => {
@ -1423,124 +1430,126 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
// Usage reporting
socket.on('usage-report', async (data: any, callback?: (response: any) => void) => {
try {
const { key, sessionId, tokens, cost } = data;
// Validate required fields
if (!key || typeof key !== 'string') {
if (callback) {
callback({ success: false, error: 'Invalid key' });
}
return;
}
// Validate tokens and cost objects
if (!tokens || typeof tokens !== 'object' || typeof tokens.total !== 'number') {
if (callback) {
callback({ success: false, error: 'Invalid tokens object - must include total' });
}
return;
}
if (!cost || typeof cost !== 'object' || typeof cost.total !== 'number') {
if (callback) {
callback({ success: false, error: 'Invalid cost object - must include total' });
}
return;
}
// Validate sessionId if provided
if (sessionId && typeof sessionId !== 'string') {
if (callback) {
callback({ success: false, error: 'Invalid sessionId' });
}
return;
}
await receiveUsageLock.inLock(async () => {
try {
// If sessionId provided, verify it belongs to the user
if (sessionId) {
const session = await db.session.findFirst({
where: {
id: sessionId,
accountId: userId
}
});
const { key, sessionId, tokens, cost } = data;
if (!session) {
if (callback) {
callback({ success: false, error: 'Session not found' });
}
return;
// Validate required fields
if (!key || typeof key !== 'string') {
if (callback) {
callback({ success: false, error: 'Invalid key' });
}
return;
}
// Prepare usage data
const usageData: PrismaJson.UsageReportData = {
tokens,
cost
};
// Validate tokens and cost objects
if (!tokens || typeof tokens !== 'object' || typeof tokens.total !== 'number') {
if (callback) {
callback({ success: false, error: 'Invalid tokens object - must include total' });
}
return;
}
// Upsert the usage report
const report = await db.usageReport.upsert({
where: {
accountId_sessionId_key: {
if (!cost || typeof cost !== 'object' || typeof cost.total !== 'number') {
if (callback) {
callback({ success: false, error: 'Invalid cost object - must include total' });
}
return;
}
// Validate sessionId if provided
if (sessionId && typeof sessionId !== 'string') {
if (callback) {
callback({ success: false, error: 'Invalid sessionId' });
}
return;
}
try {
// If sessionId provided, verify it belongs to the user
if (sessionId) {
const session = await db.session.findFirst({
where: {
id: sessionId,
accountId: userId
}
});
if (!session) {
if (callback) {
callback({ success: false, error: 'Session not found' });
}
return;
}
}
// Prepare usage data
const usageData: PrismaJson.UsageReportData = {
tokens,
cost
};
// Upsert the usage report
const report = await db.usageReport.upsert({
where: {
accountId_sessionId_key: {
accountId: userId,
sessionId: sessionId || null,
key
}
},
update: {
data: usageData,
updatedAt: new Date()
},
create: {
accountId: userId,
sessionId: sessionId || null,
key
}
},
update: {
data: usageData,
updatedAt: new Date()
},
create: {
accountId: userId,
sessionId: sessionId || null,
key,
data: usageData
}
});
log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`);
// Emit ephemeral update if sessionId is provided
if (sessionId) {
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId,
payload: {
type: 'usage',
id: sessionId,
key,
tokens: usageData.tokens,
cost: usageData.cost,
timestamp: Date.now()
data: usageData
}
});
}
if (callback) {
callback({
success: true,
reportId: report.id,
createdAt: report.createdAt.getTime(),
updatedAt: report.updatedAt.getTime()
});
log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`);
// Emit ephemeral update if sessionId is provided
if (sessionId) {
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId,
payload: {
type: 'usage',
id: sessionId,
key,
tokens: usageData.tokens,
cost: usageData.cost,
timestamp: Date.now()
}
});
}
if (callback) {
callback({
success: true,
reportId: report.id,
createdAt: report.createdAt.getTime(),
updatedAt: report.updatedAt.getTime()
});
}
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Failed to save usage report: ${error}`);
if (callback) {
callback({ success: false, error: 'Failed to save usage report' });
}
}
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Failed to save usage report: ${error}`);
log({ module: 'websocket', level: 'error' }, `Error in usage-report handler: ${error}`);
if (callback) {
callback({ success: false, error: 'Failed to save usage report' });
callback({ success: false, error: 'Internal error' });
}
}
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in usage-report handler: ${error}`);
if (callback) {
callback({ success: false, error: 'Internal error' });
}
}
});
});
socket.emit('auth', { success: true, user: userId });