happy-server/sources/app/events/eventRouter.ts
Steve Korshakov 014473a7ac feat: implement session deletion with cascade and socket notifications
- Add sessionDelete action to handle deletion of sessions and all related data
- Delete session messages, usage reports, and access keys in proper order
- Add delete-session event type to eventRouter for real-time notifications
- Add DELETE /v1/sessions/:sessionId endpoint with ownership verification
- Send socket notification to all user connections after successful deletion
- Add idempotency rule to CLAUDE.md for API operations

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2025-09-20 21:17:48 -07:00

600 lines
16 KiB
TypeScript

import { Socket } from "socket.io";
import { log } from "@/utils/log";
import { GitHubProfile } from "@/app/api/types";
import { AccountProfile } from "@/types";
import { getPublicUrl } from "@/storage/files";
// === CONNECTION TYPES ===
export interface SessionScopedConnection {
connectionType: 'session-scoped';
socket: Socket;
userId: string;
sessionId: string;
}
export interface UserScopedConnection {
connectionType: 'user-scoped';
socket: Socket;
userId: string;
}
export interface MachineScopedConnection {
connectionType: 'machine-scoped';
socket: Socket;
userId: string;
machineId: string;
}
export type ClientConnection = SessionScopedConnection | UserScopedConnection | MachineScopedConnection;
// === RECIPIENT FILTER TYPES ===
export type RecipientFilter =
| { type: 'all-interested-in-session'; sessionId: string }
| { type: 'user-scoped-only' }
| { type: 'all-user-authenticated-connections' };
// === UPDATE EVENT TYPES (Persistent) ===
export type UpdateEvent = {
type: 'new-message';
sessionId: string;
message: {
id: string;
seq: number;
content: any;
localId: string | null;
createdAt: number;
updatedAt: number;
}
} | {
type: 'new-session';
sessionId: string;
seq: number;
metadata: string;
metadataVersion: number;
agentState: string | null;
agentStateVersion: number;
dataEncryptionKey: string | null;
active: boolean;
activeAt: number;
createdAt: number;
updatedAt: number;
} | {
type: 'update-session';
sessionId: string;
metadata?: {
value: string | null;
version: number;
} | null | undefined;
agentState?: {
value: string | null;
version: number;
} | null | undefined;
} | {
type: 'update-account';
userId: string;
settings?: {
value: string | null;
version: number;
} | null | undefined;
github?: GitHubProfile | null | undefined;
} | {
type: 'new-machine';
machineId: string;
seq: number;
metadata: string;
metadataVersion: number;
daemonState: string | null;
daemonStateVersion: number;
dataEncryptionKey: string | null;
active: boolean;
activeAt: number;
createdAt: number;
updatedAt: number;
} | {
type: 'update-machine';
machineId: string;
metadata?: {
value: string;
version: number;
};
daemonState?: {
value: string;
version: number;
};
activeAt?: number;
} | {
type: 'new-artifact';
artifactId: string;
seq: number;
header: string;
headerVersion: number;
body: string;
bodyVersion: number;
dataEncryptionKey: string | null;
createdAt: number;
updatedAt: number;
} | {
type: 'update-artifact';
artifactId: string;
header?: {
value: string;
version: number;
};
body?: {
value: string;
version: number;
};
} | {
type: 'delete-artifact';
artifactId: string;
} | {
type: 'delete-session';
sessionId: string;
} | {
type: 'relationship-updated';
uid: string;
status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected';
timestamp: number;
} | {
type: 'new-feed-post';
id: string;
body: any;
cursor: string;
createdAt: number;
};
// === EPHEMERAL EVENT TYPES (Transient) ===
export type EphemeralEvent = {
type: 'activity';
id: string;
active: boolean;
activeAt: number;
thinking?: boolean;
} | {
type: 'machine-activity';
id: string;
active: boolean;
activeAt: number;
} | {
type: 'usage';
id: string;
key: string;
tokens: Record<string, number>;
cost: Record<string, number>;
timestamp: number;
} | {
type: 'machine-status';
machineId: string;
online: boolean;
timestamp: number;
};
// === EVENT PAYLOAD TYPES ===
export interface UpdatePayload {
id: string;
seq: number;
body: {
t: UpdateEvent['type'];
[key: string]: any;
};
createdAt: number;
}
export interface EphemeralPayload {
type: EphemeralEvent['type'];
[key: string]: any;
}
// === EVENT ROUTER CLASS ===
class EventRouter {
private userConnections = new Map<string, Set<ClientConnection>>();
// === CONNECTION MANAGEMENT ===
addConnection(userId: string, connection: ClientConnection): void {
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Set());
}
this.userConnections.get(userId)!.add(connection);
}
removeConnection(userId: string, connection: ClientConnection): void {
const connections = this.userConnections.get(userId);
if (connections) {
connections.delete(connection);
if (connections.size === 0) {
this.userConnections.delete(userId);
}
}
}
getConnections(userId: string): Set<ClientConnection> | undefined {
return this.userConnections.get(userId);
}
// === EVENT EMISSION METHODS ===
emitUpdate(params: {
userId: string;
payload: UpdatePayload;
recipientFilter?: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
this.emit({
userId: params.userId,
eventName: 'update',
payload: params.payload,
recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' },
skipSenderConnection: params.skipSenderConnection
});
}
emitEphemeral(params: {
userId: string;
payload: EphemeralPayload;
recipientFilter?: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
this.emit({
userId: params.userId,
eventName: 'ephemeral',
payload: params.payload,
recipientFilter: params.recipientFilter || { type: 'all-user-authenticated-connections' },
skipSenderConnection: params.skipSenderConnection
});
}
// === PRIVATE ROUTING LOGIC ===
private shouldSendToConnection(
connection: ClientConnection,
filter: RecipientFilter
): boolean {
switch (filter.type) {
case 'all-interested-in-session':
// Send to session-scoped with matching session + all user-scoped
if (connection.connectionType === 'session-scoped') {
if (connection.sessionId !== filter.sessionId) {
return false; // Wrong session
}
} else if (connection.connectionType === 'machine-scoped') {
return false; // Machines don't need session updates
}
// user-scoped always gets it
return true;
case 'user-scoped-only':
return connection.connectionType === 'user-scoped';
case 'all-user-authenticated-connections':
// Send to all connection types (default behavior)
return true;
default:
return false;
}
}
private emit(params: {
userId: string;
eventName: 'update' | 'ephemeral';
payload: any;
recipientFilter: RecipientFilter;
skipSenderConnection?: ClientConnection;
}): void {
const connections = this.userConnections.get(params.userId);
if (!connections) {
log({ module: 'websocket', level: 'warn' }, `No connections found for user ${params.userId}`);
return;
}
for (const connection of connections) {
// Skip message echo
if (params.skipSenderConnection && connection === params.skipSenderConnection) {
continue;
}
// Apply recipient filter
if (!this.shouldSendToConnection(connection, params.recipientFilter)) {
continue;
}
connection.socket.emit(params.eventName, params.payload);
}
}
}
export const eventRouter = new EventRouter();
// === EVENT BUILDER FUNCTIONS ===
export function buildNewSessionUpdate(session: {
id: string;
seq: number;
metadata: string;
metadataVersion: number;
agentState: string | null;
agentStateVersion: number;
dataEncryptionKey: Uint8Array | null;
active: boolean;
lastActiveAt: Date;
createdAt: Date;
updatedAt: Date;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-session',
id: session.id,
seq: session.seq,
metadata: session.metadata,
metadataVersion: session.metadataVersion,
agentState: session.agentState,
agentStateVersion: session.agentStateVersion,
dataEncryptionKey: session.dataEncryptionKey ? Buffer.from(session.dataEncryptionKey).toString('base64') : null,
active: session.active,
activeAt: session.lastActiveAt.getTime(),
createdAt: session.createdAt.getTime(),
updatedAt: session.updatedAt.getTime()
},
createdAt: Date.now()
};
}
export function buildNewMessageUpdate(message: {
id: string;
seq: number;
content: any;
localId: string | null;
createdAt: Date;
updatedAt: Date;
}, sessionId: string, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-message',
sid: sessionId,
message: {
id: message.id,
seq: message.seq,
content: message.content,
localId: message.localId,
createdAt: message.createdAt.getTime(),
updatedAt: message.updatedAt.getTime()
}
},
createdAt: Date.now()
};
}
export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, agentState?: { value: string; version: number }): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-session',
id: sessionId,
metadata,
agentState
},
createdAt: Date.now()
};
}
export function buildDeleteSessionUpdate(sessionId: string, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'delete-session',
sid: sessionId
},
createdAt: Date.now()
};
}
export function buildUpdateAccountUpdate(userId: string, profile: Partial<AccountProfile>, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-account',
id: userId,
...profile,
avatar: profile.avatar ? { ...profile.avatar, url: getPublicUrl(profile.avatar.path) } : undefined
},
createdAt: Date.now()
};
}
export function buildNewMachineUpdate(machine: {
id: string;
seq: number;
metadata: string;
metadataVersion: number;
daemonState: string | null;
daemonStateVersion: number;
dataEncryptionKey: Uint8Array | null;
active: boolean;
lastActiveAt: Date;
createdAt: Date;
updatedAt: Date;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-machine',
machineId: machine.id,
seq: machine.seq,
metadata: machine.metadata,
metadataVersion: machine.metadataVersion,
daemonState: machine.daemonState,
daemonStateVersion: machine.daemonStateVersion,
dataEncryptionKey: machine.dataEncryptionKey ? Buffer.from(machine.dataEncryptionKey).toString('base64') : null,
active: machine.active,
activeAt: machine.lastActiveAt.getTime(),
createdAt: machine.createdAt.getTime(),
updatedAt: machine.updatedAt.getTime()
},
createdAt: Date.now()
};
}
export function buildUpdateMachineUpdate(machineId: string, updateSeq: number, updateId: string, metadata?: { value: string; version: number }, daemonState?: { value: string; version: number }): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-machine',
machineId,
metadata,
daemonState
},
createdAt: Date.now()
};
}
export function buildSessionActivityEphemeral(sessionId: string, active: boolean, activeAt: number, thinking?: boolean): EphemeralPayload {
return {
type: 'activity',
id: sessionId,
active,
activeAt,
thinking: thinking || false
};
}
export function buildMachineActivityEphemeral(machineId: string, active: boolean, activeAt: number): EphemeralPayload {
return {
type: 'machine-activity',
id: machineId,
active,
activeAt
};
}
export function buildUsageEphemeral(sessionId: string, key: string, tokens: Record<string, number>, cost: Record<string, number>): EphemeralPayload {
return {
type: 'usage',
id: sessionId,
key,
tokens,
cost,
timestamp: Date.now()
};
}
export function buildMachineStatusEphemeral(machineId: string, online: boolean): EphemeralPayload {
return {
type: 'machine-status',
machineId,
online,
timestamp: Date.now()
};
}
export function buildNewArtifactUpdate(artifact: {
id: string;
seq: number;
header: Uint8Array;
headerVersion: number;
body: Uint8Array;
bodyVersion: number;
dataEncryptionKey: Uint8Array;
createdAt: Date;
updatedAt: Date;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-artifact',
artifactId: artifact.id,
seq: artifact.seq,
header: Buffer.from(artifact.header).toString('base64'),
headerVersion: artifact.headerVersion,
body: Buffer.from(artifact.body).toString('base64'),
bodyVersion: artifact.bodyVersion,
dataEncryptionKey: Buffer.from(artifact.dataEncryptionKey).toString('base64'),
createdAt: artifact.createdAt.getTime(),
updatedAt: artifact.updatedAt.getTime()
},
createdAt: Date.now()
};
}
export function buildUpdateArtifactUpdate(artifactId: string, updateSeq: number, updateId: string, header?: { value: string; version: number }, body?: { value: string; version: number }): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'update-artifact',
artifactId,
header,
body
},
createdAt: Date.now()
};
}
export function buildDeleteArtifactUpdate(artifactId: string, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'delete-artifact',
artifactId
},
createdAt: Date.now()
};
}
export function buildRelationshipUpdatedEvent(
data: {
uid: string;
status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected';
timestamp: number;
},
updateSeq: number,
updateId: string
): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'relationship-updated',
...data
},
createdAt: Date.now()
};
}
export function buildNewFeedPostUpdate(feedItem: {
id: string;
body: any;
cursor: string;
createdAt: number;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-feed-post',
id: feedItem.id,
body: feedItem.body,
cursor: feedItem.cursor,
createdAt: feedItem.createdAt
},
createdAt: Date.now()
};
}