From bed3f87cbadae211690e256357a423028d25bc8f Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Thu, 17 Jul 2025 20:18:35 -0700 Subject: [PATCH] feat: add localId --- .../migration.sql | 11 ++++ prisma/schema.prisma | 2 + sources/app/api.ts | 50 ++++++++++++------- 3 files changed, 45 insertions(+), 18 deletions(-) create mode 100644 prisma/migrations/20250718031550_add_local_id_to_session_message/migration.sql diff --git a/prisma/migrations/20250718031550_add_local_id_to_session_message/migration.sql b/prisma/migrations/20250718031550_add_local_id_to_session_message/migration.sql new file mode 100644 index 0000000..b9f32cb --- /dev/null +++ b/prisma/migrations/20250718031550_add_local_id_to_session_message/migration.sql @@ -0,0 +1,11 @@ +/* + Warnings: + + - A unique constraint covering the columns `[sessionId,localId]` on the table `SessionMessage` will be added. If there are existing duplicate values, this will fail. + +*/ +-- AlterTable +ALTER TABLE "SessionMessage" ADD COLUMN "localId" TEXT; + +-- CreateIndex +CREATE UNIQUE INDEX "SessionMessage_sessionId_localId_key" ON "SessionMessage"("sessionId", "localId"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index eb8a574..07e8895 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -55,11 +55,13 @@ model SessionMessage { id String @id @default(cuid()) sessionId String session Session @relation(fields: [sessionId], references: [id]) + localId String? seq Int /// [SessionMessageContent] content Json createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + @@unique([sessionId, localId]) } model Update { diff --git a/sources/app/api.ts b/sources/app/api.ts index fd89471..604418d 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -94,7 +94,7 @@ export async function startApi() { }); // Send session update to all relevant connections - let emitUpdateToInterestedClients = ({event, userId, sessionId, payload, skipSenderConnection}: { + let emitUpdateToInterestedClients = ({ event, userId, sessionId, payload, skipSenderConnection }: { event: string, userId: string, sessionId: string, @@ -120,7 +120,7 @@ export async function startApi() { } // Send to all session-scoped connections, only that match sessionId - if (connection.connectionType === 'session-scoped' + if (connection.connectionType === 'session-scoped' && connection.sessionId === sessionId ) { log({ module: 'websocket' }, `Sending ${event} to session-scoped connection ${connection.socket.id}`); @@ -595,7 +595,7 @@ export async function startApi() { }); socket.on('message', async (data: any) => { - const { sid, message } = data; + const { sid, message, localId } = data; log({ module: 'websocket' }, `Received message from socket ${socket.id}: ${sid} ${message.length} bytes`); @@ -606,6 +606,7 @@ export async function startApi() { if (!session) { return; } + let useLocalId = typeof localId === 'string' ? localId : null; // Create encrypted message const msgContent: PrismaJson.SessionMessageContent = { @@ -641,12 +642,20 @@ export async function startApi() { const msgSeq = session.seq + 1; const updSeq = user.seq + 1; + if (useLocalId) { + const existing = await tx.sessionMessage.findFirst({ + where: { sessionId: sid, localId: useLocalId } + }); + return { msg: existing, update: null }; + } + // Create message const msg = await tx.sessionMessage.create({ data: { sessionId: sid, seq: msgSeq, - content: msgContent + content: msgContent, + localId: useLocalId } }); @@ -690,21 +699,26 @@ export async function startApi() { throw error; }); - if (!result) return; - + // If no update, we're done + if (!result) { + return; + } + // Emit update to relevant clients - emitUpdateToInterestedClients({ - event: 'update', - userId, - sessionId: sid, - payload: { - id: result.update.id, - seq: result.update.seq, - body: result.update.content, - createdAt: result.update.createdAt.getTime() - }, - skipSenderConnection: connection - }); + if (result.update) { + emitUpdateToInterestedClients({ + event: 'update', + userId, + sessionId: sid, + payload: { + id: result.update.id, + seq: result.update.seq, + body: result.update.content, + createdAt: result.update.createdAt.getTime() + }, + skipSenderConnection: connection + }); + } }); socket.on('update-metadata', async (data: any, callback: (response: any) => void) => {