happy-server/sources/modules/eventRouter.ts
2025-08-26 22:39:28 -07:00

400 lines
11 KiB
TypeScript

import { Socket } from "socket.io";
import { log } from "@/utils/log";
import { GitHubProfile } from "@/app/types";
import { AccountProfile } from "@/types";
import { getPublicUrl } from "@/storage/files";
// === CONNECTION TYPES ===
export interface SessionScopedConnection {
connectionType: 'session-scoped';
socket: Socket;
userId: string;
sessionId: string;
}
export interface UserScopedConnection {
connectionType: 'user-scoped';
socket: Socket;
userId: string;
}
export interface MachineScopedConnection {
connectionType: 'machine-scoped';
socket: Socket;
userId: string;
machineId: string;
}
export type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection;
// === RECIPIENT FILTER TYPES ===
export type RecipientFilter =
| { type: 'all-interested-in-session'; sessionId: string }
| { type: 'user-scoped-only' }
| { type: 'all-user-authenticated-connections' };
// === UPDATE EVENT TYPES (Persistent) ===
export type UpdateEvent = {
type: 'new-message';
sessionId: string;
message: {
id: string;
seq: number;
content: any;
localId: string | null;
createdAt: number;
updatedAt: number;
}
} | {
type: 'new-session';
sessionId: string;
seq: number;
metadata: string;
metadataVersion: number;
agentState: string | null;
agentStateVersion: number;
active: boolean;
activeAt: number;
createdAt: number;
updatedAt: number;
} | {
type: 'update-session';
sessionId: string;
metadata?: {
value: string | null;
version: number;
} | null | undefined;
agentState?: {
value: string | null;
version: number;
} | null | undefined;
} | {
type: 'update-account';
userId: string;
settings?: {
value: string | null;
version: number;
} | null | undefined;
github?: GitHubProfile | null | undefined;
} | {
type: 'update-machine';
machineId: string;
metadata?: {
value: string;
version: number;
};
daemonState?: {
value: string;
version: number;
};
activeAt?: number;
};
// === EPHEMERAL EVENT TYPES (Transient) ===
export type EphemeralEvent = {
type: 'activity';
id: string;
active: boolean;
activeAt: number;
thinking?: boolean;
} | {
type: 'machine-activity';
id: string;
active: boolean;
activeAt: number;
} | {
type: 'usage';
id: string;
key: string;
tokens: Record<string, number>;
cost: Record<string, number>;
timestamp: number;
} | {
type: 'machine-status';
machineId: string;
online: boolean;
timestamp: number;
};
// === EVENT PAYLOAD TYPES ===
export interface UpdatePayload {
id: string;
seq: number;
body: {
t: UpdateEvent['type'];
[key: string]: any;
};
createdAt: number;
}
export interface EphemeralPayload {
type: EphemeralEvent['type'];
[key: string]: any;
}
// === EVENT ROUTER CLASS ===
export class EventRouter {
private userConnections = new Map<string, Set<ClientConnection>>();
// === CONNECTION MANAGEMENT ===
addConnection(userId: string, connection: ClientConnection): void {
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Set());
}
this.userConnections.get(userId)!.add(connection);
}
removeConnection(userId: string, connection: ClientConnection): void {
const connections = this.userConnections.get(userId);
if (connections) {
connections.delete(connection);
if (connections.size === 0) {
this.userConnections.delete(userId);
}
}
}
getConnections(userId: string): Set<ClientConnection> | undefined {
return this.userConnections.get(userId);
}
// === EVENT EMISSION METHODS ===
emitUpdate(params: {
userId: string;
payload: UpdatePayload;
recipientFilter?: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
this.emit({
userId: params.userId,
eventName: 'update',
payload: params.payload,
recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' },
skipSenderConnection: params.skipSenderConnection
});
}
emitEphemeral(params: {
userId: string;
payload: EphemeralPayload;
recipientFilter?: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
this.emit({
userId: params.userId,
eventName: 'ephemeral',
payload: params.payload,
recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' },
skipSenderConnection: params.skipSenderConnection
});
}
// === PRIVATE ROUTING LOGIC ===
private shouldSendToConnection(
connection: ClientConnection,
filter: RecipientFilter
): boolean {
switch (filter.type) {
case 'all-interested-in-session':
// Send to session-scoped with matching session + all user-scoped
if (connection.connectionType === 'session-scoped') {
if (connection.sessionId !== filter.sessionId) {
return false; // Wrong session
}
} else if (connection.connectionType === 'machine-scoped') {
return false; // Machines don't need session updates
}
// user-scoped always gets it
return true;
case 'user-scoped-only':
return connection.connectionType === 'user-scoped';
case 'all-user-authenticated-connections':
// Send to all connection types (default behavior)
return true;
default:
return false;
}
}
private emit(params: {
userId: string;
eventName: 'update' | 'ephemeral';
payload: any;
recipientFilter: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
const connections = this.userConnections.get(params.userId);
if (!connections) {
log({ module: 'websocket', level: 'warn' }, `No connections found for user ${params.userId}`);
return;
}
for (const connection of connections) {
// Skip message echo
if (params.skipSenderConnection && connection === params.skipSenderConnection) {
continue;
}
// Apply recipient filter
if (!this.shouldSendToConnection(connection, params.recipientFilter)) {
continue;
}
connection.socket.emit(params.eventName, params.payload);
}
}
}
// === EVENT BUILDER FUNCTIONS ===
export function buildNewSessionUpdate(session: {
id: string;
seq: number;
metadata: string;
metadataVersion: number;
agentState: string | null;
agentStateVersion: number;
active: boolean;
lastActiveAt: Date;
createdAt: Date;
updatedAt: Date;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-session',
id: session.id,
seq: session.seq,
metadata: session.metadata,
metadataVersion: session.metadataVersion,
agentState: session.agentState,
agentStateVersion: session.agentStateVersion,
active: session.active,
activeAt: session.lastActiveAt.getTime(),
createdAt: session.createdAt.getTime(),
updatedAt: session.updatedAt.getTime()
},
createdAt: Date.now()
};
}
export function buildNewMessageUpdate(message: {
id: string;
seq: number;
content: any;
localId: string | null;
createdAt: Date;
updatedAt: Date;
}, sessionId: string, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-message',
sid: sessionId,
message: {
id: message.id,
seq: message.seq,
content: message.content,
localId: message.localId,
createdAt: message.createdAt.getTime(),
updatedAt: message.updatedAt.getTime()
}
},
createdAt: Date.now()
};
}
export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, agentState?: { value: string; version: number }): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-session',
id: sessionId,
metadata,
agentState
},
createdAt: Date.now()
};
}
export function buildUpdateAccountUpdate(userId: string, profile: Partial<AccountProfile>, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-account',
id: userId,
...profile,
avatar: profile.avatar ? { ...profile.avatar, url: getPublicUrl(profile.avatar.path) } : undefined
},
createdAt: Date.now()
};
}
export function buildUpdateMachineUpdate(machineId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, daemonState?: { value: string; version: number }): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-machine',
machineId,
metadata,
daemonState
},
createdAt: Date.now()
};
}
export function buildSessionActivityEphemeral(sessionId: string, active: boolean, activeAt: number, thinking?: boolean): EphemeralPayload {
return {
type: 'activity',
id: sessionId,
active,
activeAt,
thinking: thinking || false
};
}
export function buildMachineActivityEphemeral(machineId: string, active: boolean, activeAt: number): EphemeralPayload {
return {
type: 'machine-activity',
id: machineId,
active,
activeAt
};
}
export function buildUsageEphemeral(sessionId: string, key: string, tokens: Record<string, number>, cost: Record<string, number>): EphemeralPayload {
return {
type: 'usage',
id: sessionId,
key,
tokens,
cost,
timestamp: Date.now()
};
}
export function buildMachineStatusEphemeral(machineId: string, online: boolean): EphemeralPayload {
return {
type: 'machine-status',
machineId,
online,
timestamp: Date.now()
};
}