ref: moving files around

Steve Korshakov 2025-09-01 14:04:30 -07:00
parent 2dada58228
commit 5379043a14
20 changed files with 67 additions and 555 deletions

View File

@@ -0,0 +1,22 @@
-- CreateTable
CREATE TABLE "ServiceAccountToken" (
"id" TEXT NOT NULL,
"accountId" TEXT NOT NULL,
"vendor" TEXT NOT NULL,
"token" BYTEA NOT NULL,
"metadata" JSONB,
"lastUsedAt" TIMESTAMP(3),
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "ServiceAccountToken_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "ServiceAccountToken_accountId_idx" ON "ServiceAccountToken"("accountId");
-- CreateIndex
CREATE UNIQUE INDEX "ServiceAccountToken_accountId_vendor_key" ON "ServiceAccountToken"("accountId", "vendor");
-- AddForeignKey
ALTER TABLE "ServiceAccountToken" ADD CONSTRAINT "ServiceAccountToken_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -43,6 +43,7 @@ model Account {
  UsageReport         UsageReport[]
  Machine             Machine[]
  UploadedFile        UploadedFile[]
  ServiceAccountToken ServiceAccountToken[]
}
model TerminalAuthRequest {
@@ -222,3 +223,18 @@ model UploadedFile {
  @@unique([accountId, path])
  @@index([accountId])
}
model ServiceAccountToken {
  id         String    @id @default(cuid())
  accountId  String
  account    Account   @relation(fields: [accountId], references: [id], onDelete: Cascade)
  vendor     String
  token      Bytes // Encrypted token
  metadata   Json? // Optional vendor metadata
  lastUsedAt DateTime?
  createdAt  DateTime  @default(now())
  updatedAt  DateTime  @updatedAt

  @@unique([accountId, vendor])
  @@index([accountId])
}
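
Not part of this diff: a minimal sketch of how the new ServiceAccountToken table might be written to through the generated Prisma client. The helper name storeVendorToken and its parameters are illustrative, and it assumes the token bytes are already encrypted before being passed in.

import { db } from "@/storage/db";

// Illustrative helper (not in this commit): persist an already-encrypted vendor token.
// The compound selector accountId_vendor is generated from @@unique([accountId, vendor]).
async function storeVendorToken(accountId: string, vendor: string, encryptedToken: Buffer) {
    return db.serviceAccountToken.upsert({
        where: { accountId_vendor: { accountId, vendor } },
        create: { accountId, vendor, token: encryptedToken },
        update: { token: encryptedToken, lastUsedAt: new Date() }
    });
}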

View File

@@ -7,10 +7,10 @@ import * as privacyKit from "privacy-kit";
import { db } from "@/storage/db";
import { Account, Prisma } from "@prisma/client";
import { onShutdown } from "@/utils/shutdown";
import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
import { allocateSessionSeq, allocateUserSeq } from "@/storage/seq";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { AsyncLock } from "@/utils/lock";
import { auth } from "@/modules/auth";
import { auth } from "@/app/auth/auth";
import {
    EventRouter,
    ClientConnection,
@@ -31,8 +31,8 @@ import {
    websocketEventsCounter,
    httpRequestsCounter,
    httpRequestDurationHistogram
} from "@/modules/metrics";
import { activityCache } from "@/modules/sessionCache";
} from "@/app/monitoring/metrics2";
import { activityCache } from "@/app/presence/sessionCache";
import { encryptBytes, encryptString } from "@/modules/encrypt";
import { GitHubProfile } from "./types";
import { uploadImage } from "@/storage/uploadImage";

View File

@@ -1,3 +1,7 @@
import { FastifyBaseLogger, FastifyInstance } from "fastify";
import { ZodTypeProvider } from "fastify-type-provider-zod";
import { IncomingMessage, Server, ServerResponse } from "http";
export interface GitHubProfile {
    id: number;
    login: string;
@@ -36,4 +40,12 @@ export interface GitHubProfile {
export interface GitHubOrg {
}
}
export type Fastify = FastifyInstance<
    Server<typeof IncomingMessage, typeof ServerResponse>,
    IncomingMessage,
    ServerResponse<IncomingMessage>,
    FastifyBaseLogger,
    ZodTypeProvider
>;
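
Not part of this diff: a minimal sketch of how the exported Fastify alias might be consumed when registering a zod-typed route; the route path, schema, and handler are illustrative.

import { z } from "zod";
import { Fastify } from "@/app/api/types";

// Illustrative route module: the ZodTypeProvider in the alias lets zod schemas
// drive request/response typing for this instance.
export function registerHealthRoute(app: Fastify) {
    app.get("/health", {
        schema: {
            response: { 200: z.object({ ok: z.boolean() }) }
        }
    }, async () => {
        return { ok: true };
    });
}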

View File

@@ -1,6 +1,6 @@
import fastify from 'fastify';
import { db } from '@/storage/db';
import { register } from '@/modules/metrics';
import { register } from '@/app/monitoring/metrics2';
import { log } from '@/utils/log';
export async function createMetricsServer() {

View File

@@ -1,6 +1,6 @@
import { db } from "@/storage/db";
import { log } from "@/utils/log";
import { sessionCacheCounter, databaseUpdatesSkippedCounter } from "@/modules/metrics";
import { sessionCacheCounter, databaseUpdatesSkippedCounter } from "@/app/monitoring/metrics2";
interface SessionCacheEntry {
    validUntil: number;

View File

@@ -1,13 +1,13 @@
import { startApi } from "@/app/api";
import { startApi } from "@/app/api/api";
import { log } from "@/utils/log";
import { awaitShutdown, onShutdown } from "@/utils/shutdown";
import { db } from './storage/db';
import { startTimeout } from "./app/timeout";
import { redis } from "./services/redis";
import { startMetricsServer } from "@/app/metrics";
import { activityCache } from "@/modules/sessionCache";
import { auth } from "./modules/auth";
import { startDatabaseMetricsUpdater } from "@/modules/metrics";
import { startTimeout } from "./app/presence/timeout";
import { redis } from "./storage/redis";
import { startMetricsServer } from "@/app/monitoring/metrics";
import { activityCache } from "@/app/presence/sessionCache";
import { auth } from "./app/auth/auth";
import { startDatabaseMetricsUpdater } from "@/app/monitoring/metrics2";
import { initEncrypt } from "./modules/encrypt";
import { initGithub } from "./modules/github";
import { loadFiles } from "./storage/files";

View File

@@ -1,6 +1,6 @@
import { Socket } from "socket.io";
import { log } from "@/utils/log";
import { GitHubProfile } from "@/app/types";
import { GitHubProfile } from "@/app/api/types";
import { AccountProfile } from "@/types";
import { getPublicUrl } from "@/storage/files";

View File

@@ -1,27 +0,0 @@
import { EventEmitter } from 'events';

export interface PubSubEvents {
    'update': (accountId: string, update: {
        id: string,
        seq: number,
        body: any,
        createdAt: number
    }) => void;
    'update-ephemeral': (accountId: string, update: { type: 'activity', id: string, active: boolean, activeAt: number, thinking: boolean }) => void;
}

class PubSubService extends EventEmitter {
    emit<K extends keyof PubSubEvents>(event: K, ...args: Parameters<PubSubEvents[K]>): boolean {
        return super.emit(event, ...args);
    }

    on<K extends keyof PubSubEvents>(event: K, listener: PubSubEvents[K]): this {
        return super.on(event, listener);
    }

    off<K extends keyof PubSubEvents>(event: K, listener: PubSubEvents[K]): this {
        return super.off(event, listener);
    }
}

export const pubsub = new PubSubService();

View File

@@ -1,83 +0,0 @@
import { redis } from '@/services/redis';
import { delay } from '@/utils/delay';
import { forever } from '@/utils/forever';
import { log, warn } from '@/utils/log';
import { shutdownSignal } from '@/utils/shutdown';

const CLEANUP_INTERVAL = 60000; // 1 minute

let started = false;

// Start cleanup worker for all streams
export async function startCleanupWorker() {
    if (started) {
        return;
    }
    started = true;

    log('Starting Redis cleanup worker');

    forever('redis-cleanup', async () => {
        try {
            const now = Date.now();

            // Find all active_consumers:* keys
            const keys = await redis.keys('active_consumers:*');

            let totalCleaned = 0;
            for (const key of keys) {
                // Extract stream name from key: active_consumers:streamname
                const stream = key.substring('active_consumers:'.length);

                // Get all consumers with their expiration times
                const consumers = await redis.hgetall(key);
                const expiredConsumers: string[] = [];

                // Check each consumer's expiration time
                for (const [consumerGroup, expirationTime] of Object.entries(consumers)) {
                    if (parseInt(expirationTime) < now) {
                        expiredConsumers.push(consumerGroup);
                    }
                }

                if (expiredConsumers.length === 0) {
                    continue;
                }

                // Delete expired consumer groups
                let cleanedCount = 0;
                for (const consumerGroup of expiredConsumers) {
                    try {
                        await redis.xgroup('DESTROY', stream, consumerGroup);
                        cleanedCount++;
                    } catch (err: any) {
                        // Group might already be deleted or doesn't exist
                        if (!err.message?.includes('NOGROUP')) {
                            warn(`Failed to cleanup group ${consumerGroup} from stream ${stream}:`, err);
                        }
                    }
                }

                // Remove all expired consumers from active list at once
                if (expiredConsumers.length > 0) {
                    await redis.hdel(key, ...expiredConsumers);
                }

                if (cleanedCount > 0) {
                    log(`Cleaned up ${cleanedCount} expired consumer groups from stream: ${stream}`);
                    totalCleaned += cleanedCount;
                }
            }

            if (totalCleaned > 0) {
                log(`Total cleaned up: ${totalCleaned} consumer groups across all streams`);
            }
        } catch (err) {
            warn('Error during cleanup cycle:', err);
        }

        // Wait before next cleanup cycle
        await delay(CLEANUP_INTERVAL, shutdownSignal);
    });
}

View File

@@ -1,140 +0,0 @@
import { redis } from '@/services/redis';
import { forever } from '@/utils/forever';
import { AsyncLock } from '@/utils/lock';
import { warn } from '@/utils/log';
import { LRUSet } from '@/utils/lru';
import { randomUUID } from 'crypto';
import Redis from 'ioredis';
import { startCleanupWorker } from './redisCleanup';
import { onShutdown, shutdownSignal } from '@/utils/shutdown';
import { delay } from '@/utils/delay';

const HEARTBEAT_INTERVAL = 30000; // 30 seconds
const TRIM_INTERVAL = 30000; // 30 seconds

export async function startConsumer(
    stream: string,
    maxSize: number,
    handler: (messages: string[]) => void | Promise<void>
) {
    startCleanupWorker();

    let wasCreated = false;
    const consumerGroup = randomUUID();
    const received = new LRUSet<string>(maxSize); // Should be no longer than the queue size
    const client = new Redis(process.env.REDIS_URL!);
    const activeConsumersKey = `active_consumers:${stream}`;
    const lock = new AsyncLock();
    let lastHeartbeat = 0;

    //
    // Start consumer group loop
    //

    forever('redis:' + stream, async () => {

        //
        // Heartbeat
        //

        if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL) {
            lastHeartbeat = Date.now();
            await client.hset(activeConsumersKey, consumerGroup, lastHeartbeat);
        }

        //
        // Create consumer group at current position
        //

        if (!wasCreated) {
            try {
                await client.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');
            } catch (err: any) {
                // Ignore if group already exists
                if (!err.message?.includes('BUSYGROUP')) {
                    throw err;
                }
            }
            wasCreated = true;
        }

        //
        // Read messages
        //

        const results = await client.xreadgroup(
            'GROUP', consumerGroup, 'consumer',
            'COUNT', 100, // 100 messages
            'BLOCK', 5000, // 5 seconds
            'STREAMS', stream, '>'
        ) as [string, [string, string[]][]][] | null;
        if (!results || results.length === 0) {
            return;
        }
        const [, messages] = results[0];
        if (!messages || messages.length === 0) {
            return;
        }

        // Extract ALL message IDs for acknowledgment
        const allMessageIds: string[] = [];
        const messageContents: string[] = [];
        for (const [messageId, fields] of messages) {
            // Always collect ID for acknowledgment
            allMessageIds.push(messageId);

            // Only process if not already seen
            if (!received.has(messageId) && fields.length >= 2) {
                messageContents.push(fields[1]);
                received.add(messageId);
            }
        }

        // Acknowledge ALL messages at once (including duplicates)
        await redis.xack(stream, consumerGroup, ...allMessageIds);

        // Only call handler if we have new messages to process
        if (messageContents.length === 0) {
            return;
        }

        // Guarantee order of messages
        lock.inLock(async () => {
            try {
                await handler(messageContents);
            } catch (err) {
                warn(err);
            }
        });
    });

    //
    // Start trimmer
    //

    forever('redis:' + stream + ':trimmer', async () => {
        await redis.xtrim(stream, 'MAXLEN', '~', maxSize);
        await delay(TRIM_INTERVAL, shutdownSignal);
    });

    //
    // Clean up on shutdown
    //

    onShutdown('redis:' + stream, async () => {
        try {
            // Destroy consumer group FIRST
            await redis.xgroup('DESTROY', stream, consumerGroup);
            // Then remove from active consumers
            await redis.hdel(activeConsumersKey, consumerGroup);
            // Close the blocking client
            client.disconnect();
        } catch (err) {
            // Ignore
        }
    });
}

View File

@@ -1,16 +0,0 @@
import { redis } from "@/services/redis";

export function createRedisProducer(stream: string) {
    return async (messages: string[]) => {
        if (messages.length === 0) {
            return;
        }

        // Use pipeline for batch publishing
        const pipeline = redis.pipeline();
        for (const message of messages) {
            pipeline.xadd(stream, '*', 'data', message);
        }
        await pipeline.exec();
    }
}
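
Not part of this diff: a minimal sketch, reconstructed from the signatures above, of how the removed producer/consumer pair was wired together; the stream name, max size, and handler body are illustrative.

import { startConsumer } from './redisConsumer';
import { createRedisProducer } from './redisProducer';

// Illustrative wiring: every consumer creates its own ephemeral consumer group,
// so each running instance sees every message published to the stream.
async function demo() {
    const publish = createRedisProducer('updates');

    await startConsumer('updates', 10000, async (messages) => {
        for (const message of messages) {
            console.log('received', message);
        }
    });

    await publish([JSON.stringify({ hello: 'world' })]);
}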

View File

@@ -1,272 +0,0 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { Redis } from 'ioredis';
import { startConsumer } from './redisConsumer';
import { createRedisProducer } from './redisProducer';
import { delay } from '@/utils/delay';

// Mock the redis import
vi.mock('@/services/redis', () => ({
    redis: new Redis(process.env.REDIS_URL || 'redis://localhost:6379')
}));

// Mock forever to run immediately
vi.mock('@/utils/forever', () => ({
    forever: (name: string, fn: () => Promise<void>) => {
        // Run the function in a loop with a small delay
        const run = async () => {
            while (!globalThis.__stopForever) {
                await fn();
                await new Promise(resolve => setTimeout(resolve, 10));
            }
        };
        run().catch(() => { });
    }
}));

// Mock onShutdown to collect callbacks
const shutdownCallbacks: Map<string, () => Promise<void>> = new Map();
vi.mock('@/utils/shutdown', () => ({
    onShutdown: (name: string, callback: () => Promise<void>) => {
        shutdownCallbacks.set(name, callback);
    },
    shutdownSignal: { aborted: false }
}));

describe('Redis Queue System', () => {
    let redis: Redis;
    let testStream: string;
    let receivedMessages: string[][] = [];

    beforeEach(async () => {
        redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
        testStream = `test-stream-${Date.now()}`;
        receivedMessages = [];
        globalThis.__stopForever = false;
        shutdownCallbacks.clear();

        // Clean up test stream if it exists
        try {
            await redis.del(testStream);
            await redis.del(`active_consumers:${testStream}`);
        } catch (err) {
            // Ignore
        }
    });

    afterEach(async () => {
        globalThis.__stopForever = true;

        // Call all shutdown callbacks
        for (const callback of shutdownCallbacks.values()) {
            await callback();
        }

        // Clean up
        try {
            await redis.del(testStream);
            await redis.del(`active_consumers:${testStream}`);

            // Clean up any consumer groups
            const groups = await redis.xinfo('GROUPS', testStream).catch(() => []) as any[];
            for (const groupInfo of groups) {
                const groupName = groupInfo[1];
                await redis.xgroup('DESTROY', testStream, groupName).catch(() => { });
            }
        } catch (err) {
            // Ignore
        }

        redis.disconnect();
    });

    it('should produce and consume messages', async () => {
        const producer = createRedisProducer(testStream);

        // Start consumer
        await startConsumer(testStream, 1000, async (messages) => {
            receivedMessages.push(messages);
        });

        // Wait for consumer to be ready
        await delay(100);

        // Send messages - each becomes a separate stream entry
        await producer(['message1', 'message2', 'message3']);

        // Wait for messages to be consumed
        await delay(200);

        // Check received messages - should get all messages (possibly in multiple batches)
        const allMessages = receivedMessages.flat();
        expect(allMessages).toContain('message1');
        expect(allMessages).toContain('message2');
        expect(allMessages).toContain('message3');
        expect(allMessages).toHaveLength(3);
    });

    it('should handle multiple consumers', async () => {
        const producer = createRedisProducer(testStream);
        const received1: string[][] = [];
        const received2: string[][] = [];

        // Start two consumers
        await startConsumer(testStream, 1000, async (messages) => {
            received1.push(messages);
        });
        await startConsumer(testStream, 1000, async (messages) => {
            received2.push(messages);
        });

        // Wait for consumers to be ready
        await delay(100);

        // Send messages
        await producer(['msg1', 'msg2']);

        // Wait for messages to be consumed
        await delay(200);

        // Both consumers should receive all messages
        const allMessages1 = received1.flat();
        const allMessages2 = received2.flat();
        expect(allMessages1).toContain('msg1');
        expect(allMessages1).toContain('msg2');
        expect(allMessages1).toHaveLength(2);
        expect(allMessages2).toContain('msg1');
        expect(allMessages2).toContain('msg2');
        expect(allMessages2).toHaveLength(2);
    });

    it('should track active consumers', async () => {
        // Start consumer
        await startConsumer(testStream, 1000, async () => { });

        // Wait for registration
        await delay(100);

        // Check active consumers
        const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`);
        expect(Object.keys(activeConsumers)).toHaveLength(1);

        // Check that consumer has a timestamp
        const consumerGroup = Object.keys(activeConsumers)[0];
        const timestamp = parseInt(activeConsumers[consumerGroup]);
        expect(timestamp).toBeGreaterThan(0);
        expect(timestamp).toBeLessThanOrEqual(Date.now());
    });

    it('should clean up on shutdown', async () => {
        // Start consumer
        await startConsumer(testStream, 1000, async () => { });

        // Wait for registration
        await delay(100);

        // Get consumer group
        const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`);
        const consumerGroup = Object.keys(activeConsumers)[0];

        // Verify consumer group exists
        const groups = (await redis.xinfo('GROUPS', testStream)) as any[];
        expect(groups.length).toBeGreaterThan(0);

        // Call shutdown
        const shutdownCallback = shutdownCallbacks.get(`redis:${testStream}`);
        expect(shutdownCallback).toBeDefined();
        await shutdownCallback!();

        // Check that consumer was removed from active list
        const activeAfterShutdown = await redis.hgetall(`active_consumers:${testStream}`);
        expect(Object.keys(activeAfterShutdown)).toHaveLength(0);

        // Check that consumer group was destroyed
        const groupsAfterShutdown = (await redis.xinfo('GROUPS', testStream).catch(() => [])) as any[];
        const groupNames = groupsAfterShutdown.map((g: any) => g[1]);
        expect(groupNames).not.toContain(consumerGroup);
    });

    it('should handle empty message batches', async () => {
        const producer = createRedisProducer(testStream);

        // Try to produce empty array
        await producer([]);

        // Should not throw error
        expect(true).toBe(true);
    });

    // it('should handle empty message batches', async () => {
    //     const producer = createRedisProducer(testStream, 1000);
    //     // Try to produce empty array
    //     await producer([]);
    //     // Should not throw error
    //     expect(true).toBe(true);
    // });

    it('should handle concurrent message processing', async () => {
        const producer = createRedisProducer(testStream);
        const processedMessages: string[] = [];

        // Start consumer with delay to simulate processing
        await startConsumer(testStream, 1000, async (messages) => {
            await delay(50); // Simulate processing time
            processedMessages.push(...messages);
        });

        // Wait for consumer to be ready
        await delay(100);

        // Send multiple batches quickly
        await producer(['batch1-msg1', 'batch1-msg2']);
        await producer(['batch2-msg1']);
        await producer(['batch3-msg1', 'batch3-msg2', 'batch3-msg3']);

        // Wait for all messages to be processed
        await delay(1000);

        // Should have processed all messages
        expect(processedMessages).toHaveLength(6);
        expect(processedMessages).toContain('batch1-msg1');
        expect(processedMessages).toContain('batch2-msg1');
        expect(processedMessages).toContain('batch3-msg3');
    });

    it('should update heartbeat periodically', async () => {
        // Start consumer
        await startConsumer(testStream, 1000, async () => { });

        // Wait for initial registration
        await delay(100);

        // Get initial timestamp
        const initial = await redis.hgetall(`active_consumers:${testStream}`);
        const consumerGroup = Object.keys(initial)[0];
        const initialTimestamp = parseInt(initial[consumerGroup]);

        // Force some message reads to trigger heartbeat update
        const producer = createRedisProducer(testStream);
        await producer(['trigger']);

        // Wait a bit
        await delay(200);

        // Check if heartbeat was updated
        const updated = await redis.hget(`active_consumers:${testStream}`, consumerGroup);
        const updatedTimestamp = parseInt(updated!);

        // Timestamp should be valid
        expect(updatedTimestamp).toBeGreaterThan(0);
        expect(updatedTimestamp).toBeLessThanOrEqual(Date.now());
    });
});

// Global cleanup
declare global {
    var __stopForever: boolean;
}

View File

@@ -1,4 +1,4 @@
import { GitHubProfile as GitHubProfileType, GitHubOrg as GitHubOrgType } from "../app/types";
import { GitHubProfile as GitHubProfileType, GitHubOrg as GitHubOrgType } from "../app/api/types";
import { ImageRef as ImageRefType } from "./files";
declare global {
    namespace PrismaJson {

View File

@@ -1,4 +1,4 @@
import { GitHubProfile } from "./app/types";
import { GitHubProfile } from "./app/api/types";
import { ImageRef } from "./storage/files";
export type AccountProfile = {