From 2dada582284330f4b83afe804e83a19c3c22950c Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Mon, 1 Sep 2025 11:57:27 -0700 Subject: [PATCH] fix: fix missing timeouts --- sources/app/api.ts | 5 +---- sources/app/timeout.ts | 45 ++++++++++++++++++++++++++++++++---------- sources/main.ts | 12 ++++++----- 3 files changed, 43 insertions(+), 19 deletions(-) diff --git a/sources/app/api.ts b/sources/app/api.ts index c4eb4e8..e3161d2 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -51,7 +51,7 @@ declare module 'fastify' { } -export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> { +export async function startApi(eventRouter: EventRouter): Promise<{ app: FastifyInstance; io: Server }> { // Configure log('Starting API...'); @@ -249,9 +249,6 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - // Initialize event router - const eventRouter = new EventRouter(); - // Auth schema typed.post('/v1/auth', { schema: { diff --git a/sources/app/timeout.ts b/sources/app/timeout.ts index 7128673..977e02a 100644 --- a/sources/app/timeout.ts +++ b/sources/app/timeout.ts @@ -1,10 +1,10 @@ -import { pubsub } from "@/services/pubsub"; import { db } from "@/storage/db"; import { delay } from "@/utils/delay"; import { forever } from "@/utils/forever"; import { shutdownSignal } from "@/utils/shutdown"; +import { buildMachineActivityEphemeral, buildSessionActivityEphemeral, EventRouter } from "@/modules/eventRouter"; -export function startTimeout() { +export function startTimeout(eventRouter: EventRouter) { forever('session-timeout', async () => { while (true) { // Find timed out sessions @@ -17,16 +17,41 @@ export function startTimeout() { } }); for (const session of sessions) { - await db.session.update({ - where: { id: session.id }, + const updated = await db.session.updateManyAndReturn({ + where: { id: session.id, active: true }, data: { active: false } }); - pubsub.emit('update-ephemeral', session.accountId, { - type: 'activity', - id: session.id, - active: false, - activeAt: session.lastActiveAt.getTime(), - thinking: false + if (updated.length === 0) { + continue; + } + eventRouter.emitEphemeral({ + userId: session.accountId, + payload: buildSessionActivityEphemeral(session.id, false, updated[0].lastActiveAt.getTime(), false), + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + } + + // Find timed out machines + const machines = await db.machine.findMany({ + where: { + active: true, + lastActiveAt: { + lte: new Date(Date.now() - 1000 * 60 * 10) // 10 minutes + } + } + }); + for (const machine of machines) { + const updated = await db.machine.updateManyAndReturn({ + where: { id: machine.id, active: true }, + data: { active: false } + }); + if (updated.length === 0) { + continue; + } + eventRouter.emitEphemeral({ + userId: machine.accountId, + payload: buildMachineActivityEphemeral(machine.id, false, updated[0].lastActiveAt.getTime()), + recipientFilter: { type: 'all-user-authenticated-connections' } }); } diff --git a/sources/main.ts b/sources/main.ts index ced58ed..e636105 100644 --- a/sources/main.ts +++ b/sources/main.ts @@ -11,6 +11,7 @@ import { startDatabaseMetricsUpdater } from "@/modules/metrics"; import { initEncrypt } from "./modules/encrypt"; import { initGithub } from "./modules/github"; import { loadFiles } from "./storage/files"; +import { EventRouter } from "./modules/eventRouter"; async function main() { @@ -25,6 +26,7 @@ async function main() { await redis.ping(); // Initialize auth module + const eventRouter = new EventRouter(); await initEncrypt(); await initGithub(); await loadFiles(); @@ -34,10 +36,10 @@ async function main() { // Start // - await startApi(); + await startApi(eventRouter); await startMetricsServer(); startDatabaseMetricsUpdater(); - startTimeout(); + startTimeout(eventRouter); // // Ready @@ -56,7 +58,7 @@ process.on('uncaughtException', (error) => { stack: error.stack, name: error.name }, `Uncaught Exception: ${error.message}`); - + console.error('Uncaught Exception:', error); process.exit(1); }); @@ -64,14 +66,14 @@ process.on('uncaughtException', (error) => { process.on('unhandledRejection', (reason, promise) => { const errorMsg = reason instanceof Error ? reason.message : String(reason); const errorStack = reason instanceof Error ? reason.stack : undefined; - + log({ module: 'process-error', level: 'error', stack: errorStack, reason: String(reason) }, `Unhandled Rejection: ${errorMsg}`); - + console.error('Unhandled Rejection at:', promise, 'reason:', reason); process.exit(1); });