import { Socket } from "socket.io"; import { log } from "@/utils/log"; import { GitHubProfile } from "@/app/types"; import { AccountProfile } from "@/types"; import { getPublicUrl } from "@/storage/files"; // === CONNECTION TYPES === export interface SessionScopedConnection { connectionType: 'session-scoped'; socket: Socket; userId: string; sessionId: string; } export interface UserScopedConnection { connectionType: 'user-scoped'; socket: Socket; userId: string; } export interface MachineScopedConnection { connectionType: 'machine-scoped'; socket: Socket; userId: string; machineId: string; } export type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection; // === RECIPIENT FILTER TYPES === export type RecipientFilter = | { type: 'all-interested-in-session'; sessionId: string } | { type: 'user-scoped-only' } | { type: 'all-user-authenticated-connections' }; // === UPDATE EVENT TYPES (Persistent) === export type UpdateEvent = { type: 'new-message'; sessionId: string; message: { id: string; seq: number; content: any; localId: string | null; createdAt: number; updatedAt: number; } } | { type: 'new-session'; sessionId: string; seq: number; metadata: string; metadataVersion: number; agentState: string | null; agentStateVersion: number; active: boolean; activeAt: number; createdAt: number; updatedAt: number; } | { type: 'update-session'; sessionId: string; metadata?: { value: string | null; version: number; } | null | undefined; agentState?: { value: string | null; version: number; } | null | undefined; } | { type: 'update-account'; userId: string; settings?: { value: string | null; version: number; } | null | undefined; github?: GitHubProfile | null | undefined; } | { type: 'update-machine'; machineId: string; metadata?: { value: string; version: number; }; daemonState?: { value: string; version: number; }; activeAt?: number; }; // === EPHEMERAL EVENT TYPES (Transient) === export type EphemeralEvent = { type: 'activity'; id: string; active: boolean; activeAt: number; thinking?: boolean; } | { type: 'machine-activity'; id: string; active: boolean; activeAt: number; } | { type: 'usage'; id: string; key: string; tokens: Record; cost: Record; timestamp: number; } | { type: 'machine-status'; machineId: string; online: boolean; timestamp: number; }; // === EVENT PAYLOAD TYPES === export interface UpdatePayload { id: string; seq: number; body: { t: UpdateEvent['type']; [key: string]: any; }; createdAt: number; } export interface EphemeralPayload { type: EphemeralEvent['type']; [key: string]: any; } // === EVENT ROUTER CLASS === export class EventRouter { private userConnections = new Map>(); // === CONNECTION MANAGEMENT === addConnection(userId: string, connection: ClientConnection): void { if (!this.userConnections.has(userId)) { this.userConnections.set(userId, new Set()); } this.userConnections.get(userId)!.add(connection); } removeConnection(userId: string, connection: ClientConnection): void { const connections = this.userConnections.get(userId); if (connections) { connections.delete(connection); if (connections.size === 0) { this.userConnections.delete(userId); } } } getConnections(userId: string): Set | undefined { return this.userConnections.get(userId); } // === EVENT EMISSION METHODS === emitUpdate(params: { userId: string; payload: UpdatePayload; recipientFilter?: RecipientFilter; skipSenderConnection?: ClientConnection; }): void { this.emit({ userId: params.userId, eventName: 'update', payload: params.payload, recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' }, skipSenderConnection: params.skipSenderConnection }); } emitEphemeral(params: { userId: string; payload: EphemeralPayload; recipientFilter?: RecipientFilter; skipSenderConnection?: ClientConnection; }): void { this.emit({ userId: params.userId, eventName: 'ephemeral', payload: params.payload, recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' }, skipSenderConnection: params.skipSenderConnection }); } // === PRIVATE ROUTING LOGIC === private shouldSendToConnection( connection: ClientConnection, filter: RecipientFilter ): boolean { switch (filter.type) { case 'all-interested-in-session': // Send to session-scoped with matching session + all user-scoped if (connection.connectionType === 'session-scoped') { if (connection.sessionId !== filter.sessionId) { return false; // Wrong session } } else if (connection.connectionType === 'machine-scoped') { return false; // Machines don't need session updates } // user-scoped always gets it return true; case 'user-scoped-only': return connection.connectionType === 'user-scoped'; case 'all-user-authenticated-connections': // Send to all connection types (default behavior) return true; default: return false; } } private emit(params: { userId: string; eventName: 'update' | 'ephemeral'; payload: any; recipientFilter: RecipientFilter; skipSenderConnection?: ClientConnection; }): void { const connections = this.userConnections.get(params.userId); if (!connections) { log({ module: 'websocket', level: 'warn' }, `No connections found for user ${params.userId}`); return; } for (const connection of connections) { // Skip message echo if (params.skipSenderConnection && connection === params.skipSenderConnection) { continue; } // Apply recipient filter if (!this.shouldSendToConnection(connection, params.recipientFilter)) { continue; } connection.socket.emit(params.eventName, params.payload); } } } // === EVENT BUILDER FUNCTIONS === export function buildNewSessionUpdate(session: { id: string; seq: number; metadata: string; metadataVersion: number; agentState: string | null; agentStateVersion: number; active: boolean; lastActiveAt: Date; createdAt: Date; updatedAt: Date; }, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'new-session', id: session.id, seq: session.seq, metadata: session.metadata, metadataVersion: session.metadataVersion, agentState: session.agentState, agentStateVersion: session.agentStateVersion, active: session.active, activeAt: session.lastActiveAt.getTime(), createdAt: session.createdAt.getTime(), updatedAt: session.updatedAt.getTime() }, createdAt: Date.now() }; } export function buildNewMessageUpdate(message: { id: string; seq: number; content: any; localId: string | null; createdAt: Date; updatedAt: Date; }, sessionId: string, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'new-message', sid: sessionId, message: { id: message.id, seq: message.seq, content: message.content, localId: message.localId, createdAt: message.createdAt.getTime(), updatedAt: message.updatedAt.getTime() } }, createdAt: Date.now() }; } export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, agentState?: { value: string; version: number }): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'update-session', id: sessionId, metadata, agentState }, createdAt: Date.now() }; } export function buildUpdateAccountUpdate(userId: string, profile: Partial, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'update-account', id: userId, ...profile, avatar: profile.avatar ? { ...profile.avatar, url: getPublicUrl(profile.avatar.path) } : undefined }, createdAt: Date.now() }; } export function buildUpdateMachineUpdate(machineId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, daemonState?: { value: string; version: number }): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'update-machine', machineId, metadata, daemonState }, createdAt: Date.now() }; } export function buildSessionActivityEphemeral(sessionId: string, active: boolean, activeAt: number, thinking?: boolean): EphemeralPayload { return { type: 'activity', id: sessionId, active, activeAt, thinking: thinking || false }; } export function buildMachineActivityEphemeral(machineId: string, active: boolean, activeAt: number): EphemeralPayload { return { type: 'machine-activity', id: machineId, active, activeAt }; } export function buildUsageEphemeral(sessionId: string, key: string, tokens: Record, cost: Record): EphemeralPayload { return { type: 'usage', id: sessionId, key, tokens, cost, timestamp: Date.now() }; } export function buildMachineStatusEphemeral(machineId: string, online: boolean): EphemeralPayload { return { type: 'machine-status', machineId, online, timestamp: Date.now() }; }