From e07e79ae2fbb650d0b90ccbc87e5f425ba725361 Mon Sep 17 00:00:00 2001 From: Steve Korshakov Date: Mon, 21 Jul 2025 00:50:07 -0700 Subject: [PATCH] feat: usage collection --- .../migration.sql | 27 ++ prisma/schema.prisma | 23 ++ sources/app/api.ts | 250 ++++++++++++++++++ sources/storage/types.ts | 12 + 4 files changed, 312 insertions(+) create mode 100644 prisma/migrations/20250721073743_add_usage_report/migration.sql diff --git a/prisma/migrations/20250721073743_add_usage_report/migration.sql b/prisma/migrations/20250721073743_add_usage_report/migration.sql new file mode 100644 index 0000000..2afca77 --- /dev/null +++ b/prisma/migrations/20250721073743_add_usage_report/migration.sql @@ -0,0 +1,27 @@ +-- CreateTable +CREATE TABLE "UsageReport" ( + "id" TEXT NOT NULL, + "key" TEXT NOT NULL, + "accountId" TEXT NOT NULL, + "sessionId" TEXT, + "data" JSONB NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "UsageReport_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "UsageReport_accountId_idx" ON "UsageReport"("accountId"); + +-- CreateIndex +CREATE INDEX "UsageReport_sessionId_idx" ON "UsageReport"("sessionId"); + +-- CreateIndex +CREATE UNIQUE INDEX "UsageReport_accountId_sessionId_key_key" ON "UsageReport"("accountId", "sessionId", "key"); + +-- AddForeignKey +ALTER TABLE "UsageReport" ADD CONSTRAINT "UsageReport_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE RESTRICT ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "UsageReport" ADD CONSTRAINT "UsageReport_sessionId_fkey" FOREIGN KEY ("sessionId") REFERENCES "Session"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 5c6d513..7889848 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -28,6 +28,7 @@ model Account { Update Update[] AccountPushToken AccountPushToken[] TerminalAuthRequest TerminalAuthRequest[] + UsageReport UsageReport[] } model TerminalAuthRequest { @@ -70,6 +71,7 @@ model Session { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt messages SessionMessage[] + usageReports UsageReport[] @@unique([accountId, tag]) } @@ -124,3 +126,24 @@ model SimpleCache { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt } + +// +// Usage Reporting +// + +model UsageReport { + id String @id @default(cuid()) + key String + accountId String + account Account @relation(fields: [accountId], references: [id]) + sessionId String? + session Session? @relation(fields: [sessionId], references: [id]) + /// [UsageReportData] + data Json + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([accountId, sessionId, key]) + @@index([accountId]) + @@index([sessionId]) +} diff --git a/sources/app/api.ts b/sources/app/api.ts index e13c3e9..43766b9 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -504,6 +504,141 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } }); + // Query Usage Reports API + typed.post('/v1/usage/query', { + schema: { + body: z.object({ + sessionId: z.string().nullish(), + startTime: z.number().int().positive().nullish(), + endTime: z.number().int().positive().nullish(), + groupBy: z.enum(['hour', 'day']).nullish() + }) + }, + preHandler: app.authenticate + }, async (request, reply) => { + const userId = request.user.id; + const { sessionId, startTime, endTime, groupBy } = request.body; + const actualGroupBy = groupBy || 'day'; + + try { + // Build query conditions + const where: { + accountId: string; + sessionId?: string | null; + createdAt?: { + gte?: Date; + lte?: Date; + }; + } = { + accountId: userId + }; + + if (sessionId) { + // Verify session belongs to user + const session = await db.session.findFirst({ + where: { + id: sessionId, + accountId: userId + } + }); + if (!session) { + return reply.code(404).send({ error: 'Session not found' }); + } + where.sessionId = sessionId; + } + + if (startTime || endTime) { + where.createdAt = {}; + if (startTime) { + where.createdAt.gte = new Date(startTime * 1000); + } + if (endTime) { + where.createdAt.lte = new Date(endTime * 1000); + } + } + + // Fetch usage reports + const reports = await db.usageReport.findMany({ + where, + orderBy: { + createdAt: 'desc' + } + }); + + // Aggregate data by time period + const aggregated = new Map; + cost: Record; + count: number; + timestamp: number; + }>(); + + for (const report of reports) { + const data = report.data as PrismaJson.UsageReportData; + const date = new Date(report.createdAt); + + // Calculate timestamp based on groupBy + let timestamp: number; + if (actualGroupBy === 'hour') { + // Round down to hour + const hourDate = new Date(date.getFullYear(), date.getMonth(), date.getDate(), date.getHours(), 0, 0, 0); + timestamp = Math.floor(hourDate.getTime() / 1000); + } else { + // Round down to day + const dayDate = new Date(date.getFullYear(), date.getMonth(), date.getDate(), 0, 0, 0, 0); + timestamp = Math.floor(dayDate.getTime() / 1000); + } + + const key = timestamp.toString(); + + if (!aggregated.has(key)) { + aggregated.set(key, { + tokens: {}, + cost: {}, + count: 0, + timestamp + }); + } + + const agg = aggregated.get(key)!; + agg.count++; + + // Aggregate tokens + for (const [tokenKey, tokenValue] of Object.entries(data.tokens)) { + if (typeof tokenValue === 'number') { + agg.tokens[tokenKey] = (agg.tokens[tokenKey] || 0) + tokenValue; + } + } + + // Aggregate costs + for (const [costKey, costValue] of Object.entries(data.cost)) { + if (typeof costValue === 'number') { + agg.cost[costKey] = (agg.cost[costKey] || 0) + costValue; + } + } + } + + // Convert to array and sort by timestamp + const result = Array.from(aggregated.values()) + .map(data => ({ + timestamp: data.timestamp, + tokens: data.tokens, + cost: data.cost, + reportCount: data.count + })) + .sort((a, b) => a.timestamp - b.timestamp); + + return reply.send({ + usage: result, + groupBy: actualGroupBy, + totalReports: reports.length + }); + } catch (error) { + log({ module: 'api', level: 'error' }, `Failed to query usage reports: ${error}`); + return reply.code(500).send({ error: 'Failed to query usage reports' }); + } + }); + // Messages API typed.get('/v1/sessions/:sessionId/messages', { schema: { @@ -1223,6 +1358,121 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> callback({}); }); + // Usage reporting + socket.on('usage-report', async (data: any, callback?: (response: any) => void) => { + const { key, sessionId, tokens, cost } = data; + + // Validate required fields + if (!key || typeof key !== 'string') { + if (callback) { + callback({ success: false, error: 'Invalid key' }); + } + return; + } + + // Validate tokens and cost objects + if (!tokens || typeof tokens !== 'object' || typeof tokens.total !== 'number') { + if (callback) { + callback({ success: false, error: 'Invalid tokens object - must include total' }); + } + return; + } + + if (!cost || typeof cost !== 'object' || typeof cost.total !== 'number') { + if (callback) { + callback({ success: false, error: 'Invalid cost object - must include total' }); + } + return; + } + + // Validate sessionId if provided + if (sessionId && typeof sessionId !== 'string') { + if (callback) { + callback({ success: false, error: 'Invalid sessionId' }); + } + return; + } + + try { + // If sessionId provided, verify it belongs to the user + if (sessionId) { + const session = await db.session.findFirst({ + where: { + id: sessionId, + accountId: userId + } + }); + + if (!session) { + if (callback) { + callback({ success: false, error: 'Session not found' }); + } + return; + } + } + + // Prepare usage data + const usageData: PrismaJson.UsageReportData = { + tokens, + cost + }; + + // Upsert the usage report + const report = await db.usageReport.upsert({ + where: { + accountId_sessionId_key: { + accountId: userId, + sessionId: sessionId || null, + key + } + }, + update: { + data: usageData, + updatedAt: new Date() + }, + create: { + accountId: userId, + sessionId: sessionId || null, + key, + data: usageData + } + }); + + log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`); + + // Emit ephemeral update if sessionId is provided + if (sessionId) { + emitUpdateToInterestedClients({ + event: 'ephemeral', + userId, + sessionId, + payload: { + type: 'usage', + id: sessionId, + key, + tokens: usageData.tokens, + cost: usageData.cost, + timestamp: Date.now() + } + }); + } + + if (callback) { + callback({ + success: true, + reportId: report.id, + createdAt: report.createdAt.getTime(), + updatedAt: report.updatedAt.getTime() + }); + } + } catch (error) { + log({ module: 'websocket', level: 'error' }, `Failed to save usage report: ${error}`); + if (callback) { + callback({ success: false, error: 'Failed to save usage report' }); + } + } + }); + socket.emit('auth', { success: true, user: userId }); log({ module: 'websocket' }, `User connected: ${userId}`); }); diff --git a/sources/storage/types.ts b/sources/storage/types.ts index e0900a2..588fc57 100644 --- a/sources/storage/types.ts +++ b/sources/storage/types.ts @@ -6,6 +6,18 @@ declare global { c: string; // Base64 encoded encrypted content }; + // Usage report data structure + type UsageReportData = { + tokens: { + total: number; + [key: string]: number; + }; + cost: { + total: number; + [key: string]: number; + }; + }; + // Update content types type UpdateBody = { t: 'new-message';