feat: add localId

This commit is contained in:
Steve Korshakov 2025-07-17 20:18:35 -07:00
parent 0b3017ef1b
commit bed3f87cba
3 changed files with 45 additions and 18 deletions

View File

@ -0,0 +1,11 @@
/*
Warnings:
- A unique constraint covering the columns `[sessionId,localId]` on the table `SessionMessage` will be added. If there are existing duplicate values, this will fail.
*/
-- AlterTable
ALTER TABLE "SessionMessage" ADD COLUMN "localId" TEXT;
-- CreateIndex
CREATE UNIQUE INDEX "SessionMessage_sessionId_localId_key" ON "SessionMessage"("sessionId", "localId");

View File

@ -55,11 +55,13 @@ model SessionMessage {
id String @id @default(cuid())
sessionId String
session Session @relation(fields: [sessionId], references: [id])
localId String?
seq Int
/// [SessionMessageContent]
content Json
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([sessionId, localId])
}
model Update {

View File

@ -94,7 +94,7 @@ export async function startApi() {
});
// Send session update to all relevant connections
let emitUpdateToInterestedClients = ({event, userId, sessionId, payload, skipSenderConnection}: {
let emitUpdateToInterestedClients = ({ event, userId, sessionId, payload, skipSenderConnection }: {
event: string,
userId: string,
sessionId: string,
@ -120,7 +120,7 @@ export async function startApi() {
}
// Send to all session-scoped connections, only that match sessionId
if (connection.connectionType === 'session-scoped'
if (connection.connectionType === 'session-scoped'
&& connection.sessionId === sessionId
) {
log({ module: 'websocket' }, `Sending ${event} to session-scoped connection ${connection.socket.id}`);
@ -595,7 +595,7 @@ export async function startApi() {
});
socket.on('message', async (data: any) => {
const { sid, message } = data;
const { sid, message, localId } = data;
log({ module: 'websocket' }, `Received message from socket ${socket.id}: ${sid} ${message.length} bytes`);
@ -606,6 +606,7 @@ export async function startApi() {
if (!session) {
return;
}
let useLocalId = typeof localId === 'string' ? localId : null;
// Create encrypted message
const msgContent: PrismaJson.SessionMessageContent = {
@ -641,12 +642,20 @@ export async function startApi() {
const msgSeq = session.seq + 1;
const updSeq = user.seq + 1;
if (useLocalId) {
const existing = await tx.sessionMessage.findFirst({
where: { sessionId: sid, localId: useLocalId }
});
return { msg: existing, update: null };
}
// Create message
const msg = await tx.sessionMessage.create({
data: {
sessionId: sid,
seq: msgSeq,
content: msgContent
content: msgContent,
localId: useLocalId
}
});
@ -690,21 +699,26 @@ export async function startApi() {
throw error;
});
if (!result) return;
// If no update, we're done
if (!result) {
return;
}
// Emit update to relevant clients
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: result.update.id,
seq: result.update.seq,
body: result.update.content,
createdAt: result.update.createdAt.getTime()
},
skipSenderConnection: connection
});
if (result.update) {
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: result.update.id,
seq: result.update.seq,
body: result.update.content,
createdAt: result.update.createdAt.getTime()
},
skipSenderConnection: connection
});
}
});
socket.on('update-metadata', async (data: any, callback: (response: any) => void) => {