diff --git a/.env.dev b/.env.dev index 0d31f60..c8e4eae 100644 --- a/.env.dev +++ b/.env.dev @@ -2,5 +2,9 @@ DATABASE_URL=postgresql://postgres:postgres@localhost:5432/handy HANDY_MASTER_SECRET=your-super-secret-key-for-local-development PORT=3005 +# Metrics server configuration +METRICS_ENABLED=true +METRICS_PORT=9090 + # Uncomment to enable centralized logging for AI debugging (creates .logs directory) DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true diff --git a/package.json b/package.json index 078e89a..6db03f9 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "prisma": "^6.11.1", "prisma-json-types-generator": "^3.5.1", "privacy-kit": "^0.0.23", + "prom-client": "^15.1.3", "socket.io": "^4.8.1", "socket.io-adapter": "^2.5.5", "tmp": "^0.2.3", @@ -58,4 +59,4 @@ "zod": "^3.24.2", "zod-to-json-schema": "^3.24.3" } -} \ No newline at end of file +} diff --git a/sources/app/api.ts b/sources/app/api.ts index 40320e9..266f4ec 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -28,6 +28,14 @@ import { buildUsageEphemeral, buildMachineStatusEphemeral } from "@/modules/eventRouter"; +import { + incrementWebSocketConnection, + decrementWebSocketConnection, + sessionAliveEventsCounter, + machineAliveEventsCounter, + websocketEventsCounter +} from "@/modules/metrics"; +import { activityCache } from "@/modules/sessionCache"; declare module 'fastify' { @@ -1223,6 +1231,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }; } eventRouter.addConnection(userId, connection); + incrementWebSocketConnection(connection.connectionType); // Broadcast daemon online status if (connection.connectionType === 'machine-scoped') { @@ -1240,8 +1249,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const receiveUsageLock = new AsyncLock(); socket.on('disconnect', () => { + websocketEventsCounter.inc({ event_type: 'disconnect' }); + // Cleanup connections eventRouter.removeConnection(userId, 
connection); + decrementWebSocketConnection(connection.connectionType); // Clean up RPC listeners for this socket const userRpcMap = rpcListeners.get(userId); @@ -1284,6 +1296,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> thinking?: boolean; }) => { try { + // Track metrics + websocketEventsCounter.inc({ event_type: 'session-alive' }); + sessionAliveEventsCounter.inc(); + // Basic validation if (!data || typeof data.time !== 'number' || !data.sid) { return; @@ -1299,19 +1315,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const { sid, thinking } = data; - // Resolve session - const session = await db.session.findUnique({ - where: { id: sid, accountId: userId } - }); - if (!session) { + // Check session validity using cache + const isValid = await activityCache.isSessionValid(sid, userId); + if (!isValid) { return; } - // Update last active - await db.session.update({ - where: { id: sid }, - data: { lastActiveAt: new Date(t), active: true } - }); + // Queue database update (will only update if time difference is significant) + activityCache.queueSessionUpdate(sid, t); // Emit session activity update const sessionActivity = buildSessionActivityEphemeral(sid, true, t, thinking || false); @@ -1330,6 +1341,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> time: number; }) => { try { + // Track metrics + websocketEventsCounter.inc({ event_type: 'machine-alive' }); + machineAliveEventsCounter.inc(); + // Basic validation if (!data || typeof data.time !== 'number' || !data.machineId) { return; @@ -1343,35 +1358,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - // Resolve machine - const machine = await db.machine.findUnique({ - where: { - accountId_id: { - accountId: userId, - id: data.machineId - } - } - }); - - if (!machine) { + // Check machine validity using cache + const isValid = await 
activityCache.isMachineValid(data.machineId, userId); + if (!isValid) { return; } - // Update machine lastActiveAt in database - const updatedMachine = await db.machine.update({ - where: { - accountId_id: { - accountId: userId, - id: data.machineId - } - }, - data: { - lastActiveAt: new Date(t), - active: true - } - }); + // Queue database update (will only update if time difference is significant) + activityCache.queueMachineUpdate(data.machineId, t); - const machineActivity = buildMachineActivityEphemeral(updatedMachine.id, true, t); + const machineActivity = buildMachineActivityEphemeral(data.machineId, true, t); eventRouter.emitEphemeral({ userId, payload: machineActivity, @@ -1428,6 +1424,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> socket.on('message', async (data: any) => { await receiveMessageLock.inLock(async () => { try { + websocketEventsCounter.inc({ event_type: 'message' }); const { sid, message, localId } = data; log({ module: 'websocket' }, `Received message from socket ${socket.id}: sessionId=${sid}, messageLength=${message.length} bytes, connectionType=${connection.connectionType}, connectionSessionId=${connection.connectionType === 'session-scoped' ? 
connection.sessionId : 'N/A'}`); diff --git a/sources/app/metrics.ts b/sources/app/metrics.ts new file mode 100644 index 0000000..d36c6b3 --- /dev/null +++ b/sources/app/metrics.ts @@ -0,0 +1,54 @@ +import fastify from 'fastify'; +import { db } from '@/storage/db'; +import { register } from '@/modules/metrics'; +import { log } from '@/utils/log'; + +export async function createMetricsServer() { + const app = fastify({ + logger: false // Disable logging for metrics server + }); + + app.get('/metrics', async (_request, reply) => { + try { + // Get Prisma metrics in Prometheus format + const prismaMetrics = await db.$metrics.prometheus(); + + // Get custom application metrics + const appMetrics = await register.metrics(); + + // Combine both metrics + const combinedMetrics = prismaMetrics + '\n' + appMetrics; + + reply.type('text/plain; version=0.0.4; charset=utf-8'); + reply.send(combinedMetrics); + } catch (error) { + log({ module: 'metrics', level: 'error' }, `Error generating metrics: ${error}`); + reply.code(500).send('Internal Server Error'); + } + }); + + app.get('/health', async (_request, reply) => { + reply.send({ status: 'ok', timestamp: new Date().toISOString() }); + }); + + return app; +} + +export async function startMetricsServer(): Promise<void> { + const enabled = process.env.METRICS_ENABLED !== 'false'; + if (!enabled) { + log({ module: 'metrics' }, 'Metrics server disabled'); + return; + } + + const port = process.env.METRICS_PORT ? 
parseInt(process.env.METRICS_PORT, 10) : 9090; + const app = await createMetricsServer(); + + try { + await app.listen({ port, host: '0.0.0.0' }); + log({ module: 'metrics' }, `Metrics server listening on port ${port}`); + } catch (error) { + log({ module: 'metrics', level: 'error' }, `Failed to start metrics server: ${error}`); + throw error; + } +} \ No newline at end of file diff --git a/sources/main.ts b/sources/main.ts index 8fcb79f..557af25 100644 --- a/sources/main.ts +++ b/sources/main.ts @@ -4,6 +4,8 @@ import { awaitShutdown, onShutdown } from "@/utils/shutdown"; import { db } from './storage/db'; import { startTimeout } from "./app/timeout"; import { redis } from "./services/redis"; +import { startMetricsServer } from "@/app/metrics"; +import { activityCache } from "@/modules/sessionCache"; async function main() { @@ -12,6 +14,9 @@ async function main() { onShutdown('db', async () => { await db.$disconnect(); }); + onShutdown('activity-cache', async () => { + activityCache.shutdown(); + }); await redis.ping(); // @@ -19,6 +24,7 @@ async function main() { // await startApi(); + await startMetricsServer(); startTimeout(); // diff --git a/sources/modules/metrics.ts b/sources/modules/metrics.ts new file mode 100644 index 0000000..0a4e1d1 --- /dev/null +++ b/sources/modules/metrics.ts @@ -0,0 +1,77 @@ +import { register, Counter, Gauge, Histogram } from 'prom-client'; + +// Application metrics +export const websocketConnectionsGauge = new Gauge({ + name: 'websocket_connections_total', + help: 'Number of active WebSocket connections', + labelNames: ['type'] as const, + registers: [register] +}); + +export const sessionAliveEventsCounter = new Counter({ + name: 'session_alive_events_total', + help: 'Total number of session-alive events', + registers: [register] +}); + +export const machineAliveEventsCounter = new Counter({ + name: 'machine_alive_events_total', + help: 'Total number of machine-alive events', + registers: [register] +}); + +export const 
sessionCacheCounter = new Counter({ + name: 'session_cache_operations_total', + help: 'Total session cache operations', + labelNames: ['operation', 'result'] as const, + registers: [register] +}); + +export const databaseUpdatesSkippedCounter = new Counter({ + name: 'database_updates_skipped_total', + help: 'Number of database updates skipped due to debouncing', + labelNames: ['type'] as const, + registers: [register] +}); + +export const websocketEventsCounter = new Counter({ + name: 'websocket_events_total', + help: 'Total WebSocket events received by type', + labelNames: ['event_type'] as const, + registers: [register] +}); + +export const httpRequestsCounter = new Counter({ + name: 'http_requests_total', + help: 'Total number of HTTP requests', + labelNames: ['method', 'route', 'status'] as const, + registers: [register] +}); + +export const httpRequestDurationHistogram = new Histogram({ + name: 'http_request_duration_seconds', + help: 'HTTP request duration in seconds', + labelNames: ['method', 'route', 'status'] as const, + buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10], + registers: [register] +}); + +// WebSocket connection tracking +const connectionCounts = { + 'user-scoped': 0, + 'session-scoped': 0, + 'machine-scoped': 0 +}; + +export function incrementWebSocketConnection(type: 'user-scoped' | 'session-scoped' | 'machine-scoped'): void { + connectionCounts[type]++; + websocketConnectionsGauge.set({ type }, connectionCounts[type]); +} + +export function decrementWebSocketConnection(type: 'user-scoped' | 'session-scoped' | 'machine-scoped'): void { + connectionCounts[type] = Math.max(0, connectionCounts[type] - 1); + websocketConnectionsGauge.set({ type }, connectionCounts[type]); +} + +// Export the register for combining metrics +export { register }; \ No newline at end of file diff --git a/sources/modules/sessionCache.ts b/sources/modules/sessionCache.ts new file mode 100644 index 0000000..fa1f19d --- /dev/null +++ 
b/sources/modules/sessionCache.ts @@ -0,0 +1,260 @@ +import { db } from "@/storage/db"; +import { log } from "@/utils/log"; +import { sessionCacheCounter, databaseUpdatesSkippedCounter } from "@/modules/metrics"; + +interface SessionCacheEntry { + validUntil: number; + lastUpdateSent: number; + pendingUpdate: number | null; + userId: string; +} + +interface MachineCacheEntry { + validUntil: number; + lastUpdateSent: number; + pendingUpdate: number | null; + userId: string; +} + +class ActivityCache { + private sessionCache = new Map<string, SessionCacheEntry>(); + private machineCache = new Map<string, MachineCacheEntry>(); + private batchTimer: NodeJS.Timeout | null = null; + + // Cache TTL (30 seconds) + private readonly CACHE_TTL = 30 * 1000; + + // Only update DB if time difference is significant (30 seconds) + private readonly UPDATE_THRESHOLD = 30 * 1000; + + // Batch update interval (5 seconds) + private readonly BATCH_INTERVAL = 5 * 1000; + + constructor() { + this.startBatchTimer(); + } + + private startBatchTimer(): void { + if (this.batchTimer) { + clearInterval(this.batchTimer); + } + + this.batchTimer = setInterval(() => { + this.flushPendingUpdates().catch(error => { + log({ module: 'session-cache', level: 'error' }, `Error flushing updates: ${error}`); + }); + }, this.BATCH_INTERVAL); + } + + async isSessionValid(sessionId: string, userId: string): Promise<boolean> { + const now = Date.now(); + const cached = this.sessionCache.get(sessionId); + + // Check cache first + if (cached && cached.validUntil > now && cached.userId === userId) { + sessionCacheCounter.inc({ operation: 'session_validation', result: 'hit' }); + return true; + } + + sessionCacheCounter.inc({ operation: 'session_validation', result: 'miss' }); + + // Cache miss - check database + try { + const session = await db.session.findUnique({ + where: { id: sessionId, accountId: userId } + }); + + if (session) { + // Cache the result + this.sessionCache.set(sessionId, { + validUntil: now + this.CACHE_TTL, + lastUpdateSent: 
session.lastActiveAt.getTime(), + pendingUpdate: null, + userId + }); + return true; + } + + return false; + } catch (error) { + log({ module: 'session-cache', level: 'error' }, `Error validating session ${sessionId}: ${error}`); + return false; + } + } + + async isMachineValid(machineId: string, userId: string): Promise<boolean> { + const now = Date.now(); + const cached = this.machineCache.get(machineId); + + // Check cache first + if (cached && cached.validUntil > now && cached.userId === userId) { + sessionCacheCounter.inc({ operation: 'machine_validation', result: 'hit' }); + return true; + } + + sessionCacheCounter.inc({ operation: 'machine_validation', result: 'miss' }); + + // Cache miss - check database + try { + const machine = await db.machine.findUnique({ + where: { + accountId_id: { + accountId: userId, + id: machineId + } + } + }); + + if (machine) { + // Cache the result + this.machineCache.set(machineId, { + validUntil: now + this.CACHE_TTL, + lastUpdateSent: machine.lastActiveAt?.getTime() || 0, + pendingUpdate: null, + userId + }); + return true; + } + + return false; + } catch (error) { + log({ module: 'session-cache', level: 'error' }, `Error validating machine ${machineId}: ${error}`); + return false; + } + } + + queueSessionUpdate(sessionId: string, timestamp: number): boolean { + const cached = this.sessionCache.get(sessionId); + if (!cached) { + return false; // Should validate first + } + + // Only queue if time difference is significant + const timeDiff = Math.abs(timestamp - cached.lastUpdateSent); + if (timeDiff > this.UPDATE_THRESHOLD) { + cached.pendingUpdate = timestamp; + return true; + } + + databaseUpdatesSkippedCounter.inc({ type: 'session' }); + return false; // No update needed + } + + queueMachineUpdate(machineId: string, timestamp: number): boolean { + const cached = this.machineCache.get(machineId); + if (!cached) { + return false; // Should validate first + } + + // Only queue if time difference is significant + const timeDiff = 
Math.abs(timestamp - cached.lastUpdateSent); + if (timeDiff > this.UPDATE_THRESHOLD) { + cached.pendingUpdate = timestamp; + return true; + } + + databaseUpdatesSkippedCounter.inc({ type: 'machine' }); + return false; // No update needed + } + + private async flushPendingUpdates(): Promise<void> { + const sessionUpdates: { id: string, timestamp: number }[] = []; + const machineUpdates: { id: string, timestamp: number, userId: string }[] = []; + + // Collect session updates + for (const [sessionId, entry] of this.sessionCache.entries()) { + if (entry.pendingUpdate) { + sessionUpdates.push({ id: sessionId, timestamp: entry.pendingUpdate }); + entry.lastUpdateSent = entry.pendingUpdate; + entry.pendingUpdate = null; + } + } + + // Collect machine updates + for (const [machineId, entry] of this.machineCache.entries()) { + if (entry.pendingUpdate) { + machineUpdates.push({ + id: machineId, + timestamp: entry.pendingUpdate, + userId: entry.userId + }); + entry.lastUpdateSent = entry.pendingUpdate; + entry.pendingUpdate = null; + } + } + + // Batch update sessions + if (sessionUpdates.length > 0) { + try { + await Promise.all(sessionUpdates.map(update => + db.session.update({ + where: { id: update.id }, + data: { lastActiveAt: new Date(update.timestamp), active: true } + }) + )); + + log({ module: 'session-cache' }, `Flushed ${sessionUpdates.length} session updates`); + } catch (error) { + log({ module: 'session-cache', level: 'error' }, `Error updating sessions: ${error}`); + } + } + + // Batch update machines + if (machineUpdates.length > 0) { + try { + await Promise.all(machineUpdates.map(update => + db.machine.update({ + where: { + accountId_id: { + accountId: update.userId, + id: update.id + } + }, + data: { lastActiveAt: new Date(update.timestamp) } + }) + )); + + log({ module: 'session-cache' }, `Flushed ${machineUpdates.length} machine updates`); + } catch (error) { + log({ module: 'session-cache', level: 'error' }, `Error updating machines: ${error}`); + } + } + } + + 
// Cleanup old cache entries periodically + cleanup(): void { + const now = Date.now(); + + for (const [sessionId, entry] of this.sessionCache.entries()) { + if (entry.validUntil < now) { + this.sessionCache.delete(sessionId); + } + } + + for (const [machineId, entry] of this.machineCache.entries()) { + if (entry.validUntil < now) { + this.machineCache.delete(machineId); + } + } + } + + shutdown(): void { + if (this.batchTimer) { + clearInterval(this.batchTimer); + this.batchTimer = null; + } + + // Flush any remaining updates + this.flushPendingUpdates().catch(error => { + log({ module: 'session-cache', level: 'error' }, `Error flushing final updates: ${error}`); + }); + } +} + +// Global instance +export const activityCache = new ActivityCache(); + +// Cleanup every 5 minutes +setInterval(() => { + activityCache.cleanup(); +}, 5 * 60 * 1000); \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index 73a25ea..bcddcc0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -381,6 +381,11 @@ resolved "https://registry.yarnpkg.com/@noble/hashes/-/hashes-1.8.0.tgz#cee43d801fcef9644b11b8194857695acd5f815a" integrity sha512-jCs9ldd7NwzpgXDIf6P3+NrHh9/sD6CQdxHyjQI+h/6rDNo88ypBxxz45UDuZHz9r3tNz7N/VInSVoVdtXEI4A== +"@opentelemetry/api@^1.4.0": + version "1.9.0" + resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-1.9.0.tgz#d03eba68273dc0f7509e2a3d5cba21eae10379fe" + integrity sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg== + "@peculiar/asn1-cms@^2.3.13", "@peculiar/asn1-cms@^2.3.15": version "2.3.15" resolved "https://registry.yarnpkg.com/@peculiar/asn1-cms/-/asn1-cms-2.3.15.tgz#8baf1fcf51dae2e9122126e13acf6a2e1698d35c" @@ -1024,6 +1029,11 @@ base64id@2.0.0, base64id@~2.0.0: resolved "https://registry.yarnpkg.com/base64id/-/base64id-2.0.0.tgz#2770ac6bc47d312af97a8bf9a634342e0cd25cb6" integrity sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog== +bintrees@1.0.2: + version 
"1.0.2" + resolved "https://registry.yarnpkg.com/bintrees/-/bintrees-1.0.2.tgz#49f896d6e858a4a499df85c38fb399b9aff840f8" + integrity sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw== + buffer-equal-constant-time@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz#f8e71132f7ffe6e01a5c9697a4c6f3e48d5cc819" @@ -2037,6 +2047,14 @@ process@^0.11.10: resolved "https://registry.yarnpkg.com/process/-/process-0.11.10.tgz#7332300e840161bda3e69a1d1d91a7d4bc16f182" integrity sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A== +prom-client@^15.1.3: + version "15.1.3" + resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-15.1.3.tgz#69fa8de93a88bc9783173db5f758dc1c69fa8fc2" + integrity sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g== + dependencies: + "@opentelemetry/api" "^1.4.0" + tdigest "^0.1.1" + proxy-from-env@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/proxy-from-env/-/proxy-from-env-1.1.0.tgz#e102f16ca355424865755d2c9e8ea4f24d58c3e2" @@ -2352,6 +2370,13 @@ supports-color@^7.1.0: dependencies: has-flag "^4.0.0" +tdigest@^0.1.1: + version "0.1.2" + resolved "https://registry.yarnpkg.com/tdigest/-/tdigest-0.1.2.tgz#96c64bac4ff10746b910b0e23b515794e12faced" + integrity sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA== + dependencies: + bintrees "1.0.2" + thread-stream@^3.0.0: version "3.1.0" resolved "https://registry.yarnpkg.com/thread-stream/-/thread-stream-3.1.0.tgz#4b2ef252a7c215064507d4ef70c05a5e2d34c4f1"