ref: new event router

This commit is contained in:
Steve Korshakov 2025-08-18 19:32:10 -07:00
parent 3594b675a8
commit 732697a402
2 changed files with 489 additions and 301 deletions

View File

@ -11,37 +11,25 @@ import { onShutdown } from "@/utils/shutdown";
import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { AsyncLock } from "@/utils/lock";
import {
EventRouter,
ClientConnection,
SessionScopedConnection,
UserScopedConnection,
MachineScopedConnection,
RecipientFilter,
buildNewSessionUpdate,
buildNewMessageUpdate,
buildUpdateSessionUpdate,
buildUpdateAccountUpdate,
buildUpdateMachineUpdate,
buildSessionActivityEphemeral,
buildMachineActivityEphemeral,
buildUsageEphemeral,
buildMachineStatusEphemeral
} from "@/modules/eventRouter";
// Recipient filter types
type RecipientFilter =
| { type: 'all-interested-in-session'; sessionId: string }
| { type: 'user-scoped-only' }
| { type: 'all-user-authenticated-connections' };
// Connection metadata types
interface SessionScopedConnection {
connectionType: 'session-scoped';
socket: Socket;
userId: string;
sessionId: string;
}
interface UserScopedConnection {
connectionType: 'user-scoped';
socket: Socket;
userId: string;
}
interface MachineScopedConnection {
connectionType: 'machine-scoped';
socket: Socket;
userId: string;
machineId: string;
}
type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection;
declare module 'fastify' {
interface FastifyRequest {
user: Account;
@ -117,61 +105,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
// Send session update to all relevant connections
let emitUpdateToInterestedClients = ({
event,
userId,
payload,
recipientFilter = { type: 'all-user-authenticated-connections' },
skipSenderConnection
}: {
event: string,
userId: string,
payload: any,
recipientFilter?: RecipientFilter,
skipSenderConnection?: ClientConnection
}) => {
const connections = userIdToClientConnections.get(userId);
if (!connections) {
log({ module: 'websocket', level: 'warn' }, `No connections found for user ${userId}`);
return;
}
for (const connection of connections) {
// Skip message echo
if (skipSenderConnection && connection === skipSenderConnection) {
continue;
}
// Apply recipient filter
switch (recipientFilter.type) {
case 'all-interested-in-session':
// Send to session-scoped with matching session + all user-scoped
if (connection.connectionType === 'session-scoped') {
if (connection.sessionId !== recipientFilter.sessionId) {
continue; // Wrong session
}
} else if (connection.connectionType === 'machine-scoped') {
continue; // Machines don't need session updates
}
// user-scoped always gets it
break;
case 'user-scoped-only':
if (connection.connectionType !== 'user-scoped') {
continue;
}
break;
case 'all-user-authenticated-connections':
// Send to all connection types (default behavior)
break;
}
log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`);
connection.socket.emit(event, payload);
}
}
// Initialize event router
const eventRouter = new EventRouter();
// Auth schema
typed.post('/v1/auth', {
@ -542,28 +477,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
logger.info({ module: 'session-create', sessionId: session.id, userId }, `Session created: ${session.id}`);
// Create update
const updContent: PrismaJson.UpdateBody = {
t: 'new-session',
id: session.id,
seq: session.seq,
metadata: session.metadata,
metadataVersion: session.metadataVersion,
agentState: session.agentState,
agentStateVersion: session.agentStateVersion,
active: session.active,
activeAt: session.lastActiveAt.getTime(),
createdAt: session.createdAt.getTime(),
updatedAt: session.updatedAt.getTime()
};
// Emit update to connected sockets
const updatePayload = {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
};
// Emit new session update
const updatePayload = buildNewSessionUpdate(session, updSeq, randomKeyNaked(12));
logger.info({
module: 'session-create',
userId,
@ -571,8 +486,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
updateType: 'new-session',
updatePayload: JSON.stringify(updatePayload)
}, `Emitting new-session update to all user connections`);
emitUpdateToInterestedClients({
event: 'update',
eventRouter.emitUpdate({
userId,
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
@ -794,25 +708,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
// Generate update for connected clients
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-account',
id: userId,
settings: {
value: settings,
version: expectedVersion + 1
}
const settingsUpdate = {
value: settings,
version: expectedVersion + 1
};
// Send to all user connections
emitUpdateToInterestedClients({
event: 'update',
// Send account update to all user connections
const updatePayload = buildUpdateAccountUpdate(userId, settingsUpdate, updSeq, randomKeyNaked(12));
eventRouter.emitUpdate({
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
@ -1073,22 +978,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
// Emit update for new machine
const updSeq = await allocateUserSeq(userId);
emitUpdateToInterestedClients({
event: 'update',
const machineMetadata = {
version: 1,
value: metadata
};
const updatePayload = buildUpdateMachineUpdate(newMachine.id, updSeq, randomKeyNaked(12), machineMetadata);
eventRouter.emitUpdate({
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: {
t: 'update-machine',
machineId: newMachine.id,
metadata: {
version: 1,
value: metadata
}
},
createdAt: Date.now()
}
payload: updatePayload
});
return reply.send({
@ -1254,8 +1151,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
serveClient: false // Don't serve the client files
});
// Track connections by scope type
const userIdToClientConnections = new Map<string, Set<ClientConnection>>();
// Connection tracking is now handled by EventRouter
// Track RPC listeners: Map<userId, Map<rpcMethodWithSessionPrefix, Socket>>
// Only session-scoped clients (CLI) register handlers, only user-scoped clients (mobile) call them
@ -1326,23 +1222,15 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
userId
};
}
if (!userIdToClientConnections.has(userId)) {
userIdToClientConnections.set(userId, new Set());
}
userIdToClientConnections.get(userId)!.add(connection);
eventRouter.addConnection(userId, connection);
// Broadcast daemon online status
if (connection.connectionType === 'machine-scoped') {
// Broadcast daemon online
emitUpdateToInterestedClients({
event: 'ephemeral',
const machineActivity = buildMachineActivityEphemeral(machineId!, true, Date.now());
eventRouter.emitEphemeral({
userId,
payload: {
type: 'machine-activity',
id: machineId,
active: true,
activeAt: Date.now()
},
payload: machineActivity,
recipientFilter: { type: 'user-scoped-only' }
});
}
@ -1352,14 +1240,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
const receiveUsageLock = new AsyncLock();
socket.on('disconnect', () => {
// Cleanup
const connections = userIdToClientConnections.get(userId);
if (connections) {
connections.delete(connection);
if (connections.size === 0) {
userIdToClientConnections.delete(userId);
}
}
// Cleanup connections
eventRouter.removeConnection(userId, connection);
// Clean up RPC listeners for this socket
const userRpcMap = rpcListeners.get(userId);
@ -1387,15 +1269,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
// Broadcast daemon offline status
if (connection.connectionType === 'machine-scoped') {
emitUpdateToInterestedClients({
event: 'ephemeral',
const machineActivity = buildMachineActivityEphemeral(connection.machineId, false, Date.now());
eventRouter.emitEphemeral({
userId,
payload: {
type: 'machine-activity',
id: connection.machineId,
active: false,
activeAt: Date.now()
},
payload: machineActivity,
recipientFilter: { type: 'user-scoped-only' }
});
}
@ -1436,17 +1313,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
data: { lastActiveAt: new Date(t), active: true }
});
// Emit update
emitUpdateToInterestedClients({
event: 'ephemeral',
// Emit session activity update
const sessionActivity = buildSessionActivityEphemeral(sid, true, t, thinking || false);
eventRouter.emitEphemeral({
userId,
payload: {
type: 'activity',
id: sid,
active: true,
activeAt: t,
thinking: thinking || false
},
payload: sessionActivity,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
} catch (error) {
@ -1500,15 +1371,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
emitUpdateToInterestedClients({
event: 'ephemeral',
const machineActivity = buildMachineActivityEphemeral(updatedMachine.id, true, t);
eventRouter.emitEphemeral({
userId,
payload: {
type: 'machine-activity',
id: updatedMachine.id,
active: true,
activeAt: t,
},
payload: machineActivity,
recipientFilter: { type: 'user-scoped-only' }
});
} catch (error) {
@ -1547,17 +1413,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
data: { lastActiveAt: new Date(t), active: false }
});
// Emit update to connected sockets
emitUpdateToInterestedClients({
event: 'ephemeral',
// Emit session activity update
const sessionActivity = buildSessionActivityEphemeral(sid, false, t, false);
eventRouter.emitEphemeral({
userId,
payload: {
type: 'activity',
id: sid,
active: false,
activeAt: t,
thinking: false
},
payload: sessionActivity,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
} catch (error) {
@ -1611,30 +1471,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
// Create update
const update: PrismaJson.UpdateBody = {
t: 'new-message',
sid: sid,
message: {
id: msg.id,
seq: msg.seq,
content: msgContent,
localId: useLocalId,
createdAt: msg.createdAt.getTime(),
updatedAt: msg.updatedAt.getTime()
}
};
// Emit update to relevant clients
emitUpdateToInterestedClients({
event: 'update',
// Emit new message update to relevant clients
const updatePayload = buildNewMessageUpdate(msg, sid, updSeq, randomKeyNaked(12));
eventRouter.emitUpdate({
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: update,
createdAt: Date.now()
},
payload: updatePayload,
recipientFilter: { type: 'all-interested-in-session', sessionId: sid },
skipSenderConnection: connection
});
@ -1683,25 +1524,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return null;
}
// Generate update
// Generate session metadata update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-session',
id: sid,
metadata: {
value: metadata,
version: expectedVersion + 1
}
const metadataUpdate = {
value: metadata,
version: expectedVersion + 1
};
emitUpdateToInterestedClients({
event: 'update',
const updatePayload = buildUpdateSessionUpdate(sid, updSeq, randomKeyNaked(12), metadataUpdate);
eventRouter.emitUpdate({
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
payload: updatePayload,
recipientFilter: { type: 'all-interested-in-session', sessionId: sid }
});
@ -1758,27 +1590,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return null;
}
// Generate update
// Generate session agent state update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-session',
id: sid,
agentState: {
value: agentState,
version: expectedVersion + 1
}
const agentStateUpdate = {
value: agentState,
version: expectedVersion + 1
};
// Emit update to connected sockets
emitUpdateToInterestedClients({
event: 'update',
const updatePayload = buildUpdateSessionUpdate(sid, updSeq, randomKeyNaked(12), undefined, agentStateUpdate);
eventRouter.emitUpdate({
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
payload: updatePayload,
recipientFilter: { type: 'all-interested-in-session', sessionId: sid }
});
@ -1859,27 +1680,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return;
}
// Generate update
// Generate machine metadata update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-machine',
machineId: machineId,
metadata: {
value: metadata,
version: expectedVersion + 1
}
const metadataUpdate = {
value: metadata,
version: expectedVersion + 1
};
// Emit to all connections
emitUpdateToInterestedClients({
event: 'update',
const updatePayload = buildUpdateMachineUpdate(machineId, updSeq, randomKeyNaked(12), metadataUpdate);
eventRouter.emitUpdate({
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
@ -1965,27 +1775,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
return;
}
// Generate update
// Generate machine daemon state update
const updSeq = await allocateUserSeq(userId);
const updContent: PrismaJson.UpdateBody = {
t: 'update-machine',
machineId: machineId,
daemonState: {
value: daemonState,
version: expectedVersion + 1
}
const daemonStateUpdate = {
value: daemonState,
version: expectedVersion + 1
};
// Emit to all connections
emitUpdateToInterestedClients({
event: 'update',
const updatePayload = buildUpdateMachineUpdate(machineId, updSeq, randomKeyNaked(12), undefined, daemonStateUpdate);
eventRouter.emitUpdate({
userId,
payload: {
id: randomKeyNaked(12),
seq: updSeq,
body: updContent,
createdAt: Date.now()
},
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
@ -2260,19 +2059,12 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`);
// Emit ephemeral update if sessionId is provided
// Emit usage ephemeral update if sessionId is provided
if (sessionId) {
emitUpdateToInterestedClients({
event: 'ephemeral',
const usageEvent = buildUsageEphemeral(sessionId, key, usageData.tokens, usageData.cost);
eventRouter.emitEphemeral({
userId,
payload: {
type: 'usage',
id: sessionId,
key,
tokens: usageData.tokens,
cost: usageData.cost,
timestamp: Date.now()
},
payload: usageEvent,
recipientFilter: { type: 'user-scoped-only' }
});
}

View File

@ -0,0 +1,396 @@
import { Socket } from "socket.io";
import { log } from "@/utils/log";
// === CONNECTION TYPES ===
export interface SessionScopedConnection {
connectionType: 'session-scoped';
socket: Socket;
userId: string;
sessionId: string;
}
export interface UserScopedConnection {
connectionType: 'user-scoped';
socket: Socket;
userId: string;
}
export interface MachineScopedConnection {
connectionType: 'machine-scoped';
socket: Socket;
userId: string;
machineId: string;
}
export type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection;
// === RECIPIENT FILTER TYPES ===
export type RecipientFilter =
| { type: 'all-interested-in-session'; sessionId: string }
| { type: 'user-scoped-only' }
| { type: 'all-user-authenticated-connections' };
// === UPDATE EVENT TYPES (Persistent) ===
export type UpdateEvent = {
type: 'new-message';
sessionId: string;
message: {
id: string;
seq: number;
content: any;
localId: string | null;
createdAt: number;
updatedAt: number;
}
} | {
type: 'new-session';
sessionId: string;
seq: number;
metadata: string;
metadataVersion: number;
agentState: string | null;
agentStateVersion: number;
active: boolean;
activeAt: number;
createdAt: number;
updatedAt: number;
} | {
type: 'update-session';
sessionId: string;
metadata?: {
value: string | null;
version: number;
} | null | undefined;
agentState?: {
value: string | null;
version: number;
} | null | undefined;
} | {
type: 'update-account';
userId: string;
settings?: {
value: string | null;
version: number;
} | null | undefined;
} | {
type: 'update-machine';
machineId: string;
metadata?: {
value: string;
version: number;
};
daemonState?: {
value: string;
version: number;
};
activeAt?: number;
};
// === EPHEMERAL EVENT TYPES (Transient) ===
export type EphemeralEvent = {
type: 'activity';
id: string;
active: boolean;
activeAt: number;
thinking?: boolean;
} | {
type: 'machine-activity';
id: string;
active: boolean;
activeAt: number;
} | {
type: 'usage';
id: string;
key: string;
tokens: Record<string, number>;
cost: Record<string, number>;
timestamp: number;
} | {
type: 'machine-status';
machineId: string;
online: boolean;
timestamp: number;
};
// === EVENT PAYLOAD TYPES ===
export interface UpdatePayload {
id: string;
seq: number;
body: {
t: UpdateEvent['type'];
[key: string]: any;
};
createdAt: number;
}
export interface EphemeralPayload {
type: EphemeralEvent['type'];
[key: string]: any;
}
// === EVENT ROUTER CLASS ===
export class EventRouter {
private userConnections = new Map<string, Set<ClientConnection>>();
// === CONNECTION MANAGEMENT ===
addConnection(userId: string, connection: ClientConnection): void {
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Set());
}
this.userConnections.get(userId)!.add(connection);
}
removeConnection(userId: string, connection: ClientConnection): void {
const connections = this.userConnections.get(userId);
if (connections) {
connections.delete(connection);
if (connections.size === 0) {
this.userConnections.delete(userId);
}
}
}
getConnections(userId: string): Set<ClientConnection> | undefined {
return this.userConnections.get(userId);
}
// === EVENT EMISSION METHODS ===
emitUpdate(params: {
userId: string;
payload: UpdatePayload;
recipientFilter?: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
this.emit({
userId: params.userId,
eventName: 'update',
payload: params.payload,
recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' },
skipSenderConnection: params.skipSenderConnection
});
}
emitEphemeral(params: {
userId: string;
payload: EphemeralPayload;
recipientFilter?: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
this.emit({
userId: params.userId,
eventName: 'ephemeral',
payload: params.payload,
recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' },
skipSenderConnection: params.skipSenderConnection
});
}
// === PRIVATE ROUTING LOGIC ===
private shouldSendToConnection(
connection: ClientConnection,
filter: RecipientFilter
): boolean {
switch (filter.type) {
case 'all-interested-in-session':
// Send to session-scoped with matching session + all user-scoped
if (connection.connectionType === 'session-scoped') {
if (connection.sessionId !== filter.sessionId) {
return false; // Wrong session
}
} else if (connection.connectionType === 'machine-scoped') {
return false; // Machines don't need session updates
}
// user-scoped always gets it
return true;
case 'user-scoped-only':
return connection.connectionType === 'user-scoped';
case 'all-user-authenticated-connections':
// Send to all connection types (default behavior)
return true;
default:
return false;
}
}
private emit(params: {
userId: string;
eventName: 'update' | 'ephemeral';
payload: any;
recipientFilter: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
const connections = this.userConnections.get(params.userId);
if (!connections) {
log({ module: 'websocket', level: 'warn' }, `No connections found for user ${params.userId}`);
return;
}
for (const connection of connections) {
// Skip message echo
if (params.skipSenderConnection && connection === params.skipSenderConnection) {
continue;
}
// Apply recipient filter
if (!this.shouldSendToConnection(connection, params.recipientFilter)) {
continue;
}
log({ module: 'websocket' }, `Sending ${params.eventName} to ${connection.connectionType} connection ${connection.socket.id}`);
connection.socket.emit(params.eventName, params.payload);
}
}
}
// === EVENT BUILDER FUNCTIONS ===
export function buildNewSessionUpdate(session: {
id: string;
seq: number;
metadata: string;
metadataVersion: number;
agentState: string | null;
agentStateVersion: number;
active: boolean;
lastActiveAt: Date;
createdAt: Date;
updatedAt: Date;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-session',
id: session.id,
seq: session.seq,
metadata: session.metadata,
metadataVersion: session.metadataVersion,
agentState: session.agentState,
agentStateVersion: session.agentStateVersion,
active: session.active,
activeAt: session.lastActiveAt.getTime(),
createdAt: session.createdAt.getTime(),
updatedAt: session.updatedAt.getTime()
},
createdAt: Date.now()
};
}
export function buildNewMessageUpdate(message: {
id: string;
seq: number;
content: any;
localId: string | null;
createdAt: Date;
updatedAt: Date;
}, sessionId: string, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-message',
sid: sessionId,
message: {
id: message.id,
seq: message.seq,
content: message.content,
localId: message.localId,
createdAt: message.createdAt.getTime(),
updatedAt: message.updatedAt.getTime()
}
},
createdAt: Date.now()
};
}
export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, agentState?: { value: string; version: number }): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-session',
id: sessionId,
metadata,
agentState
},
createdAt: Date.now()
};
}
export function buildUpdateAccountUpdate(userId: string, settings: { value: string | null; version: number }, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-account',
id: userId,
settings
},
createdAt: Date.now()
};
}
export function buildUpdateMachineUpdate(machineId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, daemonState?: { value: string; version: number }): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-machine',
machineId,
metadata,
daemonState
},
createdAt: Date.now()
};
}
export function buildSessionActivityEphemeral(sessionId: string, active: boolean, activeAt: number, thinking?: boolean): EphemeralPayload {
return {
type: 'activity',
id: sessionId,
active,
activeAt,
thinking: thinking || false
};
}
export function buildMachineActivityEphemeral(machineId: string, active: boolean, activeAt: number): EphemeralPayload {
return {
type: 'machine-activity',
id: machineId,
active,
activeAt
};
}
export function buildUsageEphemeral(sessionId: string, key: string, tokens: Record<string, number>, cost: Record<string, number>): EphemeralPayload {
return {
type: 'usage',
id: sessionId,
key,
tokens,
cost,
timestamp: Date.now()
};
}
export function buildMachineStatusEphemeral(machineId: string, online: boolean): EphemeralPayload {
return {
type: 'machine-status',
machineId,
online,
timestamp: Date.now()
};
}