From b0735b28a30746b5da3e320af8f65ef700d04793 Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Sun, 13 Jul 2025 13:04:49 -0700 Subject: [PATCH] ref: move message sending to socket --- sources/app/api.ts | 205 +++++++++++++++++++++------------------------ 1 file changed, 96 insertions(+), 109 deletions(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index 05faf41..a578cb2 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -250,114 +250,6 @@ export async function startApi() { }); }); - // Post message to session - typed.post('/v1/sessions/:sessionId/messages', { - schema: { - params: z.object({ - sessionId: z.string() - }), - body: z.object({ - t: z.literal('encrypted'), - c: z.string() // Base64 encoded encrypted content - }) - }, - preHandler: app.authenticate - }, async (request, reply) => { - const uid = request.user.id; - const { sessionId: sid } = request.params; - const msgContent: PrismaJson.SessionMessageContent = { - t: request.body.t, - c: request.body.c - }; - - // Start transaction to ensure consistency - const result = await db.$transaction(async (tx) => { - // Verify session belongs to user and lock it - const session = await tx.session.findFirst({ - where: { - id: sid, - accountId: uid - } - }); - - if (!session) { - throw new Error('Session not found'); - } - - // Get user for update - const user = await tx.account.findUnique({ - where: { id: uid } - }); - - if (!user) { - throw new Error('User not found'); - } - - // Get next sequence numbers - const msgSeq = session.seq + 1; - const updSeq = user.seq + 1; - - // Create message - const msg = await tx.sessionMessage.create({ - data: { - sessionId: sid, - seq: msgSeq, - content: msgContent - } - }); - - // Create update - const updContent: PrismaJson.UpdateBody = { - t: 'new-message', - sid: sid, - mid: msg.id, - c: msgContent - }; - - const update = await tx.update.create({ - data: { - accountId: uid, - seq: updSeq, - content: updContent - } - }); - - // Update sequences - await tx.session.update({ - where: { id: sid }, - data: { seq: msgSeq } - }); - - await tx.account.update({ - where: { id: uid }, - data: { seq: updSeq } - }); - - return { msg, update }; - }).catch((error) => { - if (error.message === 'Session not found') { - reply.code(404).send({ error: 'Session not found' }); - return null; - } - throw error; - }); - - if (!result) return; - - // Emit update to connected sockets - pubsub.emit('update', uid, result.update); - - return reply.send({ - message: { - id: result.msg.id, - seq: result.msg.seq, - content: result.msg.content, - createdAt: result.msg.createdAt.getTime(), - updatedAt: result.msg.updatedAt.getTime() - } - }); - }); - // Start const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005; await app.listen({ port, host: '0.0.0.0' }); @@ -422,7 +314,7 @@ export async function startApi() { socket.emit('update', { id: update.id, seq: update.seq, - content: update.content, + body: update.content, createdAt: update.createdAt.getTime() }); } @@ -442,6 +334,101 @@ export async function startApi() { log({ module: 'websocket' }, `User disconnected: ${userId}`); }); + socket.on('message', async (data: any) => { + const { sid, message } = data; + + // Resolve session + const session = await db.session.findUnique({ + where: { id: sid, accountId: userId } + }); + if (!session) { + return; + } + + // Create encrypted message + const msgContent: PrismaJson.SessionMessageContent = { + t: 'encrypted', + c: message + }; + + // Start transaction to ensure consistency + const result = await db.$transaction(async (tx) => { + + // Verify session belongs to user and lock it + const session = await tx.session.findFirst({ + where: { + id: sid, + accountId: userId + } + }); + + if (!session) { + throw new Error('Session not found'); + } + + // Get user for update + const user = await tx.account.findUnique({ + where: { id: userId } + }); + + if (!user) { + throw new Error('User not found'); + } + + // Get next sequence numbers + const msgSeq = session.seq + 1; + const updSeq = user.seq + 1; + + // Create message + const msg = await tx.sessionMessage.create({ + data: { + sessionId: sid, + seq: msgSeq, + content: msgContent + } + }); + + // Create update + const updContent: PrismaJson.UpdateBody = { + t: 'new-message', + sid: sid, + mid: msg.id, + c: msgContent + }; + + const update = await tx.update.create({ + data: { + accountId: userId, + seq: updSeq, + content: updContent + } + }); + + // Update sequences + await tx.session.update({ + where: { id: sid }, + data: { seq: msgSeq } + }); + + await tx.account.update({ + where: { id: userId }, + data: { seq: updSeq } + }); + + return { msg, update }; + }).catch((error) => { + if (error.message === 'Session not found') { + return null; + } + throw error; + }); + + if (!result) return; + + // Emit update to connected sockets + pubsub.emit('update', userId, result.update); + }); + socket.emit('auth', { success: true, user: userId }); log({ module: 'websocket' }, `User connected: ${userId}`); });