diff --git a/.env.example b/.env.dev similarity index 50% rename from .env.example rename to .env.dev index 9277a2e..0d31f60 100644 --- a/.env.example +++ b/.env.dev @@ -1,3 +1,6 @@ DATABASE_URL=postgresql://postgres:postgres@localhost:5432/handy HANDY_MASTER_SECRET=your-super-secret-key-for-local-development PORT=3005 + +# Uncomment to enable centralized logging for AI debugging (creates .logs directory) +DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true diff --git a/CLAUDE.md b/CLAUDE.md index fb2e7b6..d95a9a4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -175,4 +175,97 @@ The project includes a multi-stage Dockerfile: 3. ALWAYS prefer editing an existing file to creating a new one 4. NEVER proactively create documentation files (*.md) or README files unless explicitly requested 5. Use 4 spaces for tabs (not 2 spaces) -6. Use yarn instead of npm for all package management \ No newline at end of file +6. Use yarn instead of npm for all package management + +## Debugging Notes + +### Remote Logging Setup +- Use `DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true` env var to enable +- Server logs to `.logs/` directory with timestamped files (format: `MM-DD-HH-MM-SS.log`) +- Mobile and CLI send logs to `/logs-combined-from-cli-and-mobile-for-simple-ai-debugging` endpoint + +### Common Issues & Tells + +#### Socket/Connection Issues +- **Tell**: "Sending update to user-scoped connection" but mobile not updating +- **Tell**: Multiple "User disconnected" messages indicate socket instability +- **Tell**: "Response from the Engine was empty" = Prisma database connection lost + +#### Auth Flow Debugging +- CLI must hit `/v1/auth/request` to create auth request +- Mobile scans QR and hits `/v1/auth/response` to approve +- **Tell**: 404 on `/v1/auth/response` = server likely restarted/crashed +- **Tell**: "Auth failed - user not found" = token issue or user doesn't exist + +#### Session Creation Flow +- Sessions created via POST `/v1/sessions` with tag-based deduplication +- Server emits "new-session" update to all user connections +- **Tell**: Sessions created but not showing = mobile app not processing updates +- **Tell**: "pathname /" in mobile logs = app stuck at root screen + +#### Environment Variables +- CLI: Use `yarn dev:local-server` (NOT `yarn dev`) to load `.env.dev-local-server` +- Server: Use `yarn dev` to start with proper env files +- **Tell**: Wrong server URL = check `HAPPY_SERVER_URL` env var +- **Tell**: Wrong home dir = check `HAPPY_HOME_DIR` (should be `~/.happy-dev` for local) + +### Quick Diagnostic Commands + +#### IMPORTANT: Always Start Debugging With These +```bash +# 1. CHECK CURRENT TIME - Logs use local time, know what's current! +date + +# 2. CHECK LATEST LOG FILES - Server creates new logs on restart +ls -la .logs/*.log | tail -5 + +# 3. VERIFY YOU'RE LOOKING AT CURRENT LOGS +# Server logs are named: MM-DD-HH-MM-SS.log (month-day-hour-min-sec) +# If current time is 13:45 and latest log is 08-15-10-57-02.log from 10:57, +# that log started 3 hours ago but may still be active! +tail -1 .logs/[LATEST_LOG_FILE] # Check last entry timestamp +``` + +#### Common Debugging Patterns +```bash +# Check server logs for errors +tail -100 .logs/*.log | grep -E "(error|Error|ERROR|failed|Failed)" + +# Monitor session creation +tail -f .logs/*.log | grep -E "(new-session|Session created)" + +# Check active connections +tail -100 .logs/*.log | grep -E "(Token verified|User connected|User disconnected)" + +# See what endpoints are being hit +tail -100 .logs/*.log | grep "incoming request" + +# Debug socket real-time updates +tail -500 .logs/*.log | grep -A 2 -B 2 "new-session" | tail -30 +tail -200 .logs/*.log | grep -E "(websocket|Socket.*connected|Sending update)" | tail -30 + +# Track socket events from mobile client +tail -300 .logs/*.log | grep "remote-log.*mobile" | grep -E "(SyncSocket|handleUpdate)" | tail -20 + +# Monitor session creation flow end-to-end +tail -500 .logs/*.log | grep "session-create" | tail -20 +tail -500 .logs/*.log | grep "cmed556s4002bvb2020igg8jf" -A 3 -B 3 # Replace with actual session ID + +# Check auth flow for sessions API +tail -300 .logs/*.log | grep "auth-decorator.*sessions" | tail -10 + +# Debug machine registration and online status +tail -500 .logs/*.log | grep -E "(machine-alive|machine-register|update-machine)" | tail -20 +tail -500 .logs/*.log | grep "GET /v1/machines" | tail -10 +tail -500 .logs/*.log | grep "POST /v1/machines" | tail -10 + +# Check what mobile app is seeing +tail -500 .logs/*.log | grep "📊 Storage" | tail -20 +tail -500 .logs/*.log | grep "applySessions.*active" | tail -10 +``` + +#### Time Format Reference +- **CLI logs**: `[HH:MM:SS.mmm]` in local time (e.g., `[13:45:23.738]`) +- **Server logs**: Include both `time` (Unix ms) and `localTime` (HH:MM:ss.mmm) +- **Mobile logs**: Sent with `timestamp` in UTC, converted to `localTime` on server +- **All consolidated logs**: Have `localTime` field for easy correlation \ No newline at end of file diff --git a/package.json b/package.json index 4441978..078e89a 100644 --- a/package.json +++ b/package.json @@ -8,9 +8,9 @@ "scripts": { "build": "tsc --noEmit", "start": "tsx ./sources/main.ts", - "dev": "tsx --env-file=.env --env-file=.env.example ./sources/main.ts", + "dev": "lsof -ti tcp:3005 | xargs kill -9 && tsx --env-file=.env --env-file=.env.dev ./sources/main.ts", "test": "vitest run", - "migrate": "dotenv -e .env.example -- prisma migrate dev", + "migrate": "dotenv -e .env.dev -- prisma migrate dev", "generate": "prisma generate", "postinstall": "prisma generate", "db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v $(pwd)/.pgdata:/var/lib/postgresql/data -p 5432:5432 postgres", diff --git a/prisma/migrations/20250812092041_add_machine_model/migration.sql b/prisma/migrations/20250812092041_add_machine_model/migration.sql new file mode 100644 index 0000000..1e06047 --- /dev/null +++ b/prisma/migrations/20250812092041_add_machine_model/migration.sql @@ -0,0 +1,23 @@ +-- CreateTable +CREATE TABLE "Machine" ( + "id" TEXT NOT NULL, + "accountId" TEXT NOT NULL, + "metadata" TEXT NOT NULL, + "metadataVersion" INTEGER NOT NULL DEFAULT 0, + "seq" INTEGER NOT NULL DEFAULT 0, + "active" BOOLEAN NOT NULL DEFAULT true, + "lastActiveAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Machine_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "Machine_accountId_idx" ON "Machine"("accountId"); + +-- CreateIndex +CREATE UNIQUE INDEX "Machine_accountId_id_key" ON "Machine"("accountId", "id"); + +-- AddForeignKey +ALTER TABLE "Machine" ADD CONSTRAINT "Machine_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/prisma/migrations/20250818044258_add_daemon_state_to_machine/migration.sql b/prisma/migrations/20250818044258_add_daemon_state_to_machine/migration.sql new file mode 100644 index 0000000..ff78ad3 --- /dev/null +++ b/prisma/migrations/20250818044258_add_daemon_state_to_machine/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "Machine" ADD COLUMN "daemonState" TEXT, +ADD COLUMN "daemonStateVersion" INTEGER NOT NULL DEFAULT 0; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f5ad7f1..692a10e 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -31,6 +31,7 @@ model Account { TerminalAuthRequest TerminalAuthRequest[] AccountAuthRequest AccountAuthRequest[] UsageReport UsageReport[] + Machine Machine[] } model TerminalAuthRequest { @@ -148,3 +149,25 @@ model UsageReport { @@index([accountId]) @@index([sessionId]) } + +// +// Machines +// + +model Machine { + id String @id + accountId String + account Account @relation(fields: [accountId], references: [id]) + metadata String // Encrypted - contains static machine info + metadataVersion Int @default(0) + daemonState String? // Encrypted - contains dynamic daemon state + daemonStateVersion Int @default(0) + seq Int @default(0) + active Boolean @default(true) + lastActiveAt DateTime @default(now()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([accountId, id]) + @@index([accountId]) +} diff --git a/sources/app/api.ts b/sources/app/api.ts index 1962453..435e19b 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -1,5 +1,5 @@ import fastify, { FastifyInstance } from "fastify"; -import { log } from "@/utils/log"; +import { log, logger } from "@/utils/log"; import { serializerCompiler, validatorCompiler, ZodTypeProvider } from "fastify-type-provider-zod"; import { Server, Socket } from "socket.io"; import { z } from "zod"; @@ -12,6 +12,13 @@ import { allocateSessionSeq, allocateUserSeq } from "@/services/seq"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { AsyncLock } from "@/utils/lock"; + +// Recipient filter types +type RecipientFilter = + | { type: 'all-interested-in-session'; sessionId: string } + | { type: 'user-scoped-only' } + | { type: 'all-user-authenticated-connections' }; + // Connection metadata types interface SessionScopedConnection { connectionType: 'session-scoped'; @@ -79,13 +86,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> app.decorate('authenticate', async function (request: any, reply: any) { try { const authHeader = request.headers.authorization; + log({ module: 'auth-decorator' }, `Auth check - path: ${request.url}, has header: ${!!authHeader}, header start: ${authHeader?.substring(0, 50)}...`); if (!authHeader || !authHeader.startsWith('Bearer ')) { + log({ module: 'auth-decorator' }, `Auth failed - missing or invalid header`); return reply.code(401).send({ error: 'Missing authorization header' }); } const token = authHeader.substring(7); const verified = await tokenVerifier.verify(token); if (!verified) { + log({ module: 'auth-decorator' }, `Auth failed - invalid token`); return reply.code(401).send({ error: 'Invalid token' }); } @@ -95,9 +105,12 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); if (!user) { + log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`); return reply.code(401).send({ error: 'User not found' }); } + log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`); + request.user = user; } catch (error) { return reply.code(401).send({ error: 'Authentication failed' }); @@ -105,11 +118,17 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); // Send session update to all relevant connections - let emitUpdateToInterestedClients = ({ event, userId, sessionId, payload, skipSenderConnection }: { + let emitUpdateToInterestedClients = ({ + event, + userId, + payload, + recipientFilter = { type: 'all-user-authenticated-connections' }, + skipSenderConnection + }: { event: string, userId: string, - sessionId: string, payload: any, + recipientFilter?: RecipientFilter, skipSenderConnection?: ClientConnection }) => { const connections = userIdToClientConnections.get(userId); @@ -124,27 +143,33 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> continue; } - // Send to all user-scoped connections - we already matched user - if (connection.connectionType === 'user-scoped') { - log({ module: 'websocket' }, `Sending ${event} to user-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); + // Apply recipient filter + switch (recipientFilter.type) { + case 'all-interested-in-session': + // Send to session-scoped with matching session + all user-scoped + if (connection.connectionType === 'session-scoped') { + if (connection.sessionId !== recipientFilter.sessionId) { + continue; // Wrong session + } + } else if (connection.connectionType === 'machine-scoped') { + continue; // Machines don't need session updates + } + // user-scoped always gets it + break; + + case 'user-scoped-only': + if (connection.connectionType !== 'user-scoped') { + continue; + } + break; + + case 'all-user-authenticated-connections': + // Send to all connection types (default behavior) + break; } - // Send to all session-scoped connections, only that match sessionId - if (connection.connectionType === 'session-scoped') { - const matches = connection.sessionId === sessionId; - log({ module: 'websocket' }, `Session-scoped connection ${connection.socket.id}: sessionId=${connection.sessionId}, messageSessionId=${sessionId}, matches=${matches}`); - if (matches) { - log({ module: 'websocket' }, `Sending ${event} to session-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); - } - } - - // Send to all machine-scoped connections - they get all user updates - if (connection.connectionType === 'machine-scoped') { - log({ module: 'websocket' }, `Sending ${event} to machine-scoped connection ${connection.socket.id}`); - connection.socket.emit(event, payload); - } + log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`); + connection.socket.emit(event, payload); } } @@ -205,10 +230,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return reply.code(401).send({ error: 'Invalid public key' }); } + const publicKeyHex = privacyKit.encodeHex(publicKey); + log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`); + const answer = await db.terminalAuthRequest.upsert({ - where: { publicKey: privacyKit.encodeHex(publicKey) }, + where: { publicKey: publicKeyHex }, update: {}, - create: { publicKey: privacyKit.encodeHex(publicKey) } + create: { publicKey: publicKeyHex } }); if (answer.response && answer.responseAccountId) { @@ -233,15 +261,26 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }) } }, async (request, reply) => { + log({ module: 'auth-response' }, `Auth response endpoint hit - user: ${request.user?.id || 'NO USER'}, publicKey: ${request.body.publicKey.substring(0, 20)}...`); const publicKey = privacyKit.decodeBase64(request.body.publicKey); const isValid = tweetnacl.box.publicKeyLength === publicKey.length; if (!isValid) { + log({ module: 'auth-response' }, `Invalid public key length: ${publicKey.length}`); return reply.code(401).send({ error: 'Invalid public key' }); } + const publicKeyHex = privacyKit.encodeHex(publicKey); + log({ module: 'auth-response' }, `Looking for auth request with publicKey hex: ${publicKeyHex}`); const authRequest = await db.terminalAuthRequest.findUnique({ - where: { publicKey: privacyKit.encodeHex(publicKey) } + where: { publicKey: publicKeyHex } }); if (!authRequest) { + log({ module: 'auth-response' }, `Auth request not found for publicKey: ${publicKeyHex}`); + // Let's also check what auth requests exist + const allRequests = await db.terminalAuthRequest.findMany({ + take: 5, + orderBy: { createdAt: 'desc' } + }); + log({ module: 'auth-response' }, `Recent auth requests in DB: ${JSON.stringify(allRequests.map(r => ({ id: r.id, publicKey: r.publicKey.substring(0, 20) + '...', hasResponse: !!r.response })))}`); return reply.code(404).send({ error: 'Request not found' }); } if (!authRequest.response) { @@ -426,7 +465,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const lastMessage = v.messages[0]; const sessionUpdatedAt = v.updatedAt.getTime(); const lastMessageCreatedAt = lastMessage ? lastMessage.createdAt.getTime() : 0; - + return { id: v.id, seq: v.seq, @@ -471,6 +510,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); if (session) { + logger.info({ module: 'session-create', sessionId: session.id, userId, tag }, `Found existing session: ${session.id} for tag ${tag}`); return reply.send({ session: { id: session.id, @@ -492,6 +532,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const updSeq = await allocateUserSeq(userId); // Create session + logger.info({ module: 'session-create', userId, tag }, `Creating new session for user ${userId} with tag ${tag}`); const session = await db.session.create({ data: { accountId: userId, @@ -499,6 +540,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> metadata: metadata } }); + logger.info({ module: 'session-create', sessionId: session.id, userId }, `Session created: ${session.id}`); // Create update const updContent: PrismaJson.UpdateBody = { @@ -516,16 +558,24 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }; // Emit update to connected sockets + const updatePayload = { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }; + logger.info({ + module: 'session-create', + userId, + sessionId: session.id, + updateType: 'new-session', + updatePayload: JSON.stringify(updatePayload) + }, `Emitting new-session update to all user connections`); emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: session.id, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - } + payload: updatePayload, + recipientFilter: { type: 'all-user-authenticated-connections' } }); return reply.send({ @@ -753,18 +803,18 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }; - // Get all user connections (not session-specific) - const connections = userIdToClientConnections.get(userId); - if (connections) { - for (const connection of connections) { - connection.socket.emit('update', { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }); - } - } + // Send to all user connections + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); return reply.send({ success: true, @@ -964,6 +1014,220 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); }); + // Machines + + // POST /v1/machines - Create machine or return existing + typed.post('/v1/machines', { + preHandler: app.authenticate, + schema: { + body: z.object({ + id: z.string(), + metadata: z.string(), // Encrypted metadata + daemonState: z.string().optional() // Encrypted daemon state + }) + } + }, async (request, reply) => { + const userId = request.user.id; + const { id, metadata, daemonState } = request.body; + + // Check if machine exists (like sessions do) + const machine = await db.machine.findFirst({ + where: { + accountId: userId, + id: id + } + }); + + if (machine) { + // Machine exists - just return it + logger.info({ module: 'machines', machineId: id, userId }, 'Found existing machine'); + return reply.send({ + machine: { + id: machine.id, + metadata: machine.metadata, + metadataVersion: machine.metadataVersion, + daemonState: machine.daemonState, + daemonStateVersion: machine.daemonStateVersion, + active: machine.active, + activeAt: machine.lastActiveAt.getTime(), // Return as activeAt for API consistency + createdAt: machine.createdAt.getTime(), + updatedAt: machine.updatedAt.getTime() + } + }); + } else { + // Create new machine + logger.info({ module: 'machines', machineId: id, userId }, 'Creating new machine'); + + const newMachine = await db.machine.create({ + data: { + id, + accountId: userId, + metadata, + metadataVersion: 1, + daemonState: daemonState || null, + daemonStateVersion: daemonState ? 1 : 0 + // active defaults to true in schema + // lastActiveAt defaults to now() in schema + } + }); + + // Emit update for new machine + const updSeq = await allocateUserSeq(userId); + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: { + t: 'update-machine', + machineId: newMachine.id, + metadata: { + version: 1, + value: metadata + } + }, + createdAt: Date.now() + } + }); + + return reply.send({ + machine: { + id: newMachine.id, + metadata: newMachine.metadata, + metadataVersion: newMachine.metadataVersion, + daemonState: newMachine.daemonState, + daemonStateVersion: newMachine.daemonStateVersion, + active: newMachine.active, + activeAt: newMachine.lastActiveAt.getTime(), // Return as activeAt for API consistency + createdAt: newMachine.createdAt.getTime(), + updatedAt: newMachine.updatedAt.getTime() + } + }); + } + }); + + + // Machines API + typed.get('/v1/machines', { + preHandler: app.authenticate, + }, async (request, reply) => { + const userId = request.user.id; + + const machines = await db.machine.findMany({ + where: { accountId: userId }, + orderBy: { lastActiveAt: 'desc' } + }); + + return machines.map(m => ({ + id: m.id, + metadata: m.metadata, + metadataVersion: m.metadataVersion, + daemonState: m.daemonState, + daemonStateVersion: m.daemonStateVersion, + seq: m.seq, + active: m.active, + activeAt: m.lastActiveAt.getTime(), + createdAt: m.createdAt.getTime(), + updatedAt: m.updatedAt.getTime() + })); + }); + + // GET /v1/machines/:id - Get single machine by ID + typed.get('/v1/machines/:id', { + preHandler: app.authenticate, + schema: { + params: z.object({ + id: z.string() + }) + } + }, async (request, reply) => { + const userId = request.user.id; + const { id } = request.params; + + const machine = await db.machine.findFirst({ + where: { + accountId: userId, + id: id + } + }); + + if (!machine) { + return reply.code(404).send({ error: 'Machine not found' }); + } + + return { + machine: { + id: machine.id, + metadata: machine.metadata, + metadataVersion: machine.metadataVersion, + daemonState: machine.daemonState, + daemonStateVersion: machine.daemonStateVersion, + seq: machine.seq, + active: machine.active, + activeAt: machine.lastActiveAt.getTime(), + createdAt: machine.createdAt.getTime(), + updatedAt: machine.updatedAt.getTime() + } + }; + }); + + // Combined logging endpoint (only when explicitly enabled) + if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { + typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', { + schema: { + body: z.object({ + timestamp: z.string(), + level: z.string(), + message: z.string(), + messageRawObject: z.any().optional(), + source: z.enum(['mobile', 'cli']), + platform: z.string().optional() + }) + } + }, async (request, reply) => { + const { timestamp, level, message, source, platform } = request.body; + + // Log ONLY to separate remote logger (file only, no console) + const logData = { + source, + platform, + timestamp + }; + + // Use the file-only logger if available + const { fileConsolidatedLogger } = await import('@/utils/log'); + + if (!fileConsolidatedLogger) { + // Should never happen since we check env var above, but be safe + return reply.send({ success: true }); + } + + switch (level.toLowerCase()) { + case 'error': + fileConsolidatedLogger.error(logData, message); + break; + case 'warn': + case 'warning': + fileConsolidatedLogger.warn(logData, message); + break; + case 'debug': + fileConsolidatedLogger.debug(logData, message); + break; + default: + fileConsolidatedLogger.info(logData, message); + } + + return reply.send({ success: true }); + }); + } + + // Catch-all route for debugging 404s + app.setNotFoundHandler((request, reply) => { + log({ module: '404-handler' }, `404 - Method: ${request.method}, Path: ${request.url}, Headers: ${JSON.stringify(request.headers)}`); + reply.code(404).send({ error: 'Not found', path: request.url, method: request.method }); + }); + // Start const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005; await app.listen({ port, host: '0.0.0.0' }); @@ -1018,7 +1282,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> socket.disconnect(); return; } - + // Validate machine-scoped clients have machineId if (clientType === 'machine-scoped' && !machineId) { log({ module: 'websocket' }, `Machine-scoped client missing machineId`); @@ -1066,20 +1330,20 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> userIdToClientConnections.set(userId, new Set()); } userIdToClientConnections.get(userId)!.add(connection); - + // Broadcast daemon online status if (connection.connectionType === 'machine-scoped') { // Broadcast daemon online emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: '', // No specific session payload: { - type: 'daemon-status', - machineId, - status: 'online', - timestamp: Date.now() - } + type: 'machine-activity', + id: machineId, + active: true, + activeAt: Date.now() + }, + recipientFilter: { type: 'user-scoped-only' } }); } @@ -1120,37 +1384,44 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } log({ module: 'websocket' }, `User disconnected: ${userId}`); - + // Broadcast daemon offline status if (connection.connectionType === 'machine-scoped') { emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: '', // No specific session payload: { - type: 'daemon-status', - machineId: connection.machineId, - status: 'offline', - timestamp: Date.now() - } + type: 'machine-activity', + id: connection.machineId, + active: false, + activeAt: Date.now() + }, + recipientFilter: { type: 'user-scoped-only' } }); } }); - socket.on('session-alive', async (data: any) => { + socket.on('session-alive', async (data: { + sid: string; + time: number; + thinking?: boolean; + }) => { try { - const { sid, time, thinking } = data; - let t = time; - if (typeof t !== 'number') { + // Basic validation + if (!data || typeof data.time !== 'number' || !data.sid) { return; } + + let t = data.time; if (t > Date.now()) { t = Date.now(); } - if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes + if (t < Date.now() - 1000 * 60 * 10) { return; } + const { sid, thinking } = data; + // Resolve session const session = await db.session.findUnique({ where: { id: sid, accountId: userId } @@ -1159,31 +1430,96 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - // Update last active at + // Update last active await db.session.update({ where: { id: sid }, data: { lastActiveAt: new Date(t), active: true } }); - // Emit update to connected sockets + // Emit update emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: sid, payload: { type: 'activity', id: sid, active: true, activeAt: t, - thinking - } + thinking: thinking || false + }, + recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`); } }); - socket.on('session-end', async (data: any) => { + socket.on('machine-alive', async (data: { + machineId: string; + time: number; + }) => { + try { + // Basic validation + if (!data || typeof data.time !== 'number' || !data.machineId) { + return; + } + + let t = data.time; + if (t > Date.now()) { + t = Date.now(); + } + if (t < Date.now() - 1000 * 60 * 10) { + return; + } + + // Resolve machine + const machine = await db.machine.findUnique({ + where: { + accountId_id: { + accountId: userId, + id: data.machineId + } + } + }); + + if (!machine) { + return; + } + + // Update machine lastActiveAt in database + const updatedMachine = await db.machine.update({ + where: { + accountId_id: { + accountId: userId, + id: data.machineId + } + }, + data: { + lastActiveAt: new Date(t), + active: true + } + }); + + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + payload: { + type: 'machine-activity', + id: updatedMachine.id, + active: true, + activeAt: t, + }, + recipientFilter: { type: 'user-scoped-only' } + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`); + } + }); + + socket.on('session-end', async (data: { + sid: string; + time: number; + }) => { try { const { sid, time } = data; let t = time; @@ -1215,14 +1551,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId: sid, payload: { type: 'activity', id: sid, active: false, activeAt: t, thinking: false - } + }, + recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { log({ module: 'websocket', level: 'error' }, `Error in session-end: ${error}`); @@ -1293,13 +1629,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: update, createdAt: Date.now() }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid }, skipSenderConnection: connection }); } catch (error) { @@ -1360,13 +1696,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback @@ -1437,13 +1773,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'update', userId, - sessionId: sid, payload: { id: randomKeyNaked(12), seq: updSeq, body: updContent, createdAt: Date.now() - } + }, + recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); // Send success response with new version via callback @@ -1456,6 +1792,217 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); + // Machine metadata update with optimistic concurrency control + socket.on('machine-update-metadata', async (data: any, callback: (response: any) => void) => { + try { + const { machineId, metadata, expectedVersion } = data; + + // Validate input + if (!machineId || typeof metadata !== 'string' || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error', message: 'Invalid parameters' }); + } + return; + } + + // Resolve machine + const machine = await db.machine.findFirst({ + where: { + accountId: userId, + id: machineId + } + }); + if (!machine) { + if (callback) { + callback({ result: 'error', message: 'Machine not found' }); + } + return; + } + + // Check version + if (machine.metadataVersion !== expectedVersion) { + callback({ + result: 'version-mismatch', + version: machine.metadataVersion, + metadata: machine.metadata + }); + return; + } + + // Update metadata with atomic version check + const { count } = await db.machine.updateMany({ + where: { + accountId: userId, + id: machineId, + metadataVersion: expectedVersion // Atomic CAS + }, + data: { + metadata: metadata, + metadataVersion: expectedVersion + 1 + // NOT updating active or lastActiveAt here + } + }); + + if (count === 0) { + // Re-fetch current version + const current = await db.machine.findFirst({ + where: { + accountId: userId, + id: machineId + } + }); + callback({ + result: 'version-mismatch', + version: current?.metadataVersion || 0, + metadata: current?.metadata + }); + return; + } + + // Generate update + const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-machine', + machineId: machineId, + metadata: { + value: metadata, + version: expectedVersion + 1 + } + }; + + // Emit to all connections + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + + // Send success response with new version + callback({ + result: 'success', + version: expectedVersion + 1, + metadata: metadata + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in machine-update-metadata: ${error}`); + if (callback) { + callback({ result: 'error', message: 'Internal error' }); + } + } + }); + + // Machine daemon state update with optimistic concurrency control + socket.on('machine-update-state', async (data: any, callback: (response: any) => void) => { + try { + const { machineId, daemonState, expectedVersion } = data; + + // Validate input + if (!machineId || typeof daemonState !== 'string' || typeof expectedVersion !== 'number') { + if (callback) { + callback({ result: 'error', message: 'Invalid parameters' }); + } + return; + } + + // Resolve machine + const machine = await db.machine.findFirst({ + where: { + accountId: userId, + id: machineId + } + }); + if (!machine) { + if (callback) { + callback({ result: 'error', message: 'Machine not found' }); + } + return; + } + + // Check version + if (machine.daemonStateVersion !== expectedVersion) { + callback({ + result: 'version-mismatch', + version: machine.daemonStateVersion, + daemonState: machine.daemonState + }); + return; + } + + // Update daemon state with atomic version check + const { count } = await db.machine.updateMany({ + where: { + accountId: userId, + id: machineId, + daemonStateVersion: expectedVersion // Atomic CAS + }, + data: { + daemonState: daemonState, + daemonStateVersion: expectedVersion + 1, + active: true, + lastActiveAt: new Date() + } + }); + + if (count === 0) { + // Re-fetch current version + const current = await db.machine.findFirst({ + where: { + accountId: userId, + id: machineId + } + }); + callback({ + result: 'version-mismatch', + version: current?.daemonStateVersion || 0, + daemonState: current?.daemonState + }); + return; + } + + // Generate update + const updSeq = await allocateUserSeq(userId); + const updContent: PrismaJson.UpdateBody = { + t: 'update-machine', + machineId: machineId, + daemonState: { + value: daemonState, + version: expectedVersion + 1 + } + }; + + // Emit to all connections + emitUpdateToInterestedClients({ + event: 'update', + userId, + payload: { + id: randomKeyNaked(12), + seq: updSeq, + body: updContent, + createdAt: Date.now() + }, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + + // Send success response with new version + callback({ + result: 'success', + version: expectedVersion + 1, + daemonState: daemonState + }); + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Error in machine-update-state: ${error}`); + if (callback) { + callback({ result: 'error', message: 'Internal error' }); + } + } + }); + // RPC register - Register this socket as a listener for an RPC method socket.on('rpc-register', async (data: any) => { try { @@ -1718,7 +2265,6 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> emitUpdateToInterestedClients({ event: 'ephemeral', userId, - sessionId, payload: { type: 'usage', id: sessionId, @@ -1726,7 +2272,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> tokens: usageData.tokens, cost: usageData.cost, timestamp: Date.now() - } + }, + recipientFilter: { type: 'user-scoped-only' } }); } diff --git a/sources/storage/types.ts b/sources/storage/types.ts index eb12f66..a564f2c 100644 --- a/sources/storage/types.ts +++ b/sources/storage/types.ts @@ -60,6 +60,18 @@ declare global { value: string | null; version: number; } | null | undefined; + } | { + t: 'update-machine'; + machineId: string; + metadata?: { + value: string; + version: number; + }; + daemonState?: { + value: string; + version: number; + }; + activeAt?: number; }; } } diff --git a/sources/utils/log.ts b/sources/utils/log.ts index a2d9cc5..d07402a 100644 --- a/sources/utils/log.ts +++ b/sources/utils/log.ts @@ -2,17 +2,37 @@ import pino from 'pino'; import { mkdirSync } from 'fs'; import { join } from 'path'; -const isDebug = process.env.DEBUG === 'true' || process.env.NODE_ENV === 'development'; -const logsDir = join(process.cwd(), '.logs'); +// Single log file name created once at startup +let consolidatedLogFile: string | undefined; -if (isDebug) { +if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) { + const logsDir = join(process.cwd(), '.logs'); try { mkdirSync(logsDir, { recursive: true }); + // Create filename once at startup + const now = new Date(); + const month = String(now.getMonth() + 1).padStart(2, '0'); + const day = String(now.getDate()).padStart(2, '0'); + const hour = String(now.getHours()).padStart(2, '0'); + const min = String(now.getMinutes()).padStart(2, '0'); + const sec = String(now.getSeconds()).padStart(2, '0'); + consolidatedLogFile = join(logsDir, `${month}-${day}-${hour}-${min}-${sec}.log`); + console.log(`[PINO] Remote debugging logs enabled - writing to ${consolidatedLogFile}`); } catch (error) { console.error('Failed to create logs directory:', error); } } +// Format time as HH:MM:ss.mmm in local time +function formatLocalTime(timestamp?: number) { + const date = timestamp ? new Date(timestamp) : new Date(); + const hours = String(date.getHours()).padStart(2, '0'); + const mins = String(date.getMinutes()).padStart(2, '0'); + const secs = String(date.getSeconds()).padStart(2, '0'); + const ms = String(date.getMilliseconds()).padStart(3, '0'); + return `${hours}:${mins}:${secs}.${ms}`; +} + const transports: any[] = []; transports.push({ @@ -21,28 +41,66 @@ transports.push({ colorize: true, translateTime: 'HH:MM:ss.l', ignore: 'pid,hostname', - messageFormat: '{levelLabel} [{time}] {msg}', + messageFormat: '{levelLabel} {msg} | [{time}]', errorLikeObjectKeys: ['err', 'error'], }, }); -if (isDebug) { +if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING && consolidatedLogFile) { transports.push({ target: 'pino/file', options: { - destination: join(logsDir, `server-${new Date().toISOString().split('T')[0]}.log`), + destination: consolidatedLogFile, mkdir: true, + messageFormat: '{levelLabel} {msg} | [server time: {time}]', }, }); } +// Main server logger with local time formatting export const logger = pino({ - level: isDebug ? 'debug' : 'info', + level: 'debug', transport: { targets: transports, }, + formatters: { + log: (object: any) => { + // Add localTime to every log entry + return { + ...object, + localTime: formatLocalTime(typeof object.time === 'number' ? object.time : undefined), + }; + } + }, + timestamp: () => `,"time":${Date.now()},"localTime":"${formatLocalTime()}"`, }); +// Optional file-only logger for remote logs from CLI/mobile +export const fileConsolidatedLogger = process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING && consolidatedLogFile ? + pino({ + level: 'debug', + transport: { + targets: [{ + target: 'pino/file', + options: { + destination: consolidatedLogFile, + mkdir: true, + }, + }], + }, + formatters: { + log: (object: any) => { + // Add localTime to every log entry + // Note: source property already exists from CLI/mobile logs + return { + ...object, + localTime: formatLocalTime(typeof object.time === 'number' ? object.time : undefined), + }; + } + }, + timestamp: () => `,"time":${Date.now()},"localTime":"${formatLocalTime()}"`, + }) : undefined; + export function log(src: any, ...args: any[]) { logger.info(src, ...args); }