feat: add session online states

This commit is contained in:
Steve Korshakov 2025-07-13 20:31:41 -07:00
parent 5aa597186a
commit d2a35ca842
7 changed files with 148 additions and 9 deletions

View File

@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Session" ADD COLUMN "active" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "lastActiveAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP;

View File

@ -33,15 +33,17 @@ model Account {
//
model Session {
id String @id @default(cuid())
tag String
accountId String
account Account @relation(fields: [accountId], references: [id])
metadata String
seq Int @default(0)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
messages SessionMessage[]
id String @id @default(cuid())
tag String
accountId String
account Account @relation(fields: [accountId], references: [id])
metadata String
seq Int @default(0)
active Boolean @default(true)
lastActiveAt DateTime @default(now())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
messages SessionMessage[]
@@unique([accountId, tag])
}

View File

@ -127,6 +127,8 @@ export async function startApi() {
seq: true,
createdAt: true,
updatedAt: true,
active: true,
lastActiveAt: true,
messages: {
orderBy: { seq: 'desc' },
take: 1,
@ -146,6 +148,8 @@ export async function startApi() {
seq: v.seq,
createdAt: v.createdAt.getTime(),
updatedAt: v.updatedAt.getTime(),
active: v.active,
activeAt: v.lastActiveAt.getTime(),
lastMessage: v.messages[0] ? {
id: v.messages[0].id,
seq: v.messages[0].seq,
@ -181,6 +185,8 @@ export async function startApi() {
id: session.id,
seq: session.seq,
metadata: session.metadata,
active: session.active,
activeAt: session.lastActiveAt.getTime(),
createdAt: session.createdAt.getTime(),
updatedAt: session.updatedAt.getTime()
}
@ -214,6 +220,8 @@ export async function startApi() {
id: session.id,
seq: session.seq,
metadata: metadata,
active: session.active,
activeAt: session.lastActiveAt.getTime(),
createdAt: session.createdAt.getTime(),
updatedAt: session.updatedAt.getTime()
};
@ -243,6 +251,8 @@ export async function startApi() {
id: result.session.id,
seq: result.session.seq,
metadata: result.session.metadata,
active: result.session.active,
activeAt: result.session.lastActiveAt.getTime(),
createdAt: result.session.createdAt.getTime(),
updatedAt: result.session.updatedAt.getTime()
}
@ -368,6 +378,17 @@ export async function startApi() {
}
};
pubsub.on('update', updateHandler);
const updateEphemeralHandler = (accountId: string, update: { type: 'activity', id: string, active: boolean, activeAt: number }) => {
if (accountId === userId) {
socket.emit('ephemeral', {
type: update.type,
id: update.id,
active: update.active,
activeAt: update.activeAt
});
}
};
pubsub.on('update-ephemeral', updateEphemeralHandler);
socket.on('disconnect', () => {
// Clean up
@ -379,9 +400,82 @@ export async function startApi() {
}
}
pubsub.off('update', updateHandler);
pubsub.off('update-ephemeral', updateEphemeralHandler);
log({ module: 'websocket' }, `User disconnected: ${userId}`);
});
socket.on('session-alive', async (data: any) => {
const { sid, time } = data;
let t = time;
if (typeof t !== 'number') {
return;
}
if (t > Date.now()) {
t = Date.now();
}
if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes
return;
}
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!session) {
return;
}
// Update last active at
await db.session.update({
where: { id: sid },
data: { lastActiveAt: new Date(t), active: true }
});
// Emit update to connected sockets
pubsub.emit('update-ephemeral', userId, {
type: 'activity',
id: sid,
active: true,
activeAt: t
});
});
socket.on('session-end', async (data: any) => {
const { sid, time } = data;
let t = time;
if (typeof t !== 'number') {
return;
}
if (t > Date.now()) {
t = Date.now();
}
if (t < Date.now() - 1000 * 60 * 10) { // Ignore if time is in the past 10 minutes
return;
}
// Resolve session
const session = await db.session.findUnique({
where: { id: sid, accountId: userId }
});
if (!session) {
return;
}
// Update last active at
await db.session.update({
where: { id: sid },
data: { lastActiveAt: new Date(t), active: false }
});
// Emit update to connected sockets
pubsub.emit('update-ephemeral', userId, {
type: 'activity',
id: sid,
active: false,
activeAt: t
});
});
socket.on('message', async (data: any) => {
const { sid, message } = data;

35
sources/app/timeout.ts Normal file
View File

@ -0,0 +1,35 @@
import { pubsub } from "@/services/pubsub";
import { db } from "@/storage/db";
import { backoff, delay } from "@/utils/time";
export function startTimeout() {
backoff(async () => {
while (true) {
// Find timed out sessions
const sessions = await db.session.findMany({
where: {
active: true,
lastActiveAt: {
lte: new Date(Date.now() - 1000 * 60 * 10) // 10 minutes
}
}
});
for (const session of sessions) {
await db.session.update({
where: { id: session.id },
data: { active: false }
});
pubsub.emit('update-ephemeral', {
type: 'activity',
id: session.id,
active: false,
activeAt: session.lastActiveAt.getTime()
});
}
// Wait for 1 minute
await delay(1000 * 60);
}
});
}

View File

@ -2,6 +2,7 @@ import { startApi } from "@/app/api";
import { log } from "@/utils/log";
import { awaitShutdown } from "@/utils/shutdown";
import { db } from './storage/db';
import { startTimeout } from "./app/timeout";
async function main() {
@ -11,6 +12,7 @@ async function main() {
await db.$connect();
await startApi();
startTimeout();
//
// Ready

View File

@ -3,6 +3,7 @@ import { Update } from '@prisma/client';
export interface PubSubEvents {
'update': (accountId: string, update: Update) => void;
'update-ephemeral': (accountId: string, update: { type: 'activity', id: string, active: boolean, activeAt: number }) => void;
}
class PubSubService extends EventEmitter {

View File

@ -22,6 +22,8 @@ declare global {
id: string;
seq: number;
metadata: string;
active: boolean;
activeAt: number;
createdAt: number;
updatedAt: number;
};