From 4853e0ed5289b7480b59d72c9896702fe276ed05 Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Sun, 13 Jul 2025 13:18:01 -0700 Subject: [PATCH] feat: new session update --- sources/app/api.ts | 123 ++++++++++++++++++++++++++------------- sources/storage/types.ts | 6 ++ 2 files changed, 89 insertions(+), 40 deletions(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index a578cb2..0bb1f04 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -124,7 +124,6 @@ export async function startApi() { take: 150, select: { id: true, - tag: true, seq: true, createdAt: true, updatedAt: true, @@ -144,7 +143,6 @@ export async function startApi() { return reply.send({ sessions: sessions.map((v) => ({ id: v.id, - tag: v.tag, seq: v.seq, createdAt: v.createdAt.getTime(), updatedAt: v.updatedAt.getTime(), @@ -170,36 +168,81 @@ export async function startApi() { const userId = request.user.id; const { tag } = request.body; - const session = await db.session.upsert({ + const session = await db.session.findFirst({ where: { - accountId_tag: { - accountId: userId, - tag: tag - } - }, - update: {}, - create: { accountId: userId, tag: tag - }, - select: { - id: true, - tag: true, - seq: true, - createdAt: true, - updatedAt: true } }); + if (session) { + return reply.send({ + session: { + id: session.id, + seq: session.seq, + createdAt: session.createdAt.getTime(), + updatedAt: session.updatedAt.getTime() + } + }); + } else { + // Create new session with update + const result = await db.$transaction(async (tx) => { + // Get user for update sequence + const user = await tx.account.findUnique({ + where: { id: userId } + }); - return reply.send({ - session: { - id: session.id, - tag: session.tag, - seq: session.seq, - createdAt: session.createdAt.getTime(), - updatedAt: session.updatedAt.getTime() - } - }); + if (!user) { + throw new Error('User not found'); + } + + const updSeq = user.seq + 1; + + // Create session + const session = await tx.session.create({ + data: { + accountId: userId, + tag: tag + } + }); + + // Create update + const updContent: PrismaJson.UpdateBody = { + t: 'new-session', + id: session.id, + seq: session.seq, + createdAt: session.createdAt.getTime(), + updatedAt: session.updatedAt.getTime() + }; + + const update = await tx.update.create({ + data: { + accountId: userId, + seq: updSeq, + content: updContent + } + }); + + // Update user sequence + await tx.account.update({ + where: { id: userId }, + data: { seq: updSeq } + }); + + return { session, update }; + }); + + // Emit update to connected sockets + pubsub.emit('update', userId, result.update); + + return reply.send({ + session: { + id: result.session.id, + seq: result.session.seq, + createdAt: result.session.createdAt.getTime(), + updatedAt: result.session.updatedAt.getTime() + } + }); + } }); // Messages API @@ -258,7 +301,7 @@ export async function startApi() { if (!app.server) { throw new Error('Fastify server not available'); } - + const io = new Server(app.server, { cors: { origin: "*", @@ -275,7 +318,7 @@ export async function startApi() { connectTimeout: 20000, serveClient: false // Don't serve the client files }); - + // Track connected users const userSockets = new Map>(); @@ -350,7 +393,7 @@ export async function startApi() { t: 'encrypted', c: message }; - + // Start transaction to ensure consistency const result = await db.$transaction(async (tx) => { @@ -365,20 +408,20 @@ export async function startApi() { if (!session) { throw new Error('Session not found'); } - + // Get user for update const user = await tx.account.findUnique({ where: { id: userId } }); - + if (!user) { throw new Error('User not found'); } - + // Get next sequence numbers const msgSeq = session.seq + 1; const updSeq = user.seq + 1; - + // Create message const msg = await tx.sessionMessage.create({ data: { @@ -387,7 +430,7 @@ export async function startApi() { content: msgContent } }); - + // Create update const updContent: PrismaJson.UpdateBody = { t: 'new-message', @@ -395,7 +438,7 @@ export async function startApi() { mid: msg.id, c: msgContent }; - + const update = await tx.update.create({ data: { accountId: userId, @@ -403,18 +446,18 @@ export async function startApi() { content: updContent } }); - + // Update sequences await tx.session.update({ where: { id: sid }, data: { seq: msgSeq } }); - + await tx.account.update({ where: { id: userId }, data: { seq: updSeq } }); - + return { msg, update }; }).catch((error) => { if (error.message === 'Session not found') { @@ -422,9 +465,9 @@ export async function startApi() { } throw error; }); - + if (!result) return; - + // Emit update to connected sockets pubsub.emit('update', userId, result.update); }); diff --git a/sources/storage/types.ts b/sources/storage/types.ts index 33a7b20..d948d3c 100644 --- a/sources/storage/types.ts +++ b/sources/storage/types.ts @@ -12,6 +12,12 @@ declare global { sid: string; mid: string; c: SessionMessageContent; + } | { + t: 'new-session'; + id: string; + seq: number; + createdAt: number; + updatedAt: number; }; } }