ref: move message sending to socket

This commit is contained in:
Steve Korshakov 2025-07-13 13:04:49 -07:00
parent c99c1d91e1
commit b0735b28a3

View File

@ -250,114 +250,6 @@ export async function startApi() {
}); });
}); });
// Post message to session
typed.post('/v1/sessions/:sessionId/messages', {
schema: {
params: z.object({
sessionId: z.string()
}),
body: z.object({
t: z.literal('encrypted'),
c: z.string() // Base64 encoded encrypted content
})
},
preHandler: app.authenticate
}, async (request, reply) => {
const uid = request.user.id;
const { sessionId: sid } = request.params;
const msgContent: PrismaJson.SessionMessageContent = {
t: request.body.t,
c: request.body.c
};
// Start transaction to ensure consistency
const result = await db.$transaction(async (tx) => {
// Verify session belongs to user and lock it
const session = await tx.session.findFirst({
where: {
id: sid,
accountId: uid
}
});
if (!session) {
throw new Error('Session not found');
}
// Get user for update
const user = await tx.account.findUnique({
where: { id: uid }
});
if (!user) {
throw new Error('User not found');
}
// Get next sequence numbers
const msgSeq = session.seq + 1;
const updSeq = user.seq + 1;
// Create message
const msg = await tx.sessionMessage.create({
data: {
sessionId: sid,
seq: msgSeq,
content: msgContent
}
});
// Create update
const updContent: PrismaJson.UpdateBody = {
t: 'new-message',
sid: sid,
mid: msg.id,
c: msgContent
};
const update = await tx.update.create({
data: {
accountId: uid,
seq: updSeq,
content: updContent
}
});
// Update sequences
await tx.session.update({
where: { id: sid },
data: { seq: msgSeq }
});
await tx.account.update({
where: { id: uid },
data: { seq: updSeq }
});
return { msg, update };
}).catch((error) => {
if (error.message === 'Session not found') {
reply.code(404).send({ error: 'Session not found' });
return null;
}
throw error;
});
if (!result) return;
// Emit update to connected sockets
pubsub.emit('update', uid, result.update);
return reply.send({
message: {
id: result.msg.id,
seq: result.msg.seq,
content: result.msg.content,
createdAt: result.msg.createdAt.getTime(),
updatedAt: result.msg.updatedAt.getTime()
}
});
});
// Start // Start
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005; const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005;
await app.listen({ port, host: '0.0.0.0' }); await app.listen({ port, host: '0.0.0.0' });
@ -422,7 +314,7 @@ export async function startApi() {
socket.emit('update', { socket.emit('update', {
id: update.id, id: update.id,
seq: update.seq, seq: update.seq,
content: update.content, body: update.content,
createdAt: update.createdAt.getTime() createdAt: update.createdAt.getTime()
}); });
} }
@ -442,6 +334,101 @@ export async function startApi() {
log({ module: 'websocket' }, `User disconnected: ${userId}`); log({ module: 'websocket' }, `User disconnected: ${userId}`);
}); });
socket.on('message', async (data: any) => {
const { sid, message } = data;
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!session) {
return;
}
// Create encrypted message
const msgContent: PrismaJson.SessionMessageContent = {
t: 'encrypted',
c: message
};
// Start transaction to ensure consistency
const result = await db.$transaction(async (tx) => {
// Verify session belongs to user and lock it
const session = await tx.session.findFirst({
where: {
id: sid,
accountId: userId
}
});
if (!session) {
throw new Error('Session not found');
}
// Get user for update
const user = await tx.account.findUnique({
where: { id: userId }
});
if (!user) {
throw new Error('User not found');
}
// Get next sequence numbers
const msgSeq = session.seq + 1;
const updSeq = user.seq + 1;
// Create message
const msg = await tx.sessionMessage.create({
data: {
sessionId: sid,
seq: msgSeq,
content: msgContent
}
});
// Create update
const updContent: PrismaJson.UpdateBody = {
t: 'new-message',
sid: sid,
mid: msg.id,
c: msgContent
};
const update = await tx.update.create({
data: {
accountId: userId,
seq: updSeq,
content: updContent
}
});
// Update sequences
await tx.session.update({
where: { id: sid },
data: { seq: msgSeq }
});
await tx.account.update({
where: { id: userId },
data: { seq: updSeq }
});
return { msg, update };
}).catch((error) => {
if (error.message === 'Session not found') {
return null;
}
throw error;
});
if (!result) return;
// Emit update to connected sockets
pubsub.emit('update', userId, result.update);
});
socket.emit('auth', { success: true, user: userId }); socket.emit('auth', { success: true, user: userId });
log({ module: 'websocket' }, `User connected: ${userId}`); log({ module: 'websocket' }, `User connected: ${userId}`);
}); });