feat: usage collection

This commit is contained in:
Steve Korshakov 2025-07-21 00:50:07 -07:00
parent fdc91050f7
commit e07e79ae2f
4 changed files with 312 additions and 0 deletions

View File

@ -0,0 +1,27 @@
-- CreateTable
CREATE TABLE "UsageReport" (
"id" TEXT NOT NULL,
"key" TEXT NOT NULL,
"accountId" TEXT NOT NULL,
"sessionId" TEXT,
"data" JSONB NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "UsageReport_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "UsageReport_accountId_idx" ON "UsageReport"("accountId");
-- CreateIndex
CREATE INDEX "UsageReport_sessionId_idx" ON "UsageReport"("sessionId");
-- CreateIndex
CREATE UNIQUE INDEX "UsageReport_accountId_sessionId_key_key" ON "UsageReport"("accountId", "sessionId", "key");
-- AddForeignKey
ALTER TABLE "UsageReport" ADD CONSTRAINT "UsageReport_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "UsageReport" ADD CONSTRAINT "UsageReport_sessionId_fkey" FOREIGN KEY ("sessionId") REFERENCES "Session"("id") ON DELETE SET NULL ON UPDATE CASCADE;

View File

@ -28,6 +28,7 @@ model Account {
Update Update[]
AccountPushToken AccountPushToken[]
TerminalAuthRequest TerminalAuthRequest[]
UsageReport UsageReport[]
}
model TerminalAuthRequest {
@ -70,6 +71,7 @@ model Session {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
messages SessionMessage[]
usageReports UsageReport[]
@@unique([accountId, tag])
}
@ -124,3 +126,24 @@ model SimpleCache {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
//
// Usage Reporting
//
model UsageReport {
id String @id @default(cuid())
key String
accountId String
account Account @relation(fields: [accountId], references: [id])
sessionId String?
session Session? @relation(fields: [sessionId], references: [id])
/// [UsageReportData]
data Json
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([accountId, sessionId, key])
@@index([accountId])
@@index([sessionId])
}

View File

@ -504,6 +504,141 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
});
// Query Usage Reports API
typed.post('/v1/usage/query', {
schema: {
body: z.object({
sessionId: z.string().nullish(),
startTime: z.number().int().positive().nullish(),
endTime: z.number().int().positive().nullish(),
groupBy: z.enum(['hour', 'day']).nullish()
})
},
preHandler: app.authenticate
}, async (request, reply) => {
const userId = request.user.id;
const { sessionId, startTime, endTime, groupBy } = request.body;
const actualGroupBy = groupBy || 'day';
try {
// Build query conditions
const where: {
accountId: string;
sessionId?: string | null;
createdAt?: {
gte?: Date;
lte?: Date;
};
} = {
accountId: userId
};
if (sessionId) {
// Verify session belongs to user
const session = await db.session.findFirst({
where: {
id: sessionId,
accountId: userId
}
});
if (!session) {
return reply.code(404).send({ error: 'Session not found' });
}
where.sessionId = sessionId;
}
if (startTime || endTime) {
where.createdAt = {};
if (startTime) {
where.createdAt.gte = new Date(startTime * 1000);
}
if (endTime) {
where.createdAt.lte = new Date(endTime * 1000);
}
}
// Fetch usage reports
const reports = await db.usageReport.findMany({
where,
orderBy: {
createdAt: 'desc'
}
});
// Aggregate data by time period
const aggregated = new Map<string, {
tokens: Record<string, number>;
cost: Record<string, number>;
count: number;
timestamp: number;
}>();
for (const report of reports) {
const data = report.data as PrismaJson.UsageReportData;
const date = new Date(report.createdAt);
// Calculate timestamp based on groupBy
let timestamp: number;
if (actualGroupBy === 'hour') {
// Round down to hour
const hourDate = new Date(date.getFullYear(), date.getMonth(), date.getDate(), date.getHours(), 0, 0, 0);
timestamp = Math.floor(hourDate.getTime() / 1000);
} else {
// Round down to day
const dayDate = new Date(date.getFullYear(), date.getMonth(), date.getDate(), 0, 0, 0, 0);
timestamp = Math.floor(dayDate.getTime() / 1000);
}
const key = timestamp.toString();
if (!aggregated.has(key)) {
aggregated.set(key, {
tokens: {},
cost: {},
count: 0,
timestamp
});
}
const agg = aggregated.get(key)!;
agg.count++;
// Aggregate tokens
for (const [tokenKey, tokenValue] of Object.entries(data.tokens)) {
if (typeof tokenValue === 'number') {
agg.tokens[tokenKey] = (agg.tokens[tokenKey] || 0) + tokenValue;
}
}
// Aggregate costs
for (const [costKey, costValue] of Object.entries(data.cost)) {
if (typeof costValue === 'number') {
agg.cost[costKey] = (agg.cost[costKey] || 0) + costValue;
}
}
}
// Convert to array and sort by timestamp
const result = Array.from(aggregated.values())
.map(data => ({
timestamp: data.timestamp,
tokens: data.tokens,
cost: data.cost,
reportCount: data.count
}))
.sort((a, b) => a.timestamp - b.timestamp);
return reply.send({
usage: result,
groupBy: actualGroupBy,
totalReports: reports.length
});
} catch (error) {
log({ module: 'api', level: 'error' }, `Failed to query usage reports: ${error}`);
return reply.code(500).send({ error: 'Failed to query usage reports' });
}
});
// Messages API
typed.get('/v1/sessions/:sessionId/messages', {
schema: {
@ -1223,6 +1358,121 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
callback({});
});
// Usage reporting
socket.on('usage-report', async (data: any, callback?: (response: any) => void) => {
const { key, sessionId, tokens, cost } = data;
// Validate required fields
if (!key || typeof key !== 'string') {
if (callback) {
callback({ success: false, error: 'Invalid key' });
}
return;
}
// Validate tokens and cost objects
if (!tokens || typeof tokens !== 'object' || typeof tokens.total !== 'number') {
if (callback) {
callback({ success: false, error: 'Invalid tokens object - must include total' });
}
return;
}
if (!cost || typeof cost !== 'object' || typeof cost.total !== 'number') {
if (callback) {
callback({ success: false, error: 'Invalid cost object - must include total' });
}
return;
}
// Validate sessionId if provided
if (sessionId && typeof sessionId !== 'string') {
if (callback) {
callback({ success: false, error: 'Invalid sessionId' });
}
return;
}
try {
// If sessionId provided, verify it belongs to the user
if (sessionId) {
const session = await db.session.findFirst({
where: {
id: sessionId,
accountId: userId
}
});
if (!session) {
if (callback) {
callback({ success: false, error: 'Session not found' });
}
return;
}
}
// Prepare usage data
const usageData: PrismaJson.UsageReportData = {
tokens,
cost
};
// Upsert the usage report
const report = await db.usageReport.upsert({
where: {
accountId_sessionId_key: {
accountId: userId,
sessionId: sessionId || null,
key
}
},
update: {
data: usageData,
updatedAt: new Date()
},
create: {
accountId: userId,
sessionId: sessionId || null,
key,
data: usageData
}
});
log({ module: 'websocket' }, `Usage report saved: key=${key}, sessionId=${sessionId || 'none'}, userId=${userId}`);
// Emit ephemeral update if sessionId is provided
if (sessionId) {
emitUpdateToInterestedClients({
event: 'ephemeral',
userId,
sessionId,
payload: {
type: 'usage',
id: sessionId,
key,
tokens: usageData.tokens,
cost: usageData.cost,
timestamp: Date.now()
}
});
}
if (callback) {
callback({
success: true,
reportId: report.id,
createdAt: report.createdAt.getTime(),
updatedAt: report.updatedAt.getTime()
});
}
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Failed to save usage report: ${error}`);
if (callback) {
callback({ success: false, error: 'Failed to save usage report' });
}
}
});
socket.emit('auth', { success: true, user: userId });
log({ module: 'websocket' }, `User connected: ${userId}`);
});

View File

@ -6,6 +6,18 @@ declare global {
c: string; // Base64 encoded encrypted content
};
// Usage report data structure
type UsageReportData = {
tokens: {
total: number;
[key: string]: number;
};
cost: {
total: number;
[key: string]: number;
};
};
// Update content types
type UpdateBody = {
t: 'new-message';