Merge remote-tracking branch 'origin/daemon-persistent'

This commit is contained in:
Steve Korshakov 2025-08-18 18:56:44 -07:00
commit 3594b675a8
9 changed files with 852 additions and 90 deletions

View File

@ -1,3 +1,6 @@
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/handy
HANDY_MASTER_SECRET=your-super-secret-key-for-local-development
PORT=3005
# Uncomment to enable centralized logging for AI debugging (creates .logs directory)
DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true

View File

@ -175,4 +175,97 @@ The project includes a multi-stage Dockerfile:
3. ALWAYS prefer editing an existing file to creating a new one
4. NEVER proactively create documentation files (*.md) or README files unless explicitly requested
5. Use 4 spaces for tabs (not 2 spaces)
6. Use yarn instead of npm for all package management
6. Use yarn instead of npm for all package management
## Debugging Notes
### Remote Logging Setup
- Use `DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING=true` env var to enable
- Server logs to `.logs/` directory with timestamped files (format: `MM-DD-HH-MM-SS.log`)
- Mobile and CLI send logs to `/logs-combined-from-cli-and-mobile-for-simple-ai-debugging` endpoint
### Common Issues & Tells
#### Socket/Connection Issues
- **Tell**: "Sending update to user-scoped connection" but mobile not updating
- **Tell**: Multiple "User disconnected" messages indicate socket instability
- **Tell**: "Response from the Engine was empty" = Prisma database connection lost
#### Auth Flow Debugging
- CLI must hit `/v1/auth/request` to create auth request
- Mobile scans QR and hits `/v1/auth/response` to approve
- **Tell**: 404 on `/v1/auth/response` = server likely restarted/crashed
- **Tell**: "Auth failed - user not found" = token issue or user doesn't exist
#### Session Creation Flow
- Sessions created via POST `/v1/sessions` with tag-based deduplication
- Server emits "new-session" update to all user connections
- **Tell**: Sessions created but not showing = mobile app not processing updates
- **Tell**: "pathname /" in mobile logs = app stuck at root screen
#### Environment Variables
- CLI: Use `yarn dev:local-server` (NOT `yarn dev`) to load `.env.dev-local-server`
- Server: Use `yarn dev` to start with proper env files
- **Tell**: Wrong server URL = check `HAPPY_SERVER_URL` env var
- **Tell**: Wrong home dir = check `HAPPY_HOME_DIR` (should be `~/.happy-dev` for local)
### Quick Diagnostic Commands
#### IMPORTANT: Always Start Debugging With These
```bash
# 1. CHECK CURRENT TIME - Logs use local time, know what's current!
date
# 2. CHECK LATEST LOG FILES - Server creates new logs on restart
ls -la .logs/*.log | tail -5
# 3. VERIFY YOU'RE LOOKING AT CURRENT LOGS
# Server logs are named: MM-DD-HH-MM-SS.log (month-day-hour-min-sec)
# If current time is 13:45 and latest log is 08-15-10-57-02.log from 10:57,
# that log started 3 hours ago but may still be active!
tail -1 .logs/[LATEST_LOG_FILE] # Check last entry timestamp
```
#### Common Debugging Patterns
```bash
# Check server logs for errors
tail -100 .logs/*.log | grep -E "(error|Error|ERROR|failed|Failed)"
# Monitor session creation
tail -f .logs/*.log | grep -E "(new-session|Session created)"
# Check active connections
tail -100 .logs/*.log | grep -E "(Token verified|User connected|User disconnected)"
# See what endpoints are being hit
tail -100 .logs/*.log | grep "incoming request"
# Debug socket real-time updates
tail -500 .logs/*.log | grep -A 2 -B 2 "new-session" | tail -30
tail -200 .logs/*.log | grep -E "(websocket|Socket.*connected|Sending update)" | tail -30
# Track socket events from mobile client
tail -300 .logs/*.log | grep "remote-log.*mobile" | grep -E "(SyncSocket|handleUpdate)" | tail -20
# Monitor session creation flow end-to-end
tail -500 .logs/*.log | grep "session-create" | tail -20
tail -500 .logs/*.log | grep "cmed556s4002bvb2020igg8jf" -A 3 -B 3 # Replace with actual session ID
# Check auth flow for sessions API
tail -300 .logs/*.log | grep "auth-decorator.*sessions" | tail -10
# Debug machine registration and online status
tail -500 .logs/*.log | grep -E "(machine-alive|machine-register|update-machine)" | tail -20
tail -500 .logs/*.log | grep "GET /v1/machines" | tail -10
tail -500 .logs/*.log | grep "POST /v1/machines" | tail -10
# Check what mobile app is seeing
tail -500 .logs/*.log | grep "📊 Storage" | tail -20
tail -500 .logs/*.log | grep "applySessions.*active" | tail -10
```
#### Time Format Reference
- **CLI logs**: `[HH:MM:SS.mmm]` in local time (e.g., `[13:45:23.738]`)
- **Server logs**: Include both `time` (Unix ms) and `localTime` (HH:MM:ss.mmm)
- **Mobile logs**: Sent with `timestamp` in UTC, converted to `localTime` on server
- **All consolidated logs**: Have `localTime` field for easy correlation

View File

@ -8,9 +8,9 @@
"scripts": {
"build": "tsc --noEmit",
"start": "tsx ./sources/main.ts",
"dev": "tsx --env-file=.env --env-file=.env.example ./sources/main.ts",
"dev": "lsof -ti tcp:3005 | xargs kill -9 && tsx --env-file=.env --env-file=.env.dev ./sources/main.ts",
"test": "vitest run",
"migrate": "dotenv -e .env.example -- prisma migrate dev",
"migrate": "dotenv -e .env.dev -- prisma migrate dev",
"generate": "prisma generate",
"postinstall": "prisma generate",
"db": "docker run -d -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=handy -v $(pwd)/.pgdata:/var/lib/postgresql/data -p 5432:5432 postgres",

View File

@ -0,0 +1,23 @@
-- CreateTable
CREATE TABLE "Machine" (
"id" TEXT NOT NULL,
"accountId" TEXT NOT NULL,
"metadata" TEXT NOT NULL,
"metadataVersion" INTEGER NOT NULL DEFAULT 0,
"seq" INTEGER NOT NULL DEFAULT 0,
"active" BOOLEAN NOT NULL DEFAULT true,
"lastActiveAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "Machine_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "Machine_accountId_idx" ON "Machine"("accountId");
-- CreateIndex
CREATE UNIQUE INDEX "Machine_accountId_id_key" ON "Machine"("accountId", "id");
-- AddForeignKey
ALTER TABLE "Machine" ADD CONSTRAINT "Machine_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

View File

@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Machine" ADD COLUMN "daemonState" TEXT,
ADD COLUMN "daemonStateVersion" INTEGER NOT NULL DEFAULT 0;

View File

@ -31,6 +31,7 @@ model Account {
TerminalAuthRequest TerminalAuthRequest[]
AccountAuthRequest AccountAuthRequest[]
UsageReport UsageReport[]
Machine Machine[]
}
model TerminalAuthRequest {
@ -148,3 +149,25 @@ model UsageReport {
@@index([accountId])
@@index([sessionId])
}
//
// Machines
//
model Machine {
id String @id
accountId String
account Account @relation(fields: [accountId], references: [id])
metadata String // Encrypted - contains static machine info
metadataVersion Int @default(0)
daemonState String? // Encrypted - contains dynamic daemon state
daemonStateVersion Int @default(0)
seq Int @default(0)
active Boolean @default(true)
lastActiveAt DateTime @default(now())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([accountId, id])
@@index([accountId])
}

View File

@ -1,5 +1,5 @@
import fastify, { FastifyInstance } from "fastify";
import { log } from "@/utils/log";
import { log, logger } from "@/utils/log";
import { serializerCompiler, validatorCompiler, ZodTypeProvider } from "fastify-type-provider-zod";
import { Server, Socket } from "socket.io";
import { z } from "zod";
@ -12,6 +12,13 @@ import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { AsyncLock } from "@/utils/lock";
// Recipient filter types
type RecipientFilter =
| { type: 'all-interested-in-session'; sessionId: string }
| { type: 'user-scoped-only' }
| { type: 'all-user-authenticated-connections' };
// Connection metadata types
interface SessionScopedConnection {
connectionType: 'session-scoped';
@ -79,13 +86,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
app.decorate('authenticate', async function (request: any, reply: any) {
try {
const authHeader = request.headers.authorization;
log({ module: 'auth-decorator' }, `Auth check - path: ${request.url}, has header: ${!!authHeader}, header start: ${authHeader?.substring(0, 50)}...`);
if (!authHeader || !authHeader.startsWith('Bearer ')) {
log({ module: 'auth-decorator' }, `Auth failed - missing or invalid header`);
return reply.code(401).send({ error: 'Missing authorization header' });
}
const token = authHeader.substring(7);
const verified = await tokenVerifier.verify(token);
if (!verified) {
log({ module: 'auth-decorator' }, `Auth failed - invalid token`);
return reply.code(401).send({ error: 'Invalid token' });
}
@ -95,9 +105,12 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
if (!user) {
log({ module: 'auth-decorator' }, `Auth failed - user not found: ${verified.user}`);
return reply.code(401).send({ error: 'User not found' });
}
log({ module: 'auth-decorator' }, `Auth success - user: ${user.id}`);
request.user = user;
} catch (error) {
return reply.code(401).send({ error: 'Authentication failed' });
@ -105,11 +118,17 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
// Send session update to all relevant connections
let emitUpdateToInterestedClients = ({ event, userId, sessionId, payload, skipSenderConnection }: {
let emitUpdateToInterestedClients = ({
event,
userId,
payload,
recipientFilter = { type: 'all-user-authenticated-connections' },
skipSenderConnection
}: {
event: string,
userId: string,
sessionId: string,
payload: any,
recipientFilter?: RecipientFilter,
skipSenderConnection?: ClientConnection
}) => {
const connections = userIdToClientConnections.get(userId);
@ -124,27 +143,33 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
continue;
}
// Send to all user-scoped connections - we already matched user
if (connection.connectionType === 'user-scoped') {
log({ module: 'websocket' }, `Sending ${event} to user-scoped connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
// Apply recipient filter
switch (recipientFilter.type) {
case 'all-interested-in-session':
// Send to session-scoped with matching session + all user-scoped
if (connection.connectionType === 'session-scoped') {
if (connection.sessionId !== recipientFilter.sessionId) {
continue; // Wrong session
}
} else if (connection.connectionType === 'machine-scoped') {
continue; // Machines don't need session updates
}
// user-scoped always gets it
break;
case 'user-scoped-only':
if (connection.connectionType !== 'user-scoped') {
continue;
}
break;
case 'all-user-authenticated-connections':
// Send to all connection types (default behavior)
break;
}
// Send to all session-scoped connections, only that match sessionId
if (connection.connectionType === 'session-scoped') {
const matches = connection.sessionId === sessionId;
log({ module: 'websocket' }, `Session-scoped connection ${connection.socket.id}: sessionId=${connection.sessionId}, messageSessionId=${sessionId}, matches=${matches}`);
if (matches) {
log({ module: 'websocket' }, `Sending ${event} to session-scoped connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
}
}
// Send to all machine-scoped connections - they get all user updates
if (connection.connectionType === 'machine-scoped') {
log({ module: 'websocket' }, `Sending ${event} to machine-scoped connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
}
log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
}
}
@ -205,10 +230,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return reply.code(401).send({ error: 'Invalid public key' });
}
const publicKeyHex = privacyKit.encodeHex(publicKey);
log({ module: 'auth-request' }, `Terminal auth request - publicKey hex: ${publicKeyHex}`);
const answer = await db.terminalAuthRequest.upsert({
where: { publicKey: privacyKit.encodeHex(publicKey) },
where: { publicKey: publicKeyHex },
update: {},
create: { publicKey: privacyKit.encodeHex(publicKey) }
create: { publicKey: publicKeyHex }
});
if (answer.response && answer.responseAccountId) {
@ -233,15 +261,26 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
})
}
}, async (request, reply) => {
log({ module: 'auth-response' }, `Auth response endpoint hit - user: ${request.user?.id || 'NO USER'}, publicKey: ${request.body.publicKey.substring(0, 20)}...`);
const publicKey = privacyKit.decodeBase64(request.body.publicKey);
const isValid = tweetnacl.box.publicKeyLength === publicKey.length;
if (!isValid) {
log({ module: 'auth-response' }, `Invalid public key length: ${publicKey.length}`);
return reply.code(401).send({ error: 'Invalid public key' });
}
const publicKeyHex = privacyKit.encodeHex(publicKey);
log({ module: 'auth-response' }, `Looking for auth request with publicKey hex: ${publicKeyHex}`);
const authRequest = await db.terminalAuthRequest.findUnique({
where: { publicKey: privacyKit.encodeHex(publicKey) }
where: { publicKey: publicKeyHex }
});
if (!authRequest) {
log({ module: 'auth-response' }, `Auth request not found for publicKey: ${publicKeyHex}`);
// Let's also check what auth requests exist
const allRequests = await db.terminalAuthRequest.findMany({
take: 5,
orderBy: { createdAt: 'desc' }
});
log({ module: 'auth-response' }, `Recent auth requests in DB: ${JSON.stringify(allRequests.map(r => ({ id: r.id, publicKey: r.publicKey.substring(0, 20) + '...', hasResponse: !!r.response })))}`);
return reply.code(404).send({ error: 'Request not found' });
}
if (!authRequest.response) {
@ -426,7 +465,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const lastMessage = v.messages[0];
const sessionUpdatedAt = v.updatedAt.getTime();
const lastMessageCreatedAt = lastMessage ? lastMessage.createdAt.getTime() : 0;
return {
id: v.id,
seq: v.seq,
@ -471,6 +510,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
if (session) {
logger.info({ module: 'session-create', sessionId: session.id, userId, tag }, `Found existing session: ${session.id} for tag ${tag}`);
return reply.send({
session: {
id: session.id,
@ -492,6 +532,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const updSeq = await allocateUserSeq(userId);
// Create session
logger.info({ module: 'session-create', userId, tag }, `Creating new session for user ${userId} with tag ${tag}`);
const session = await db.session.create({
data: {
accountId: userId,
@ -499,6 +540,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
metadata: metadata
}
});
logger.info({ module: 'session-create', sessionId: session.id, userId }, `Session created: ${session.id}`);
// Create update
const updContent: PrismaJson.UpdateBody = {
@ -516,16 +558,24 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
};
// Emit update to connected sockets
const updatePayload = {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
};
logger.info({
module: 'session-create',
userId,
sessionId: session.id,
updateType: 'new-session',
updatePayload: JSON.stringify(updatePayload)
}, `Emitting new-session update to all user connections`);
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: session.id,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
return reply.send({
@ -753,18 +803,18 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
};
// Get all user connections (not session-specific)
const connections = userIdToClientConnections.get(userId);
if (connections) {
for (const connection of connections) {
connection.socket.emit('update', {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
});
}
}
// Send to all user connections
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
return reply.send({
success: true,
@ -964,6 +1014,220 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
});
// Machines
// POST /v1/machines - Create machine or return existing
typed.post('/v1/machines', {
preHandler: app.authenticate,
schema: {
body: z.object({
id: z.string(),
metadata: z.string(), // Encrypted metadata
daemonState: z.string().optional() // Encrypted daemon state
})
}
}, async (request, reply) => {
const userId = request.user.id;
const { id, metadata, daemonState } = request.body;
// Check if machine exists (like sessions do)
const machine = await db.machine.findFirst({
where: {
accountId: userId,
id: id
}
});
if (machine) {
// Machine exists - just return it
logger.info({ module: 'machines', machineId: id, userId }, 'Found existing machine');
return reply.send({
machine: {
id: machine.id,
metadata: machine.metadata,
metadataVersion: machine.metadataVersion,
daemonState: machine.daemonState,
daemonStateVersion: machine.daemonStateVersion,
active: machine.active,
activeAt: machine.lastActiveAt.getTime(), // Return as activeAt for API consistency
createdAt: machine.createdAt.getTime(),
updatedAt: machine.updatedAt.getTime()
}
});
} else {
// Create new machine
logger.info({ module: 'machines', machineId: id, userId }, 'Creating new machine');
const newMachine = await db.machine.create({
data: {
id,
accountId: userId,
metadata,
metadataVersion: 1,
daemonState: daemonState || null,
daemonStateVersion: daemonState ? 1 : 0
// active defaults to true in schema
// lastActiveAt defaults to now() in schema
}
});
// Emit update for new machine
const updSeq = await allocateUserSeq(userId);
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: {
t: 'update-machine',
machineId: newMachine.id,
metadata: {
version: 1,
value: metadata
}
},
createdAt: Date.now()
}
});
return reply.send({
machine: {
id: newMachine.id,
metadata: newMachine.metadata,
metadataVersion: newMachine.metadataVersion,
daemonState: newMachine.daemonState,
daemonStateVersion: newMachine.daemonStateVersion,
active: newMachine.active,
activeAt: newMachine.lastActiveAt.getTime(), // Return as activeAt for API consistency
createdAt: newMachine.createdAt.getTime(),
updatedAt: newMachine.updatedAt.getTime()
}
});
}
});
// Machines API
typed.get('/v1/machines', {
preHandler: app.authenticate,
}, async (request, reply) => {
const userId = request.user.id;
const machines = await db.machine.findMany({
where: { accountId: userId },
orderBy: { lastActiveAt: 'desc' }
});
return machines.map(m => ({
id: m.id,
metadata: m.metadata,
metadataVersion: m.metadataVersion,
daemonState: m.daemonState,
daemonStateVersion: m.daemonStateVersion,
seq: m.seq,
active: m.active,
activeAt: m.lastActiveAt.getTime(),
createdAt: m.createdAt.getTime(),
updatedAt: m.updatedAt.getTime()
}));
});
// GET /v1/machines/:id - Get single machine by ID
typed.get('/v1/machines/:id', {
preHandler: app.authenticate,
schema: {
params: z.object({
id: z.string()
})
}
}, async (request, reply) => {
const userId = request.user.id;
const { id } = request.params;
const machine = await db.machine.findFirst({
where: {
accountId: userId,
id: id
}
});
if (!machine) {
return reply.code(404).send({ error: 'Machine not found' });
}
return {
machine: {
id: machine.id,
metadata: machine.metadata,
metadataVersion: machine.metadataVersion,
daemonState: machine.daemonState,
daemonStateVersion: machine.daemonStateVersion,
seq: machine.seq,
active: machine.active,
activeAt: machine.lastActiveAt.getTime(),
createdAt: machine.createdAt.getTime(),
updatedAt: machine.updatedAt.getTime()
}
};
});
// Combined logging endpoint (only when explicitly enabled)
if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) {
typed.post('/logs-combined-from-cli-and-mobile-for-simple-ai-debugging', {
schema: {
body: z.object({
timestamp: z.string(),
level: z.string(),
message: z.string(),
messageRawObject: z.any().optional(),
source: z.enum(['mobile', 'cli']),
platform: z.string().optional()
})
}
}, async (request, reply) => {
const { timestamp, level, message, source, platform } = request.body;
// Log ONLY to separate remote logger (file only, no console)
const logData = {
source,
platform,
timestamp
};
// Use the file-only logger if available
const { fileConsolidatedLogger } = await import('@/utils/log');
if (!fileConsolidatedLogger) {
// Should never happen since we check env var above, but be safe
return reply.send({ success: true });
}
switch (level.toLowerCase()) {
case 'error':
fileConsolidatedLogger.error(logData, message);
break;
case 'warn':
case 'warning':
fileConsolidatedLogger.warn(logData, message);
break;
case 'debug':
fileConsolidatedLogger.debug(logData, message);
break;
default:
fileConsolidatedLogger.info(logData, message);
}
return reply.send({ success: true });
});
}
// Catch-all route for debugging 404s
app.setNotFoundHandler((request, reply) => {
log({ module: '404-handler' }, `404 - Method: ${request.method}, Path: ${request.url}, Headers: ${JSON.stringify(request.headers)}`);
reply.code(404).send({ error: 'Not found', path: request.url, method: request.method });
});
// Start
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005;
await app.listen({ port, host: '0.0.0.0' });
@ -1018,7 +1282,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
socket.disconnect();
return;
}
// Validate machine-scoped clients have machineId
if (clientType === 'machine-scoped' && !machineId) {
log({ module: 'websocket' }, `Machine-scoped client missing machineId`);
@ -1066,20 +1330,20 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
userIdToClientConnections.set(userId, new Set());
}
userIdToClientConnections.get(userId)!.add(connection);
// Broadcast daemon online status
if (connection.connectionType === 'machine-scoped') {
// Broadcast daemon online
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: '', // No specific session
payload: {
type: 'daemon-status',
machineId,
status: 'online',
timestamp: Date.now()
}
type: 'machine-activity',
id: machineId,
active: true,
activeAt: Date.now()
},
recipientFilter: { type: 'user-scoped-only' }
});
}
@ -1120,37 +1384,44 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
log({ module: 'websocket' }, `User disconnected: ${userId}`);
// Broadcast daemon offline status
if (connection.connectionType === 'machine-scoped') {
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: '', // No specific session
payload: {
type: 'daemon-status',
machineId: connection.machineId,
status: 'offline',
timestamp: Date.now()
}
type: 'machine-activity',
id: connection.machineId,
active: false,
activeAt: Date.now()
},
recipientFilter: { type: 'user-scoped-only' }
});
}
});
socket.on('session-alive', async (data: any) => {
socket.on('session-alive', async (data: {
sid: string;
time: number;
thinking?: boolean;
}) => {
try {
const { sid, time, thinking } = data;
let t = time;
if (typeof t !== 'number') {
// Basic validation
if (!data || typeof data.time !== 'number' || !data.sid) {
return;
}
let t = data.time;
if (t > Date.now()) {
t = Date.now();
}
if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes
if (t < Date.now() - 1000 * 60 * 10) {
return;
}
const { sid, thinking } = data;
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
@ -1159,31 +1430,96 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return;
}
// Update last active at
// Update last active
await db.session.update({
where: { id: sid },
data: { lastActiveAt: new Date(t), active: true }
});
// Emit update to connected sockets
// Emit update
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: sid,
payload: {
type: 'activity',
id: sid,
active: true,
activeAt: t,
thinking
}
thinking: thinking || false
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`);
}
});
socket.on('session-end', async (data: any) => {
socket.on('machine-alive', async (data: {
machineId: string;
time: number;
}) => {
try {
// Basic validation
if (!data || typeof data.time !== 'number' || !data.machineId) {
return;
}
let t = data.time;
if (t > Date.now()) {
t = Date.now();
}
if (t < Date.now() - 1000 * 60 * 10) {
return;
}
// Resolve machine
const machine = await db.machine.findUnique({
where: {
accountId_id: {
accountId: userId,
id: data.machineId
}
}
});
if (!machine) {
return;
}
// Update machine lastActiveAt in database
const updatedMachine = await db.machine.update({
where: {
accountId_id: {
accountId: userId,
id: data.machineId
}
},
data: {
lastActiveAt: new Date(t),
active: true
}
});
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
payload: {
type: 'machine-activity',
id: updatedMachine.id,
active: true,
activeAt: t,
},
recipientFilter: { type: 'user-scoped-only' }
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`);
}
});
socket.on('session-end', async (data: {
sid: string;
time: number;
}) => {
try {
const { sid, time } = data;
let t = time;
@ -1215,14 +1551,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId: sid,
payload: {
type: 'activity',
id: sid,
active: false,
activeAt: t,
thinking: false
}
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in session-end: ${error}`);
@ -1293,13 +1629,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: update,
createdAt: Date.now()
},
recipientFilter: { type: 'all-interested-in-session', sessionId: sid },
skipSenderConnection: connection
});
} catch (error) {
@ -1360,13 +1696,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
},
recipientFilter: { type: 'all-interested-in-session', sessionId: sid }
});
// Send success response with new version via callback
@ -1437,13 +1773,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'update',
userId,
sessionId: sid,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
}
},
recipientFilter: { type: 'all-interested-in-session', sessionId: sid }
});
// Send success response with new version via callback
@ -1456,6 +1792,217 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
// Machine metadata update with optimistic concurrency control
socket.on('machine-update-metadata', async (data: any, callback: (response: any) => void) => {
try {
const { machineId, metadata, expectedVersion } = data;
// Validate input
if (!machineId || typeof metadata !== 'string' || typeof expectedVersion !== 'number') {
if (callback) {
callback({ result: 'error', message: 'Invalid parameters' });
}
return;
}
// Resolve machine
const machine = await db.machine.findFirst({
where: {
accountId: userId,
id: machineId
}
});
if (!machine) {
if (callback) {
callback({ result: 'error', message: 'Machine not found' });
}
return;
}
// Check version
if (machine.metadataVersion !== expectedVersion) {
callback({
result: 'version-mismatch',
version: machine.metadataVersion,
metadata: machine.metadata
});
return;
}
// Update metadata with atomic version check
const { count } = await db.machine.updateMany({
where: {
accountId: userId,
id: machineId,
metadataVersion: expectedVersion // Atomic CAS
},
data: {
metadata: metadata,
metadataVersion: expectedVersion + 1
// NOT updating active or lastActiveAt here
}
});
if (count === 0) {
// Re-fetch current version
const current = await db.machine.findFirst({
where: {
accountId: userId,
id: machineId
}
});
callback({
result: 'version-mismatch',
version: current?.metadataVersion || 0,
metadata: current?.metadata
});
return;
}
// Generate update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-machine',
machineId: machineId,
metadata: {
value: metadata,
version: expectedVersion + 1
}
};
// Emit to all connections
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
// Send success response with new version
callback({
result: 'success',
version: expectedVersion + 1,
metadata: metadata
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in machine-update-metadata: ${error}`);
if (callback) {
callback({ result: 'error', message: 'Internal error' });
}
}
});
// Machine daemon state update with optimistic concurrency control
socket.on('machine-update-state', async (data: any, callback: (response: any) => void) => {
try {
const { machineId, daemonState, expectedVersion } = data;
// Validate input
if (!machineId || typeof daemonState !== 'string' || typeof expectedVersion !== 'number') {
if (callback) {
callback({ result: 'error', message: 'Invalid parameters' });
}
return;
}
// Resolve machine
const machine = await db.machine.findFirst({
where: {
accountId: userId,
id: machineId
}
});
if (!machine) {
if (callback) {
callback({ result: 'error', message: 'Machine not found' });
}
return;
}
// Check version
if (machine.daemonStateVersion !== expectedVersion) {
callback({
result: 'version-mismatch',
version: machine.daemonStateVersion,
daemonState: machine.daemonState
});
return;
}
// Update daemon state with atomic version check
const { count } = await db.machine.updateMany({
where: {
accountId: userId,
id: machineId,
daemonStateVersion: expectedVersion // Atomic CAS
},
data: {
daemonState: daemonState,
daemonStateVersion: expectedVersion + 1,
active: true,
lastActiveAt: new Date()
}
});
if (count === 0) {
// Re-fetch current version
const current = await db.machine.findFirst({
where: {
accountId: userId,
id: machineId
}
});
callback({
result: 'version-mismatch',
version: current?.daemonStateVersion || 0,
daemonState: current?.daemonState
});
return;
}
// Generate update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-machine',
machineId: machineId,
daemonState: {
value: daemonState,
version: expectedVersion + 1
}
};
// Emit to all connections
emitUpdateToInterestedClients({
event: 'update',
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
recipientFilter: { type: 'all-user-authenticated-connections' }
});
// Send success response with new version
callback({
result: 'success',
version: expectedVersion + 1,
daemonState: daemonState
});
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in machine-update-state: ${error}`);
if (callback) {
callback({ result: 'error', message: 'Internal error' });
}
}
});
// RPC register - Register this socket as a listener for an RPC method
socket.on('rpc-register', async (data: any) => {
try {
@ -1718,7 +2265,6 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId,
payload: {
type: 'usage',
id: sessionId,
@ -1726,7 +2272,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
tokens: usageData.tokens,
cost: usageData.cost,
timestamp: Date.now()
}
},
recipientFilter: { type: 'user-scoped-only' }
});
}

View File

@ -60,6 +60,18 @@ declare global {
value: string | null;
version: number;
} | null | undefined;
} | {
t: 'update-machine';
machineId: string;
metadata?: {
value: string;
version: number;
};
daemonState?: {
value: string;
version: number;
};
activeAt?: number;
};
}
}

View File

@ -2,17 +2,37 @@ import pino from 'pino';
import { mkdirSync } from 'fs';
import { join } from 'path';
const isDebug = process.env.DEBUG === 'true' || process.env.NODE_ENV === 'development';
const logsDir = join(process.cwd(), '.logs');
// Single log file name created once at startup
let consolidatedLogFile: string | undefined;
if (isDebug) {
if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING) {
const logsDir = join(process.cwd(), '.logs');
try {
mkdirSync(logsDir, { recursive: true });
// Create filename once at startup
const now = new Date();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
const hour = String(now.getHours()).padStart(2, '0');
const min = String(now.getMinutes()).padStart(2, '0');
const sec = String(now.getSeconds()).padStart(2, '0');
consolidatedLogFile = join(logsDir, `${month}-${day}-${hour}-${min}-${sec}.log`);
console.log(`[PINO] Remote debugging logs enabled - writing to ${consolidatedLogFile}`);
} catch (error) {
console.error('Failed to create logs directory:', error);
}
}
// Format time as HH:MM:ss.mmm in local time
function formatLocalTime(timestamp?: number) {
const date = timestamp ? new Date(timestamp) : new Date();
const hours = String(date.getHours()).padStart(2, '0');
const mins = String(date.getMinutes()).padStart(2, '0');
const secs = String(date.getSeconds()).padStart(2, '0');
const ms = String(date.getMilliseconds()).padStart(3, '0');
return `${hours}:${mins}:${secs}.${ms}`;
}
const transports: any[] = [];
transports.push({
@ -21,28 +41,66 @@ transports.push({
colorize: true,
translateTime: 'HH:MM:ss.l',
ignore: 'pid,hostname',
messageFormat: '{levelLabel} [{time}] {msg}',
messageFormat: '{levelLabel} {msg} | [{time}]',
errorLikeObjectKeys: ['err', 'error'],
},
});
if (isDebug) {
if (process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING && consolidatedLogFile) {
transports.push({
target: 'pino/file',
options: {
destination: join(logsDir, `server-${new Date().toISOString().split('T')[0]}.log`),
destination: consolidatedLogFile,
mkdir: true,
messageFormat: '{levelLabel} {msg} | [server time: {time}]',
},
});
}
// Main server logger with local time formatting
export const logger = pino({
level: isDebug ? 'debug' : 'info',
level: 'debug',
transport: {
targets: transports,
},
formatters: {
log: (object: any) => {
// Add localTime to every log entry
return {
...object,
localTime: formatLocalTime(typeof object.time === 'number' ? object.time : undefined),
};
}
},
timestamp: () => `,"time":${Date.now()},"localTime":"${formatLocalTime()}"`,
});
// Optional file-only logger for remote logs from CLI/mobile
export const fileConsolidatedLogger = process.env.DANGEROUSLY_LOG_TO_SERVER_FOR_AI_AUTO_DEBUGGING && consolidatedLogFile ?
pino({
level: 'debug',
transport: {
targets: [{
target: 'pino/file',
options: {
destination: consolidatedLogFile,
mkdir: true,
},
}],
},
formatters: {
log: (object: any) => {
// Add localTime to every log entry
// Note: source property already exists from CLI/mobile logs
return {
...object,
localTime: formatLocalTime(typeof object.time === 'number' ? object.time : undefined),
};
}
},
timestamp: () => `,"time":${Date.now()},"localTime":"${formatLocalTime()}"`,
}) : undefined;
export function log(src: any, ...args: any[]) {
logger.info(src, ...args);
}