feat: add metrics and activity caching

This commit is contained in:
Steve Korshakov 2025-08-19 18:03:03 -07:00
parent 62ac1e4132
commit 35299fbbdf
8 changed files with 460 additions and 36 deletions

View File

@ -2,5 +2,9 @@ DATABASE_URL=postgresql://postgres:postgres@localhost:5432/handy
HANDY_MASTER_SECRET=your-super-secret-key-for-local-development
PORT=3005
# Metrics server configuration
METRICS_ENABLED=true
METRICS_PORT=9090
# Uncomment to enable centralized logging for AI debugging (creates .logs directory)
# DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true

View File

@ -47,6 +47,7 @@
"prisma": "^6.11.1",
"prisma-json-types-generator": "^3.5.1",
"privacy-kit": "^0.0.23",
"prom-client": "^15.1.3",
"socket.io": "^4.8.1",
"socket.io-adapter": "^2.5.5",
"tmp": "^0.2.3",
@ -58,4 +59,4 @@
"zod": "^3.24.2",
"zod-to-json-schema": "^3.24.3"
}
}
}

View File

@ -28,6 +28,14 @@ import {
buildUsageEphemeral,
buildMachineStatusEphemeral
} from "@/modules/eventRouter";
import {
incrementWebSocketConnection,
decrementWebSocketConnection,
sessionAliveEventsCounter,
machineAliveEventsCounter,
websocketEventsCounter
} from "@/modules/metrics";
import { activityCache } from "@/modules/sessionCache";
declare module 'fastify' {
@ -1223,6 +1231,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
};
}
eventRouter.addConnection(userId, connection);
incrementWebSocketConnection(connection.connectionType);
// Broadcast daemon online status
if (connection.connectionType === 'machine-scoped') {
@ -1240,8 +1249,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const receiveUsageLock = new AsyncLock();
socket.on('disconnect', () => {
websocketEventsCounter.inc({ event_type: 'disconnect' });
// Cleanup connections
eventRouter.removeConnection(userId, connection);
decrementWebSocketConnection(connection.connectionType);
// Clean up RPC listeners for this socket
const userRpcMap = rpcListeners.get(userId);
@ -1284,6 +1296,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
thinking?: boolean;
}) => {
try {
// Track metrics
websocketEventsCounter.inc({ event_type: 'session-alive' });
sessionAliveEventsCounter.inc();
// Basic validation
if (!data || typeof data.time !== 'number' || !data.sid) {
return;
@ -1299,19 +1315,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const { sid, thinking } = data;
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!session) {
// Check session validity using cache
const isValid = await activityCache.isSessionValid(sid, userId);
if (!isValid) {
return;
}
// Update last active
await db.session.update({
where: { id: sid },
data: { lastActiveAt: new Date(t), active: true }
});
// Queue database update (will only update if time difference is significant)
activityCache.queueSessionUpdate(sid, t);
// Emit session activity update
const sessionActivity = buildSessionActivityEphemeral(sid, true, t, thinking || false);
@ -1330,6 +1341,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
time: number;
}) => {
try {
// Track metrics
websocketEventsCounter.inc({ event_type: 'machine-alive' });
machineAliveEventsCounter.inc();
// Basic validation
if (!data || typeof data.time !== 'number' || !data.machineId) {
return;
@ -1343,35 +1358,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return;
}
// Resolve machine
const machine = await db.machine.findUnique({
where: {
accountId_id: {
accountId: userId,
id: data.machineId
}
}
});
if (!machine) {
// Check machine validity using cache
const isValid = await activityCache.isMachineValid(data.machineId, userId);
if (!isValid) {
return;
}
// Update machine lastActiveAt in database
const updatedMachine = await db.machine.update({
where: {
accountId_id: {
accountId: userId,
id: data.machineId
}
},
data: {
lastActiveAt: new Date(t),
active: true
}
});
// Queue database update (will only update if time difference is significant)
activityCache.queueMachineUpdate(data.machineId, t);
const machineActivity = buildMachineActivityEphemeral(updatedMachine.id, true, t);
const machineActivity = buildMachineActivityEphemeral(data.machineId, true, t);
eventRouter.emitEphemeral({
userId,
payload: machineActivity,
@ -1428,6 +1424,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
socket.on('message', async (data: any) => {
await receiveMessageLock.inLock(async () => {
try {
websocketEventsCounter.inc({ event_type: 'message' });
const { sid, message, localId } = data;
log({ module: 'websocket' }, `Received message from socket ${socket.id}: sessionId=${sid}, messageLength=${message.length} bytes, connectionType=${connection.connectionType}, connectionSessionId=${connection.connectionType === 'session-scoped' ? connection.sessionId : 'N/A'}`);

54
sources/app/metrics.ts Normal file
View File

@ -0,0 +1,54 @@
import fastify from 'fastify';
import { db } from '@/storage/db';
import { register } from '@/modules/metrics';
import { log } from '@/utils/log';
/**
 * Builds a standalone Fastify instance that serves Prometheus metrics.
 *
 * Exposes:
 * - GET /metrics — Prisma's built-in metrics concatenated with the
 *   application registry, in Prometheus text exposition format.
 * - GET /health — trivial liveness probe for the metrics server itself.
 *
 * @returns the configured (not yet listening) Fastify app.
 */
export async function createMetricsServer() {
    // Dedicated instance with its own logger turned off, so frequent
    // scrapes do not produce log noise.
    const server = fastify({ logger: false });

    server.get('/metrics', async (_request, reply) => {
        try {
            // Prisma exposes its own metrics endpoint-ready text; append the
            // custom application metrics from the shared registry.
            const fromPrisma = await db.$metrics.prometheus();
            const fromApp = await register.metrics();
            reply.type('text/plain; version=0.0.4; charset=utf-8');
            reply.send(fromPrisma + '\n' + fromApp);
        } catch (error) {
            log({ module: 'metrics', level: 'error' }, `Error generating metrics: ${error}`);
            reply.code(500).send('Internal Server Error');
        }
    });

    server.get('/health', async (_request, reply) => {
        reply.send({ status: 'ok', timestamp: new Date().toISOString() });
    });

    return server;
}
/**
 * Resolves the metrics server port from METRICS_PORT.
 * Falls back to 9090 when the variable is unset, not an integer, or outside
 * the valid TCP port range — previously a garbage value produced NaN and a
 * confusing `listen` failure at startup.
 */
function resolveMetricsPort(): number {
    const raw = process.env.METRICS_PORT;
    if (!raw) {
        return 9090;
    }
    const parsed = parseInt(raw, 10);
    if (!Number.isInteger(parsed) || parsed < 1 || parsed > 65535) {
        return 9090;
    }
    return parsed;
}

/**
 * Starts the metrics HTTP server unless disabled via METRICS_ENABLED=false.
 *
 * Metrics are opt-out: any value other than the literal string 'false'
 * (including unset) enables the server.
 *
 * @throws rethrows any listen failure after logging it, so the caller's
 *         startup sequence can abort.
 */
export async function startMetricsServer(): Promise<void> {
    const enabled = process.env.METRICS_ENABLED !== 'false';
    if (!enabled) {
        log({ module: 'metrics' }, 'Metrics server disabled');
        return;
    }

    const port = resolveMetricsPort();
    const app = await createMetricsServer();

    try {
        // Bind on all interfaces so an external scraper (e.g. in another
        // container) can reach the endpoint.
        await app.listen({ port, host: '0.0.0.0' });
        log({ module: 'metrics' }, `Metrics server listening on port ${port}`);
    } catch (error) {
        log({ module: 'metrics', level: 'error' }, `Failed to start metrics server: ${error}`);
        throw error;
    }
}

View File

@ -4,6 +4,8 @@ import { awaitShutdown, onShutdown } from "@/utils/shutdown";
import { db } from './storage/db';
import { startTimeout } from "./app/timeout";
import { redis } from "./services/redis";
import { startMetricsServer } from "@/app/metrics";
import { activityCache } from "@/modules/sessionCache";
async function main() {
@ -12,6 +14,9 @@ async function main() {
onShutdown('db', async () => {
await db.$disconnect();
});
onShutdown('activity-cache', async () => {
activityCache.shutdown();
});
await redis.ping();
//
@ -19,6 +24,7 @@ async function main() {
//
await startApi();
await startMetricsServer();
startTimeout();
//

View File

@ -0,0 +1,77 @@
import { register, Counter, Gauge, Histogram } from 'prom-client';

// Application metrics.
// Everything registers against the default prom-client registry, which is
// re-exported at the bottom so the metrics server can serialize all of it.

// Currently open WebSocket connections, labelled by connection scope.
// NOTE(review): the '_total' suffix is conventionally reserved for counters;
// renaming this gauge would break any existing dashboards, so it is left
// as-is — confirm intent before changing.
export const websocketConnectionsGauge = new Gauge({
    name: 'websocket_connections_total',
    help: 'Number of active WebSocket connections',
    labelNames: ['type'] as const,
    registers: [register]
});

// Count of 'session-alive' keep-alive events received over WebSocket.
export const sessionAliveEventsCounter = new Counter({
    name: 'session_alive_events_total',
    help: 'Total number of session-alive events',
    registers: [register]
});

// Count of 'machine-alive' keep-alive events received over WebSocket.
export const machineAliveEventsCounter = new Counter({
    name: 'machine_alive_events_total',
    help: 'Total number of machine-alive events',
    registers: [register]
});

// Activity-cache lookups; labels: operation (session_validation |
// machine_validation) and result (hit | miss).
export const sessionCacheCounter = new Counter({
    name: 'session_cache_operations_total',
    help: 'Total session cache operations',
    labelNames: ['operation', 'result'] as const,
    registers: [register]
});

// Database writes suppressed by the activity-cache debounce; label
// type = session | machine.
export const databaseUpdatesSkippedCounter = new Counter({
    name: 'database_updates_skipped_total',
    help: 'Number of database updates skipped due to debouncing',
    labelNames: ['type'] as const,
    registers: [register]
});

// All WebSocket events by event_type (e.g. 'message', 'disconnect',
// 'session-alive', 'machine-alive').
export const websocketEventsCounter = new Counter({
    name: 'websocket_events_total',
    help: 'Total WebSocket events received by type',
    labelNames: ['event_type'] as const,
    registers: [register]
});

// HTTP request count by method/route/status.
// NOTE(review): no code in view increments these two HTTP metrics — confirm
// they are wired up elsewhere (e.g. via a Fastify hook), otherwise they will
// always report zero.
export const httpRequestsCounter = new Counter({
    name: 'http_requests_total',
    help: 'Total number of HTTP requests',
    labelNames: ['method', 'route', 'status'] as const,
    registers: [register]
});

// HTTP request latency histogram; buckets span 1ms to 10s.
export const httpRequestDurationHistogram = new Histogram({
    name: 'http_request_duration_seconds',
    help: 'HTTP request duration in seconds',
    labelNames: ['method', 'route', 'status'] as const,
    buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10],
    registers: [register]
});

// WebSocket connection tracking
// Local counts back the gauge so decrements can be clamped at zero; calling
// Gauge.dec directly could drive the metric negative if a disconnect ever
// fired twice for the same connection.
const connectionCounts = {
    'user-scoped': 0,
    'session-scoped': 0,
    'machine-scoped': 0
};

// Records a newly opened WebSocket connection of the given scope.
export function incrementWebSocketConnection(type: 'user-scoped' | 'session-scoped' | 'machine-scoped'): void {
    connectionCounts[type]++;
    websocketConnectionsGauge.set({ type }, connectionCounts[type]);
}

// Records a closed WebSocket connection; the count never goes below zero.
export function decrementWebSocketConnection(type: 'user-scoped' | 'session-scoped' | 'machine-scoped'): void {
    connectionCounts[type] = Math.max(0, connectionCounts[type] - 1);
    websocketConnectionsGauge.set({ type }, connectionCounts[type]);
}

// Export the register for combining metrics
export { register };

View File

@ -0,0 +1,260 @@
import { db } from "@/storage/db";
import { log } from "@/utils/log";
import { sessionCacheCounter, databaseUpdatesSkippedCounter } from "@/modules/metrics";
interface SessionCacheEntry {
    // Timestamp (ms) until which a positive validity check is trusted.
    validUntil: number;
    // lastActiveAt value (ms) most recently written (or queued) to the DB.
    lastUpdateSent: number;
    // Activity timestamp waiting to be flushed, or null when nothing is queued.
    pendingUpdate: number | null;
    // Owning account; cached validity is only reused for the same user.
    userId: string;
}

interface MachineCacheEntry {
    validUntil: number;
    lastUpdateSent: number;
    pendingUpdate: number | null;
    userId: string;
}

/**
 * Write-behind cache for session/machine keep-alive events.
 *
 * Validity checks ("does this session/machine exist and belong to this
 * user?") are cached for CACHE_TTL to avoid one DB round-trip per ping, and
 * lastActiveAt writes are debounced (UPDATE_THRESHOLD) and flushed in
 * batches every BATCH_INTERVAL instead of on every event.
 */
class ActivityCache {
    private sessionCache = new Map<string, SessionCacheEntry>();
    private machineCache = new Map<string, MachineCacheEntry>();
    private batchTimer: NodeJS.Timeout | null = null;

    // Cache TTL (30 seconds)
    private readonly CACHE_TTL = 30 * 1000;
    // Only update DB if time difference is significant (30 seconds)
    private readonly UPDATE_THRESHOLD = 30 * 1000;
    // Batch update interval (5 seconds)
    private readonly BATCH_INTERVAL = 5 * 1000;

    constructor() {
        this.startBatchTimer();
    }

    /** (Re)arms the periodic flush of queued activity updates. */
    private startBatchTimer(): void {
        if (this.batchTimer) {
            clearInterval(this.batchTimer);
        }
        this.batchTimer = setInterval(() => {
            this.flushPendingUpdates().catch(error => {
                log({ module: 'session-cache', level: 'error' }, `Error flushing updates: ${error}`);
            });
        }, this.BATCH_INTERVAL);
    }

    /**
     * Returns true when `sessionId` exists and belongs to `userId`.
     * Positive results are cached for CACHE_TTL; misses and DB errors
     * return false without caching.
     */
    async isSessionValid(sessionId: string, userId: string): Promise<boolean> {
        const now = Date.now();
        const cached = this.sessionCache.get(sessionId);

        // Check cache first
        if (cached && cached.validUntil > now && cached.userId === userId) {
            sessionCacheCounter.inc({ operation: 'session_validation', result: 'hit' });
            return true;
        }
        sessionCacheCounter.inc({ operation: 'session_validation', result: 'miss' });

        // Cache miss - check database
        try {
            const session = await db.session.findUnique({
                where: { id: sessionId, accountId: userId }
            });
            if (session) {
                // Cache the positive result
                this.sessionCache.set(sessionId, {
                    validUntil: now + this.CACHE_TTL,
                    lastUpdateSent: session.lastActiveAt.getTime(),
                    pendingUpdate: null,
                    userId
                });
                return true;
            }
            return false;
        } catch (error) {
            // Fail closed: treat DB errors as "not valid" so the caller drops the event.
            log({ module: 'session-cache', level: 'error' }, `Error validating session ${sessionId}: ${error}`);
            return false;
        }
    }

    /**
     * Returns true when `machineId` exists and belongs to `userId`.
     * Same caching/fail-closed behavior as isSessionValid.
     */
    async isMachineValid(machineId: string, userId: string): Promise<boolean> {
        const now = Date.now();
        const cached = this.machineCache.get(machineId);

        // Check cache first
        if (cached && cached.validUntil > now && cached.userId === userId) {
            sessionCacheCounter.inc({ operation: 'machine_validation', result: 'hit' });
            return true;
        }
        sessionCacheCounter.inc({ operation: 'machine_validation', result: 'miss' });

        // Cache miss - check database
        try {
            const machine = await db.machine.findUnique({
                where: {
                    accountId_id: {
                        accountId: userId,
                        id: machineId
                    }
                }
            });
            if (machine) {
                // Cache the positive result; machines may have no lastActiveAt yet.
                this.machineCache.set(machineId, {
                    validUntil: now + this.CACHE_TTL,
                    lastUpdateSent: machine.lastActiveAt?.getTime() || 0,
                    pendingUpdate: null,
                    userId
                });
                return true;
            }
            return false;
        } catch (error) {
            log({ module: 'session-cache', level: 'error' }, `Error validating machine ${machineId}: ${error}`);
            return false;
        }
    }

    /**
     * Queues a lastActiveAt update for a previously validated session.
     * Returns true when queued; false when the session was never validated or
     * the delta since the last write is below UPDATE_THRESHOLD (debounced).
     */
    queueSessionUpdate(sessionId: string, timestamp: number): boolean {
        const cached = this.sessionCache.get(sessionId);
        if (!cached) {
            return false; // Should validate first
        }
        // Only queue if time difference is significant
        const timeDiff = Math.abs(timestamp - cached.lastUpdateSent);
        if (timeDiff > this.UPDATE_THRESHOLD) {
            cached.pendingUpdate = timestamp;
            return true;
        }
        databaseUpdatesSkippedCounter.inc({ type: 'session' });
        return false; // No update needed
    }

    /**
     * Queues a lastActiveAt update for a previously validated machine.
     * Same debounce semantics as queueSessionUpdate.
     */
    queueMachineUpdate(machineId: string, timestamp: number): boolean {
        const cached = this.machineCache.get(machineId);
        if (!cached) {
            return false; // Should validate first
        }
        // Only queue if time difference is significant
        const timeDiff = Math.abs(timestamp - cached.lastUpdateSent);
        if (timeDiff > this.UPDATE_THRESHOLD) {
            cached.pendingUpdate = timestamp;
            return true;
        }
        databaseUpdatesSkippedCounter.inc({ type: 'machine' });
        return false; // No update needed
    }

    /** Writes all queued session/machine updates to the database. */
    private async flushPendingUpdates(): Promise<void> {
        const sessionUpdates: { id: string, timestamp: number }[] = [];
        const machineUpdates: { id: string, timestamp: number, userId: string }[] = [];

        // Collect session updates. Explicit null check (not truthiness) so a
        // 0-epoch timestamp would not be silently dropped.
        for (const [sessionId, entry] of this.sessionCache.entries()) {
            if (entry.pendingUpdate !== null) {
                sessionUpdates.push({ id: sessionId, timestamp: entry.pendingUpdate });
                entry.lastUpdateSent = entry.pendingUpdate;
                entry.pendingUpdate = null;
            }
        }

        // Collect machine updates
        for (const [machineId, entry] of this.machineCache.entries()) {
            if (entry.pendingUpdate !== null) {
                machineUpdates.push({
                    id: machineId,
                    timestamp: entry.pendingUpdate,
                    userId: entry.userId
                });
                entry.lastUpdateSent = entry.pendingUpdate;
                entry.pendingUpdate = null;
            }
        }

        // Batch update sessions
        if (sessionUpdates.length > 0) {
            try {
                await Promise.all(sessionUpdates.map(update =>
                    db.session.update({
                        where: { id: update.id },
                        data: { lastActiveAt: new Date(update.timestamp), active: true }
                    })
                ));
                log({ module: 'session-cache' }, `Flushed ${sessionUpdates.length} session updates`);
            } catch (error) {
                log({ module: 'session-cache', level: 'error' }, `Error updating sessions: ${error}`);
            }
        }

        // Batch update machines. `active: true` matches the session flush and
        // the pre-cache inline handler, which re-marked machines active on
        // every keep-alive; omitting it left machines permanently inactive.
        if (machineUpdates.length > 0) {
            try {
                await Promise.all(machineUpdates.map(update =>
                    db.machine.update({
                        where: {
                            accountId_id: {
                                accountId: update.userId,
                                id: update.id
                            }
                        },
                        data: { lastActiveAt: new Date(update.timestamp), active: true }
                    })
                ));
                log({ module: 'session-cache' }, `Flushed ${machineUpdates.length} machine updates`);
            } catch (error) {
                log({ module: 'session-cache', level: 'error' }, `Error updating machines: ${error}`);
            }
        }
    }

    /**
     * Evicts expired cache entries. Entries that still carry a queued update
     * are kept — evicting them (as the previous version did) silently dropped
     * the pending lastActiveAt write; they are removed on a later pass once
     * the batch timer has flushed them.
     */
    cleanup(): void {
        const now = Date.now();
        for (const [sessionId, entry] of this.sessionCache.entries()) {
            if (entry.validUntil < now && entry.pendingUpdate === null) {
                this.sessionCache.delete(sessionId);
            }
        }
        for (const [machineId, entry] of this.machineCache.entries()) {
            if (entry.validUntil < now && entry.pendingUpdate === null) {
                this.machineCache.delete(machineId);
            }
        }
    }

    /**
     * Stops the batch timer and flushes any remaining queued updates.
     * Returns the flush promise so shutdown hooks can await completion —
     * previously the final flush was fire-and-forget and could be cut off by
     * process exit. Callers that ignore the return value keep working.
     */
    shutdown(): Promise<void> {
        if (this.batchTimer) {
            clearInterval(this.batchTimer);
            this.batchTimer = null;
        }
        return this.flushPendingUpdates().catch(error => {
            log({ module: 'session-cache', level: 'error' }, `Error flushing final updates: ${error}`);
        });
    }
}
// Global singleton shared by the API layer.
export const activityCache = new ActivityCache();

// Evict stale cache entries every 5 minutes. unref() stops this housekeeping
// timer from holding the Node process open — previously the interval was
// never cleared, which could delay or block a graceful exit.
const cacheCleanupTimer = setInterval(() => {
    activityCache.cleanup();
}, 5 * 60 * 1000);
cacheCleanupTimer.unref();

View File

@ -381,6 +381,11 @@
resolved "https://registry.yarnpkg.com/@noble/hashes/-/hashes-1.8.0.tgz#cee43d801fcef9644b11b8194857695acd5f815a"
integrity sha512-jCs9ldd7NwzpgXDIf6P3+NrHh9/sD6CQdxHyjQI+h/6rDNo88ypBxxz45UDuZHz9r3tNz7N/VInSVoVdtXEI4A==
"@opentelemetry/api@^1.4.0":
version "1.9.0"
resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-1.9.0.tgz#d03eba68273dc0f7509e2a3d5cba21eae10379fe"
integrity sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==
"@peculiar/asn1-cms@^2.3.13", "@peculiar/asn1-cms@^2.3.15":
version "2.3.15"
resolved "https://registry.yarnpkg.com/@peculiar/asn1-cms/-/asn1-cms-2.3.15.tgz#8baf1fcf51dae2e9122126e13acf6a2e1698d35c"
@ -1024,6 +1029,11 @@ base64id@2.0.0, base64id@~2.0.0:
resolved "https://registry.yarnpkg.com/base64id/-/base64id-2.0.0.tgz#2770ac6bc47d312af97a8bf9a634342e0cd25cb6"
integrity sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==
bintrees@1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/bintrees/-/bintrees-1.0.2.tgz#49f896d6e858a4a499df85c38fb399b9aff840f8"
integrity sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==
buffer-equal-constant-time@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz#f8e71132f7ffe6e01a5c9697a4c6f3e48d5cc819"
@ -2037,6 +2047,14 @@ process@^0.11.10:
resolved "https://registry.yarnpkg.com/process/-/process-0.11.10.tgz#7332300e840161bda3e69a1d1d91a7d4bc16f182"
integrity sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==
prom-client@^15.1.3:
version "15.1.3"
resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-15.1.3.tgz#69fa8de93a88bc9783173db5f758dc1c69fa8fc2"
integrity sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==
dependencies:
"@opentelemetry/api" "^1.4.0"
tdigest "^0.1.1"
proxy-from-env@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/proxy-from-env/-/proxy-from-env-1.1.0.tgz#e102f16ca355424865755d2c9e8ea4f24d58c3e2"
@ -2352,6 +2370,13 @@ supports-color@^7.1.0:
dependencies:
has-flag "^4.0.0"
tdigest@^0.1.1:
version "0.1.2"
resolved "https://registry.yarnpkg.com/tdigest/-/tdigest-0.1.2.tgz#96c64bac4ff10746b910b0e23b515794e12faced"
integrity sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==
dependencies:
bintrees "1.0.2"
thread-stream@^3.0.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/thread-stream/-/thread-stream-3.1.0.tgz#4b2ef252a7c215064507d4ef70c05a5e2d34c4f1"