From 732697a402514bbefaf3bc693ad0fb7570c44c5d Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Mon, 18 Aug 2025 19:32:10 -0700 Subject: [PATCH] ref: new event router --- sources/app/api.ts | 394 ++++++++------------------------ sources/modules/eventRouter.ts | 396 +++++++++++++++++++++++++++++++++ 2 files changed, 489 insertions(+), 301 deletions(-) create mode 100644 sources/modules/eventRouter.ts diff --git a/sources/app/api.ts b/sources/app/api.ts index 435e19b..40320e9 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -11,37 +11,25 @@ import { onShutdown } from "@/utils/shutdown"; import { allocateSessionSeq, allocateUserSeq } from "@/services/seq"; import { randomKeyNaked } from "@/utils/randomKeyNaked"; import { AsyncLock } from "@/utils/lock"; +import { + EventRouter, + ClientConnection, + SessionScopedConnection, + UserScopedConnection, + MachineScopedConnection, + RecipientFilter, + buildNewSessionUpdate, + buildNewMessageUpdate, + buildUpdateSessionUpdate, + buildUpdateAccountUpdate, + buildUpdateMachineUpdate, + buildSessionActivityEphemeral, + buildMachineActivityEphemeral, + buildUsageEphemeral, + buildMachineStatusEphemeral +} from "@/modules/eventRouter"; -// Recipient filter types -type RecipientFilter = - | { type: 'all-interested-in-session'; sessionId: string } - | { type: 'user-scoped-only' } - | { type: 'all-user-authenticated-connections' }; - -// Connection metadata types -interface SessionScopedConnection { - connectionType: 'session-scoped'; - socket: Socket; - userId: string; - sessionId: string; -} - -interface UserScopedConnection { - connectionType: 'user-scoped'; - socket: Socket; - userId: string; -} - -interface MachineScopedConnection { - connectionType: 'machine-scoped'; - socket: Socket; - userId: string; - machineId: string; -} - -type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection; - declare module 'fastify' { interface FastifyRequest { user: Account; @@ -117,61 +105,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - // Send session update to all relevant connections - let emitUpdateToInterestedClients = ({ - event, - userId, - payload, - recipientFilter = { type: 'all-user-authenticated-connections' }, - skipSenderConnection - }: { - event: string, - userId: string, - payload: any, - recipientFilter?: RecipientFilter, - skipSenderConnection?: ClientConnection - }) => { - const connections = userIdToClientConnections.get(userId); - if (!connections) { - log({ module: 'websocket', level: 'warn' }, `No connections found for user ${userId}`); - return; - } - - for (const connection of connections) { - // Skip message echo - if (skipSenderConnection && connection === skipSenderConnection) { - continue; - } - - // Apply recipient filter - switch (recipientFilter.type) { - case 'all-interested-in-session': - // Send to session-scoped with matching session + all user-scoped - if (connection.connectionType === 'session-scoped') { - if (connection.sessionId !== recipientFilter.sessionId) { - continue; // Wrong session - } - } else if (connection.connectionType === 'machine-scoped') { - continue; // Machines don't need session updates - } - // user-scoped always gets it - break; - - case 'user-scoped-only': - if (connection.connectionType !== 'user-scoped') { - continue; - } - break; - - case 'all-user-authenticated-connections': - // Send to all connection types (default behavior) - break; - } - - log({ module: 'websocket' }, `Sending ${event} to ${connection.connectionType} connection ${connection.socket.id}`); - connection.socket.emit(event, payload); - } - } + // Initialize event router + const eventRouter = new EventRouter(); // Auth schema typed.post('/v1/auth', { @@ -542,28 +477,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); logger.info({ module: 'session-create', sessionId: session.id, userId }, `Session created: ${session.id}`); - // Create update - const updContent: PrismaJson.UpdateBody = { - t: 'new-session', - id: session.id, - seq: session.seq, - metadata: session.metadata, - metadataVersion: session.metadataVersion, - agentState: session.agentState, - agentStateVersion: session.agentStateVersion, - active: session.active, - activeAt: session.lastActiveAt.getTime(), - createdAt: session.createdAt.getTime(), - updatedAt: session.updatedAt.getTime() - }; - - // Emit update to connected sockets - const updatePayload = { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }; + // Emit new session update + const updatePayload = buildNewSessionUpdate(session, updSeq, randomKeyNaked(12)); logger.info({ module: 'session-create', userId, @@ -571,8 +486,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> updateType: 'new-session', updatePayload: JSON.stringify(updatePayload) }, `Emitting new-session update to all user connections`); - emitUpdateToInterestedClients({ - event: 'update', + eventRouter.emitUpdate({ userId, payload: updatePayload, recipientFilter: { type: 'all-user-authenticated-connections' } @@ -794,25 +708,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> // Generate update for connected clients const updSeq = await allocateUserSeq(userId); - const updContent: PrismaJson.UpdateBody = { - t: 'update-account', - id: userId, - settings: { - value: settings, - version: expectedVersion + 1 - } + const settingsUpdate = { + value: settings, + version: expectedVersion + 1 }; - // Send to all user connections - emitUpdateToInterestedClients({ - event: 'update', + // Send account update to all user connections + const updatePayload = buildUpdateAccountUpdate(userId, settingsUpdate, updSeq, randomKeyNaked(12)); + eventRouter.emitUpdate({ userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }, + payload: updatePayload, recipientFilter: { type: 'all-user-authenticated-connections' } }); @@ -1073,22 +978,14 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> // Emit update for new machine const updSeq = await allocateUserSeq(userId); - emitUpdateToInterestedClients({ - event: 'update', + const machineMetadata = { + version: 1, + value: metadata + }; + const updatePayload = buildUpdateMachineUpdate(newMachine.id, updSeq, randomKeyNaked(12), machineMetadata); + eventRouter.emitUpdate({ userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: { - t: 'update-machine', - machineId: newMachine.id, - metadata: { - version: 1, - value: metadata - } - }, - createdAt: Date.now() - } + payload: updatePayload }); return reply.send({ @@ -1254,8 +1151,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> serveClient: false // Don't serve the client files }); - // Track connections by scope type - const userIdToClientConnections = new Map>(); + // Connection tracking is now handled by EventRouter // Track RPC listeners: Map> // Only session-scoped clients (CLI) register handlers, only user-scoped clients (mobile) call them @@ -1326,23 +1222,15 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> userId }; } - if (!userIdToClientConnections.has(userId)) { - userIdToClientConnections.set(userId, new Set()); - } - userIdToClientConnections.get(userId)!.add(connection); + eventRouter.addConnection(userId, connection); // Broadcast daemon online status if (connection.connectionType === 'machine-scoped') { // Broadcast daemon online - emitUpdateToInterestedClients({ - event: 'ephemeral', + const machineActivity = buildMachineActivityEphemeral(machineId!, true, Date.now()); + eventRouter.emitEphemeral({ userId, - payload: { - type: 'machine-activity', - id: machineId, - active: true, - activeAt: Date.now() - }, + payload: machineActivity, recipientFilter: { type: 'user-scoped-only' } }); } @@ -1352,14 +1240,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> const receiveUsageLock = new AsyncLock(); socket.on('disconnect', () => { - // Cleanup - const connections = userIdToClientConnections.get(userId); - if (connections) { - connections.delete(connection); - if (connections.size === 0) { - userIdToClientConnections.delete(userId); - } - } + // Cleanup connections + eventRouter.removeConnection(userId, connection); // Clean up RPC listeners for this socket const userRpcMap = rpcListeners.get(userId); @@ -1387,15 +1269,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> // Broadcast daemon offline status if (connection.connectionType === 'machine-scoped') { - emitUpdateToInterestedClients({ - event: 'ephemeral', + const machineActivity = buildMachineActivityEphemeral(connection.machineId, false, Date.now()); + eventRouter.emitEphemeral({ userId, - payload: { - type: 'machine-activity', - id: connection.machineId, - active: false, - activeAt: Date.now() - }, + payload: machineActivity, recipientFilter: { type: 'user-scoped-only' } }); } @@ -1436,17 +1313,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> data: { lastActiveAt: new Date(t), active: true } }); - // Emit update - emitUpdateToInterestedClients({ - event: 'ephemeral', + // Emit session activity update + const sessionActivity = buildSessionActivityEphemeral(sid, true, t, thinking || false); + eventRouter.emitEphemeral({ userId, - payload: { - type: 'activity', - id: sid, - active: true, - activeAt: t, - thinking: thinking || false - }, + payload: sessionActivity, recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { @@ -1500,15 +1371,10 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - emitUpdateToInterestedClients({ - event: 'ephemeral', + const machineActivity = buildMachineActivityEphemeral(updatedMachine.id, true, t); + eventRouter.emitEphemeral({ userId, - payload: { - type: 'machine-activity', - id: updatedMachine.id, - active: true, - activeAt: t, - }, + payload: machineActivity, recipientFilter: { type: 'user-scoped-only' } }); } catch (error) { @@ -1547,17 +1413,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> data: { lastActiveAt: new Date(t), active: false } }); - // Emit update to connected sockets - emitUpdateToInterestedClients({ - event: 'ephemeral', + // Emit session activity update + const sessionActivity = buildSessionActivityEphemeral(sid, false, t, false); + eventRouter.emitEphemeral({ userId, - payload: { - type: 'activity', - id: sid, - active: false, - activeAt: t, - thinking: false - }, + payload: sessionActivity, recipientFilter: { type: 'all-user-authenticated-connections' } }); } catch (error) { @@ -1611,30 +1471,11 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); - // Create update - const update: PrismaJson.UpdateBody = { - t: 'new-message', - sid: sid, - message: { - id: msg.id, - seq: msg.seq, - content: msgContent, - localId: useLocalId, - createdAt: msg.createdAt.getTime(), - updatedAt: msg.updatedAt.getTime() - } - }; - - // Emit update to relevant clients - emitUpdateToInterestedClients({ - event: 'update', + // Emit new message update to relevant clients + const updatePayload = buildNewMessageUpdate(msg, sid, updSeq, randomKeyNaked(12)); + eventRouter.emitUpdate({ userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: update, - createdAt: Date.now() - }, + payload: updatePayload, recipientFilter: { type: 'all-interested-in-session', sessionId: sid }, skipSenderConnection: connection }); @@ -1683,25 +1524,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return null; } - // Generate update + // Generate session metadata update const updSeq = await allocateUserSeq(userId); - const updContent: PrismaJson.UpdateBody = { - t: 'update-session', - id: sid, - metadata: { - value: metadata, - version: expectedVersion + 1 - } + const metadataUpdate = { + value: metadata, + version: expectedVersion + 1 }; - emitUpdateToInterestedClients({ - event: 'update', + const updatePayload = buildUpdateSessionUpdate(sid, updSeq, randomKeyNaked(12), metadataUpdate); + eventRouter.emitUpdate({ userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }, + payload: updatePayload, recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); @@ -1758,27 +1590,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return null; } - // Generate update + // Generate session agent state update const updSeq = await allocateUserSeq(userId); - const updContent: PrismaJson.UpdateBody = { - t: 'update-session', - id: sid, - agentState: { - value: agentState, - version: expectedVersion + 1 - } + const agentStateUpdate = { + value: agentState, + version: expectedVersion + 1 }; - - // Emit update to connected sockets - emitUpdateToInterestedClients({ - event: 'update', + const updatePayload = buildUpdateSessionUpdate(sid, updSeq, randomKeyNaked(12), undefined, agentStateUpdate); + eventRouter.emitUpdate({ userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }, + payload: updatePayload, recipientFilter: { type: 'all-interested-in-session', sessionId: sid } }); @@ -1859,27 +1680,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - // Generate update + // Generate machine metadata update const updSeq = await allocateUserSeq(userId); - const updContent: PrismaJson.UpdateBody = { - t: 'update-machine', - machineId: machineId, - metadata: { - value: metadata, - version: expectedVersion + 1 - } + const metadataUpdate = { + value: metadata, + version: expectedVersion + 1 }; - - // Emit to all connections - emitUpdateToInterestedClients({ - event: 'update', + const updatePayload = buildUpdateMachineUpdate(machineId, updSeq, randomKeyNaked(12), metadataUpdate); + eventRouter.emitUpdate({ userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }, + payload: updatePayload, recipientFilter: { type: 'all-user-authenticated-connections' } }); @@ -1965,27 +1775,16 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> return; } - // Generate update + // Generate machine daemon state update const updSeq = await allocateUserSeq(userId); - const updContent: PrismaJson.UpdateBody = { - t: 'update-machine', - machineId: machineId, - daemonState: { - value: daemonState, - version: expectedVersion + 1 - } + const daemonStateUpdate = { + value: daemonState, + version: expectedVersion + 1 }; - - // Emit to all connections - emitUpdateToInterestedClients({ - event: 'update', + const updatePayload = buildUpdateMachineUpdate(machineId, updSeq, randomKeyNaked(12), undefined, daemonStateUpdate); + eventRouter.emitUpdate({ userId, - payload: { - id: randomKeyNaked(12), - seq: updSeq, - body: updContent, - createdAt: Date.now() - }, + payload: updatePayload, recipientFilter: { type: 'all-user-authenticated-connections' } }); @@ -2260,19 +2059,12 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`); - // Emit ephemeral update if sessionId is provided + // Emit usage ephemeral update if sessionId is provided if (sessionId) { - emitUpdateToInterestedClients({ - event: 'ephemeral', + const usageEvent = buildUsageEphemeral(sessionId, key, usageData.tokens, usageData.cost); + eventRouter.emitEphemeral({ userId, - payload: { - type: 'usage', - id: sessionId, - key, - tokens: usageData.tokens, - cost: usageData.cost, - timestamp: Date.now() - }, + payload: usageEvent, recipientFilter: { type: 'user-scoped-only' } }); } diff --git a/sources/modules/eventRouter.ts b/sources/modules/eventRouter.ts new file mode 100644 index 0000000..248735f --- /dev/null +++ b/sources/modules/eventRouter.ts @@ -0,0 +1,396 @@ +import { Socket } from "socket.io"; +import { log } from "@/utils/log"; + +// === CONNECTION TYPES === + +export interface SessionScopedConnection { + connectionType: 'session-scoped'; + socket: Socket; + userId: string; + sessionId: string; +} + +export interface UserScopedConnection { + connectionType: 'user-scoped'; + socket: Socket; + userId: string; +} + +export interface MachineScopedConnection { + connectionType: 'machine-scoped'; + socket: Socket; + userId: string; + machineId: string; +} + +export type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection; + +// === RECIPIENT FILTER TYPES === + +export type RecipientFilter = + | { type: 'all-interested-in-session'; sessionId: string } + | { type: 'user-scoped-only' } + | { type: 'all-user-authenticated-connections' }; + +// === UPDATE EVENT TYPES (Persistent) === + +export type UpdateEvent = { + type: 'new-message'; + sessionId: string; + message: { + id: string; + seq: number; + content: any; + localId: string | null; + createdAt: number; + updatedAt: number; + } +} | { + type: 'new-session'; + sessionId: string; + seq: number; + metadata: string; + metadataVersion: number; + agentState: string | null; + agentStateVersion: number; + active: boolean; + activeAt: number; + createdAt: number; + updatedAt: number; +} | { + type: 'update-session'; + sessionId: string; + metadata?: { + value: string | null; + version: number; + } | null | undefined; + agentState?: { + value: string | null; + version: number; + } | null | undefined; +} | { + type: 'update-account'; + userId: string; + settings?: { + value: string | null; + version: number; + } | null | undefined; +} | { + type: 'update-machine'; + machineId: string; + metadata?: { + value: string; + version: number; + }; + daemonState?: { + value: string; + version: number; + }; + activeAt?: number; +}; + +// === EPHEMERAL EVENT TYPES (Transient) === + +export type EphemeralEvent = { + type: 'activity'; + id: string; + active: boolean; + activeAt: number; + thinking?: boolean; +} | { + type: 'machine-activity'; + id: string; + active: boolean; + activeAt: number; +} | { + type: 'usage'; + id: string; + key: string; + tokens: Record; + cost: Record; + timestamp: number; +} | { + type: 'machine-status'; + machineId: string; + online: boolean; + timestamp: number; +}; + +// === EVENT PAYLOAD TYPES === + +export interface UpdatePayload { + id: string; + seq: number; + body: { + t: UpdateEvent['type']; + [key: string]: any; + }; + createdAt: number; +} + +export interface EphemeralPayload { + type: EphemeralEvent['type']; + [key: string]: any; +} + +// === EVENT ROUTER CLASS === + +export class EventRouter { + private userConnections = new Map>(); + + // === CONNECTION MANAGEMENT === + + addConnection(userId: string, connection: ClientConnection): void { + if (!this.userConnections.has(userId)) { + this.userConnections.set(userId, new Set()); + } + this.userConnections.get(userId)!.add(connection); + } + + removeConnection(userId: string, connection: ClientConnection): void { + const connections = this.userConnections.get(userId); + if (connections) { + connections.delete(connection); + if (connections.size === 0) { + this.userConnections.delete(userId); + } + } + } + + getConnections(userId: string): Set | undefined { + return this.userConnections.get(userId); + } + + // === EVENT EMISSION METHODS === + + emitUpdate(params: { + userId: string; + payload: UpdatePayload; + recipientFilter?: RecipientFilter; + skipSenderConnection?: ClientConnection; + }): void { + this.emit({ + userId: params.userId, + eventName: 'update', + payload: params.payload, + recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' }, + skipSenderConnection: params.skipSenderConnection + }); + } + + emitEphemeral(params: { + userId: string; + payload: EphemeralPayload; + recipientFilter?: RecipientFilter; + skipSenderConnection?: ClientConnection; + }): void { + this.emit({ + userId: params.userId, + eventName: 'ephemeral', + payload: params.payload, + recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' }, + skipSenderConnection: params.skipSenderConnection + }); + } + + // === PRIVATE ROUTING LOGIC === + + private shouldSendToConnection( + connection: ClientConnection, + filter: RecipientFilter + ): boolean { + switch (filter.type) { + case 'all-interested-in-session': + // Send to session-scoped with matching session + all user-scoped + if (connection.connectionType === 'session-scoped') { + if (connection.sessionId !== filter.sessionId) { + return false; // Wrong session + } + } else if (connection.connectionType === 'machine-scoped') { + return false; // Machines don't need session updates + } + // user-scoped always gets it + return true; + + case 'user-scoped-only': + return connection.connectionType === 'user-scoped'; + + case 'all-user-authenticated-connections': + // Send to all connection types (default behavior) + return true; + + default: + return false; + } + } + + private emit(params: { + userId: string; + eventName: 'update' | 'ephemeral'; + payload: any; + recipientFilter: RecipientFilter; + skipSenderConnection?: ClientConnection; + }): void { + const connections = this.userConnections.get(params.userId); + if (!connections) { + log({ module: 'websocket', level: 'warn' }, `No connections found for user ${params.userId}`); + return; + } + + for (const connection of connections) { + // Skip message echo + if (params.skipSenderConnection && connection === params.skipSenderConnection) { + continue; + } + + // Apply recipient filter + if (!this.shouldSendToConnection(connection, params.recipientFilter)) { + continue; + } + + log({ module: 'websocket' }, `Sending ${params.eventName} to ${connection.connectionType} connection ${connection.socket.id}`); + connection.socket.emit(params.eventName, params.payload); + } + } +} + +// === EVENT BUILDER FUNCTIONS === + +export function buildNewSessionUpdate(session: { + id: string; + seq: number; + metadata: string; + metadataVersion: number; + agentState: string | null; + agentStateVersion: number; + active: boolean; + lastActiveAt: Date; + createdAt: Date; + updatedAt: Date; +}, updateSeq: number, updateId: string): UpdatePayload { + return { + id: updateId, + seq: updateSeq, + body: { + t: 'new-session', + id: session.id, + seq: session.seq, + metadata: session.metadata, + metadataVersion: session.metadataVersion, + agentState: session.agentState, + agentStateVersion: session.agentStateVersion, + active: session.active, + activeAt: session.lastActiveAt.getTime(), + createdAt: session.createdAt.getTime(), + updatedAt: session.updatedAt.getTime() + }, + createdAt: Date.now() + }; +} + +export function buildNewMessageUpdate(message: { + id: string; + seq: number; + content: any; + localId: string | null; + createdAt: Date; + updatedAt: Date; +}, sessionId: string, updateSeq: number, updateId: string): UpdatePayload { + return { + id: updateId, + seq: updateSeq, + body: { + t: 'new-message', + sid: sessionId, + message: { + id: message.id, + seq: message.seq, + content: message.content, + localId: message.localId, + createdAt: message.createdAt.getTime(), + updatedAt: message.updatedAt.getTime() + } + }, + createdAt: Date.now() + }; +} + +export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, agentState?: { value: string; version: number }): UpdatePayload { + return { + id: updateId, + seq: updateSeq, + body: { + t: 'update-session', + id: sessionId, + metadata, + agentState + }, + createdAt: Date.now() + }; +} + +export function buildUpdateAccountUpdate(userId: string, settings: { value: string | null; version: number }, updateSeq: number, updateId: string): UpdatePayload { + return { + id: updateId, + seq: updateSeq, + body: { + t: 'update-account', + id: userId, + settings + }, + createdAt: Date.now() + }; +} + +export function buildUpdateMachineUpdate(machineId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, daemonState?: { value: string; version: number }): UpdatePayload { + return { + id: updateId, + seq: updateSeq, + body: { + t: 'update-machine', + machineId, + metadata, + daemonState + }, + createdAt: Date.now() + }; +} + +export function buildSessionActivityEphemeral(sessionId: string, active: boolean, activeAt: number, thinking?: boolean): EphemeralPayload { + return { + type: 'activity', + id: sessionId, + active, + activeAt, + thinking: thinking || false + }; +} + +export function buildMachineActivityEphemeral(machineId: string, active: boolean, activeAt: number): EphemeralPayload { + return { + type: 'machine-activity', + id: machineId, + active, + activeAt + }; +} + +export function buildUsageEphemeral(sessionId: string, key: string, tokens: Record, cost: Record): EphemeralPayload { + return { + type: 'usage', + id: sessionId, + key, + tokens, + cost, + timestamp: Date.now() + }; +} + +export function buildMachineStatusEphemeral(machineId: string, online: boolean): EphemeralPayload { + return { + type: 'machine-status', + machineId, + online, + timestamp: Date.now() + }; +} \ No newline at end of file