refactor: separate session-alive and machine-alive socket events
- Remove SessionAliveEvent type and simplify event handling - session-alive now only handles session heartbeats (requires sid) - Add new machine-alive event for daemon heartbeats (requires machineId) - Remove type field and coupling between session and machine events - Add proper TypeScript types instead of using 'any' 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
a4bc4d34e8
commit
597d1d262a
@ -12,28 +12,6 @@ import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
|
|||||||
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||||
import { AsyncLock } from "@/utils/lock";
|
import { AsyncLock } from "@/utils/lock";
|
||||||
|
|
||||||
// Session alive event types
|
|
||||||
type SessionAliveEvent =
|
|
||||||
| {
|
|
||||||
type: 'session-scoped';
|
|
||||||
sid: string;
|
|
||||||
time: number;
|
|
||||||
thinking: boolean;
|
|
||||||
mode: 'local' | 'remote';
|
|
||||||
}
|
|
||||||
| {
|
|
||||||
type: 'machine-scoped';
|
|
||||||
machineId: string;
|
|
||||||
time: number;
|
|
||||||
}
|
|
||||||
| {
|
|
||||||
// Legacy format (no type field) - defaults to session-scoped
|
|
||||||
sid: string;
|
|
||||||
time: number;
|
|
||||||
thinking: boolean;
|
|
||||||
mode?: 'local' | 'remote';
|
|
||||||
type?: undefined;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Recipient filter types
|
// Recipient filter types
|
||||||
type RecipientFilter =
|
type RecipientFilter =
|
||||||
@ -1184,10 +1162,15 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
socket.on('session-alive', async (data: SessionAliveEvent) => {
|
socket.on('session-alive', async (data: {
|
||||||
|
sid: string;
|
||||||
|
time: number;
|
||||||
|
thinking?: boolean;
|
||||||
|
mode?: 'local' | 'remote';
|
||||||
|
}) => {
|
||||||
try {
|
try {
|
||||||
// Basic validation
|
// Basic validation
|
||||||
if (!data || typeof data.time !== 'number') {
|
if (!data || typeof data.time !== 'number' || !data.sid) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1199,80 +1182,84 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine type (default to session-scoped for legacy)
|
const { sid, thinking } = data;
|
||||||
const eventType = data.type || 'session-scoped';
|
|
||||||
|
|
||||||
// Validate but CONTINUE with warning
|
// Resolve session
|
||||||
if (eventType === 'machine-scoped' && connection.connectionType !== 'machine-scoped') {
|
const session = await db.session.findUnique({
|
||||||
log({ module: 'websocket', level: 'warn' },
|
where: { id: sid, accountId: userId }
|
||||||
`Connection type mismatch: ${connection.connectionType} sending machine-scoped alive`);
|
});
|
||||||
// CONTINUE ANYWAY
|
if (!session) {
|
||||||
}
|
return;
|
||||||
if (eventType === 'session-scoped' && connection.connectionType === 'machine-scoped') {
|
|
||||||
log({ module: 'websocket', level: 'warn' },
|
|
||||||
`Connection type mismatch: ${connection.connectionType} sending session-scoped alive`);
|
|
||||||
// CONTINUE ANYWAY
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle based on type
|
// Update last active
|
||||||
if (eventType === 'machine-scoped' && 'machineId' in data) {
|
await db.session.update({
|
||||||
// Machine heartbeat - update database instead of ephemeral
|
where: { id: sid },
|
||||||
const machineId = connection.connectionType === 'machine-scoped' ? connection.machineId : data.machineId;
|
data: { lastActiveAt: new Date(t), active: true }
|
||||||
|
});
|
||||||
// Update machine lastActiveAt in database
|
|
||||||
await db.machine.update({
|
|
||||||
where: {
|
|
||||||
accountId_id: {
|
|
||||||
accountId: userId,
|
|
||||||
id: machineId
|
|
||||||
}
|
|
||||||
},
|
|
||||||
data: {
|
|
||||||
lastActiveAt: new Date(t),
|
|
||||||
active: true
|
|
||||||
}
|
|
||||||
}).catch(() => {
|
|
||||||
// Machine might not exist yet, that's ok
|
|
||||||
});
|
|
||||||
|
|
||||||
} else if ('sid' in data) {
|
|
||||||
// Session heartbeat (legacy or explicit session-scoped)
|
|
||||||
const { sid, thinking } = data;
|
|
||||||
|
|
||||||
// Resolve session
|
|
||||||
const session = await db.session.findUnique({
|
|
||||||
where: { id: sid, accountId: userId }
|
|
||||||
});
|
|
||||||
if (!session) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update last active
|
// Emit update
|
||||||
await db.session.update({
|
emitUpdateToInterestedClients({
|
||||||
where: { id: sid },
|
event: 'ephemeral',
|
||||||
data: { lastActiveAt: new Date(t), active: true }
|
userId,
|
||||||
});
|
payload: {
|
||||||
|
type: 'activity',
|
||||||
// Emit update
|
id: sid,
|
||||||
emitUpdateToInterestedClients({
|
active: true,
|
||||||
event: 'ephemeral',
|
activeAt: t,
|
||||||
userId,
|
thinking: thinking || false
|
||||||
payload: {
|
},
|
||||||
type: 'activity',
|
recipientFilter: { type: 'all-user-authenticated-connections' }
|
||||||
id: sid,
|
});
|
||||||
active: true,
|
|
||||||
activeAt: t,
|
|
||||||
thinking: thinking || false
|
|
||||||
},
|
|
||||||
recipientFilter: { type: 'all-user-authenticated-connections' }
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`);
|
log({ module: 'websocket', level: 'error' }, `Error in session-alive: ${error}`);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
socket.on('session-end', async (data: any) => {
|
socket.on('machine-alive', async (data: {
|
||||||
|
machineId: string;
|
||||||
|
time: number;
|
||||||
|
}) => {
|
||||||
|
try {
|
||||||
|
// Basic validation
|
||||||
|
if (!data || typeof data.time !== 'number' || !data.machineId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let t = data.time;
|
||||||
|
if (t > Date.now()) {
|
||||||
|
t = Date.now();
|
||||||
|
}
|
||||||
|
if (t < Date.now() - 1000 * 60 * 10) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const machineId = data.machineId;
|
||||||
|
|
||||||
|
// Update machine lastActiveAt in database
|
||||||
|
await db.machine.update({
|
||||||
|
where: {
|
||||||
|
accountId_id: {
|
||||||
|
accountId: userId,
|
||||||
|
id: machineId
|
||||||
|
}
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
lastActiveAt: new Date(t),
|
||||||
|
active: true
|
||||||
|
}
|
||||||
|
}).catch(() => {
|
||||||
|
// Machine might not exist yet, that's ok
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
log({ module: 'websocket', level: 'error' }, `Error in machine-alive: ${error}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.on('session-end', async (data: {
|
||||||
|
sid: string;
|
||||||
|
time: number;
|
||||||
|
}) => {
|
||||||
try {
|
try {
|
||||||
const { sid, time } = data;
|
const { sid, time } = data;
|
||||||
let t = time;
|
let t = time;
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user