import { Socket } from "socket.io"; import { log } from "@/utils/log"; import { GitHubProfile } from "@/app/api/types"; import { AccountProfile } from "@/types"; import { getPublicUrl } from "@/storage/files"; // === CONNECTION TYPES === export interface SessionScopedConnection { connectionType: 'session-scoped'; socket: Socket; userId: string; sessionId: string; } export interface UserScopedConnection { connectionType: 'user-scoped'; socket: Socket; userId: string; } export interface MachineScopedConnection { connectionType: 'machine-scoped'; socket: Socket; userId: string; machineId: string; } export type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection; // === RECIPIENT FILTER TYPES === export type RecipientFilter = | { type: 'all-interested-in-session'; sessionId: string } | { type: 'user-scoped-only' } | { type: 'all-user-authenticated-connections' }; // === UPDATE EVENT TYPES (Persistent) === export type UpdateEvent = { type: 'new-message'; sessionId: string; message: { id: string; seq: number; content: any; localId: string | null; createdAt: number; updatedAt: number; } } | { type: 'new-session'; sessionId: string; seq: number; metadata: string; metadataVersion: number; agentState: string | null; agentStateVersion: number; dataEncryptionKey: string | null; active: boolean; activeAt: number; createdAt: number; updatedAt: number; } | { type: 'update-session'; sessionId: string; metadata?: { value: string | null; version: number; } | null | undefined; agentState?: { value: string | null; version: number; } | null | undefined; } | { type: 'update-account'; userId: string; settings?: { value: string | null; version: number; } | null | undefined; github?: GitHubProfile | null | undefined; } | { type: 'new-machine'; machineId: string; seq: number; metadata: string; metadataVersion: number; daemonState: string | null; daemonStateVersion: number; dataEncryptionKey: string | null; active: boolean; activeAt: number; createdAt: number; updatedAt: number; } | { type: 'update-machine'; machineId: string; metadata?: { value: string; version: number; }; daemonState?: { value: string; version: number; }; activeAt?: number; } | { type: 'new-artifact'; artifactId: string; seq: number; header: string; headerVersion: number; body: string; bodyVersion: number; dataEncryptionKey: string | null; createdAt: number; updatedAt: number; } | { type: 'update-artifact'; artifactId: string; header?: { value: string; version: number; }; body?: { value: string; version: number; }; } | { type: 'delete-artifact'; artifactId: string; } | { type: 'relationship-updated'; uid: string; status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected'; timestamp: number; }; // === EPHEMERAL EVENT TYPES (Transient) === export type EphemeralEvent = { type: 'activity'; id: string; active: boolean; activeAt: number; thinking?: boolean; } | { type: 'machine-activity'; id: string; active: boolean; activeAt: number; } | { type: 'usage'; id: string; key: string; tokens: Record; cost: Record; timestamp: number; } | { type: 'machine-status'; machineId: string; online: boolean; timestamp: number; }; // === EVENT PAYLOAD TYPES === export interface UpdatePayload { id: string; seq: number; body: { t: UpdateEvent['type']; [key: string]: any; }; createdAt: number; } export interface EphemeralPayload { type: EphemeralEvent['type']; [key: string]: any; } // === EVENT ROUTER CLASS === class EventRouter { private userConnections = new Map>(); // === CONNECTION MANAGEMENT === addConnection(userId: string, connection: ClientConnection): void { if (!this.userConnections.has(userId)) { this.userConnections.set(userId, new Set()); } this.userConnections.get(userId)!.add(connection); } removeConnection(userId: string, connection: ClientConnection): void { const connections = this.userConnections.get(userId); if (connections) { connections.delete(connection); if (connections.size === 0) { this.userConnections.delete(userId); } } } getConnections(userId: string): Set | undefined { return this.userConnections.get(userId); } // === EVENT EMISSION METHODS === emitUpdate(params: { userId: string; payload: UpdatePayload; recipientFilter?: RecipientFilter; skipSenderConnection?: ClientConnection; }): void { this.emit({ userId: params.userId, eventName: 'update', payload: params.payload, recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' }, skipSenderConnection: params.skipSenderConnection }); } emitEphemeral(params: { userId: string; payload: EphemeralPayload; recipientFilter?: RecipientFilter; skipSenderConnection?: ClientConnection; }): void { this.emit({ userId: params.userId, eventName: 'ephemeral', payload: params.payload, recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' }, skipSenderConnection: params.skipSenderConnection }); } // === PRIVATE ROUTING LOGIC === private shouldSendToConnection( connection: ClientConnection, filter: RecipientFilter ): boolean { switch (filter.type) { case 'all-interested-in-session': // Send to session-scoped with matching session + all user-scoped if (connection.connectionType === 'session-scoped') { if (connection.sessionId !== filter.sessionId) { return false; // Wrong session } } else if (connection.connectionType === 'machine-scoped') { return false; // Machines don't need session updates } // user-scoped always gets it return true; case 'user-scoped-only': return connection.connectionType === 'user-scoped'; case 'all-user-authenticated-connections': // Send to all connection types (default behavior) return true; default: return false; } } private emit(params: { userId: string; eventName: 'update' | 'ephemeral'; payload: any; recipientFilter: RecipientFilter; skipSenderConnection?: ClientConnection; }): void { const connections = this.userConnections.get(params.userId); if (!connections) { log({ module: 'websocket', level: 'warn' }, `No connections found for user ${params.userId}`); return; } for (const connection of connections) { // Skip message echo if (params.skipSenderConnection && connection === params.skipSenderConnection) { continue; } // Apply recipient filter if (!this.shouldSendToConnection(connection, params.recipientFilter)) { continue; } connection.socket.emit(params.eventName, params.payload); } } } export const eventRouter = new EventRouter(); // === EVENT BUILDER FUNCTIONS === export function buildNewSessionUpdate(session: { id: string; seq: number; metadata: string; metadataVersion: number; agentState: string | null; agentStateVersion: number; dataEncryptionKey: Uint8Array | null; active: boolean; lastActiveAt: Date; createdAt: Date; updatedAt: Date; }, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'new-session', id: session.id, seq: session.seq, metadata: session.metadata, metadataVersion: session.metadataVersion, agentState: session.agentState, agentStateVersion: session.agentStateVersion, dataEncryptionKey: session.dataEncryptionKey ? Buffer.from(session.dataEncryptionKey).toString('base64') : null, active: session.active, activeAt: session.lastActiveAt.getTime(), createdAt: session.createdAt.getTime(), updatedAt: session.updatedAt.getTime() }, createdAt: Date.now() }; } export function buildNewMessageUpdate(message: { id: string; seq: number; content: any; localId: string | null; createdAt: Date; updatedAt: Date; }, sessionId: string, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'new-message', sid: sessionId, message: { id: message.id, seq: message.seq, content: message.content, localId: message.localId, createdAt: message.createdAt.getTime(), updatedAt: message.updatedAt.getTime() } }, createdAt: Date.now() }; } export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, agentState?: { value: string; version: number }): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'update-session', id: sessionId, metadata, agentState }, createdAt: Date.now() }; } export function buildUpdateAccountUpdate(userId: string, profile: Partial, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'update-account', id: userId, ...profile, avatar: profile.avatar ? { ...profile.avatar, url: getPublicUrl(profile.avatar.path) } : undefined }, createdAt: Date.now() }; } export function buildNewMachineUpdate(machine: { id: string; seq: number; metadata: string; metadataVersion: number; daemonState: string | null; daemonStateVersion: number; dataEncryptionKey: Uint8Array | null; active: boolean; lastActiveAt: Date; createdAt: Date; updatedAt: Date; }, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'new-machine', machineId: machine.id, seq: machine.seq, metadata: machine.metadata, metadataVersion: machine.metadataVersion, daemonState: machine.daemonState, daemonStateVersion: machine.daemonStateVersion, dataEncryptionKey: machine.dataEncryptionKey ? Buffer.from(machine.dataEncryptionKey).toString('base64') : null, active: machine.active, activeAt: machine.lastActiveAt.getTime(), createdAt: machine.createdAt.getTime(), updatedAt: machine.updatedAt.getTime() }, createdAt: Date.now() }; } export function buildUpdateMachineUpdate(machineId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, daemonState?: { value: string; version: number }): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'update-machine', machineId, metadata, daemonState }, createdAt: Date.now() }; } export function buildSessionActivityEphemeral(sessionId: string, active: boolean, activeAt: number, thinking?: boolean): EphemeralPayload { return { type: 'activity', id: sessionId, active, activeAt, thinking: thinking || false }; } export function buildMachineActivityEphemeral(machineId: string, active: boolean, activeAt: number): EphemeralPayload { return { type: 'machine-activity', id: machineId, active, activeAt }; } export function buildUsageEphemeral(sessionId: string, key: string, tokens: Record, cost: Record): EphemeralPayload { return { type: 'usage', id: sessionId, key, tokens, cost, timestamp: Date.now() }; } export function buildMachineStatusEphemeral(machineId: string, online: boolean): EphemeralPayload { return { type: 'machine-status', machineId, online, timestamp: Date.now() }; } export function buildNewArtifactUpdate(artifact: { id: string; seq: number; header: Uint8Array; headerVersion: number; body: Uint8Array; bodyVersion: number; dataEncryptionKey: Uint8Array; createdAt: Date; updatedAt: Date; }, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'new-artifact', artifactId: artifact.id, seq: artifact.seq, header: Buffer.from(artifact.header).toString('base64'), headerVersion: artifact.headerVersion, body: Buffer.from(artifact.body).toString('base64'), bodyVersion: artifact.bodyVersion, dataEncryptionKey: Buffer.from(artifact.dataEncryptionKey).toString('base64'), createdAt: artifact.createdAt.getTime(), updatedAt: artifact.updatedAt.getTime() }, createdAt: Date.now() }; } export function buildUpdateArtifactUpdate(artifactId: string, updateSeq: number, updateId: string, header?: { value: string; version: number }, body?: { value: string; version: number }): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'update-artifact', artifactId, header, body }, createdAt: Date.now() }; } export function buildDeleteArtifactUpdate(artifactId: string, updateSeq: number, updateId: string): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'delete-artifact', artifactId }, createdAt: Date.now() }; } export function buildRelationshipUpdatedEvent( data: { uid: string; status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected'; timestamp: number; }, updateSeq: number, updateId: string ): UpdatePayload { return { id: updateId, seq: updateSeq, body: { t: 'relationship-updated', ...data }, createdAt: Date.now() }; }