From 5379043a1456726fb444a62420046876fcc115fe Mon Sep 17 00:00:00 2001
From: Steve Korshakov
Date: Mon, 1 Sep 2025 14:04:30 -0700
Subject: [PATCH] ref: moving files around

---
 .../migration.sql                             |  22 ++
 prisma/schema.prisma                          |  16 ++
 sources/app/{ => api}/api.ts                  |   8 +-
 sources/app/{ => api}/types.ts                |  14 +-
 sources/{modules => app/auth}/auth.ts         |   0
 sources/app/{ => monitoring}/metrics.ts       |   2 +-
 .../metrics.ts => app/monitoring/metrics2.ts} |   0
 .../{modules => app/presence}/sessionCache.ts |   2 +-
 sources/app/{ => presence}/timeout.ts         |   0
 sources/main.ts                               |  14 +-
 sources/modules/eventRouter.ts                |   2 +-
 sources/services/pubsub.ts                    |  27 --
 sources/services/queue/redisCleanup.ts        |  83 ------
 sources/services/queue/redisConsumer.ts       | 140 ---------
 sources/services/queue/redisProducer.ts       |  16 --
 sources/services/queue/redisQueue.spec.ts     | 272 ------------------
 sources/{services => storage}/redis.ts        |   0
 sources/{services => storage}/seq.ts          |   0
 sources/storage/types.ts                      |   2 +-
 sources/types.ts                              |   2 +-
 20 files changed, 67 insertions(+), 555 deletions(-)
 create mode 100644 prisma/migrations/20250901205028_add_service_account_tokens/migration.sql
 rename sources/app/{ => api}/api.ts (99%)
 rename sources/app/{ => api}/types.ts (70%)
 rename sources/{modules => app/auth}/auth.ts (100%)
 rename sources/app/{ => monitoring}/metrics.ts (97%)
 rename sources/{modules/metrics.ts => app/monitoring/metrics2.ts} (100%)
 rename sources/{modules => app/presence}/sessionCache.ts (99%)
 rename sources/app/{ => presence}/timeout.ts (100%)
 delete mode 100644 sources/services/pubsub.ts
 delete mode 100644 sources/services/queue/redisCleanup.ts
 delete mode 100644 sources/services/queue/redisConsumer.ts
 delete mode 100644 sources/services/queue/redisProducer.ts
 delete mode 100644 sources/services/queue/redisQueue.spec.ts
 rename sources/{services => storage}/redis.ts (100%)
 rename sources/{services => storage}/seq.ts (100%)

diff --git a/prisma/migrations/20250901205028_add_service_account_tokens/migration.sql b/prisma/migrations/20250901205028_add_service_account_tokens/migration.sql
new file mode 100644
index 0000000..267bcad
--- /dev/null
+++ b/prisma/migrations/20250901205028_add_service_account_tokens/migration.sql
@@ -0,0 +1,22 @@
+-- CreateTable
+CREATE TABLE "ServiceAccountToken" (
+    "id" TEXT NOT NULL,
+    "accountId" TEXT NOT NULL,
+    "vendor" TEXT NOT NULL,
+    "token" BYTEA NOT NULL,
+    "metadata" JSONB,
+    "lastUsedAt" TIMESTAMP(3),
+    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "updatedAt" TIMESTAMP(3) NOT NULL,
+
+    CONSTRAINT "ServiceAccountToken_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex
+CREATE INDEX "ServiceAccountToken_accountId_idx" ON "ServiceAccountToken"("accountId");
+
+-- CreateIndex
+CREATE UNIQUE INDEX "ServiceAccountToken_accountId_vendor_key" ON "ServiceAccountToken"("accountId", "vendor");
+
+-- AddForeignKey
+ALTER TABLE "ServiceAccountToken" ADD CONSTRAINT "ServiceAccountToken_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index a0405db..8dfc2ea 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -43,6 +43,7 @@ model Account {
   UsageReport          UsageReport[]
   Machine              Machine[]
   UploadedFile         UploadedFile[]
+  ServiceAccountToken  ServiceAccountToken[]
 }
 
 model TerminalAuthRequest {
@@ -222,3 +223,18 @@ model UploadedFile {
   @@unique([accountId, path])
   @@index([accountId])
 }
+
+model ServiceAccountToken {
+  id         String    @id @default(cuid())
+  accountId  String
+  account    Account   @relation(fields: [accountId], references: [id], onDelete: Cascade)
+  vendor     String
+  token      Bytes // Encrypted token
+  metadata   Json? // Optional vendor metadata
+  lastUsedAt DateTime?
+  createdAt  DateTime  @default(now())
+  updatedAt  DateTime  @updatedAt
+
+  @@unique([accountId, vendor])
+  @@index([accountId])
+}
diff --git a/sources/app/api.ts b/sources/app/api/api.ts
similarity index 99%
rename from sources/app/api.ts
rename to sources/app/api/api.ts
index e3161d2..b133965 100644
--- a/sources/app/api.ts
+++ b/sources/app/api/api.ts
@@ -7,10 +7,10 @@ import * as privacyKit from "privacy-kit";
 import { db } from "@/storage/db";
 import { Account, Prisma } from "@prisma/client";
 import { onShutdown } from "@/utils/shutdown";
-import { allocateSessionSeq, allocateUserSeq } from "@/services/seq";
+import { allocateSessionSeq, allocateUserSeq } from "@/storage/seq";
 import { randomKeyNaked } from "@/utils/randomKeyNaked";
 import { AsyncLock } from "@/utils/lock";
-import { auth } from "@/modules/auth";
+import { auth } from "@/app/auth/auth";
 import {
     EventRouter,
     ClientConnection,
@@ -31,8 +31,8 @@ import {
     websocketEventsCounter,
     httpRequestsCounter,
     httpRequestDurationHistogram
-} from "@/modules/metrics";
-import { activityCache } from "@/modules/sessionCache";
+} from "@/app/monitoring/metrics2";
+import { activityCache } from "@/app/presence/sessionCache";
 import { encryptBytes, encryptString } from "@/modules/encrypt";
 import { GitHubProfile } from "./types";
 import { uploadImage } from "@/storage/uploadImage";
diff --git a/sources/app/types.ts b/sources/app/api/types.ts
similarity index 70%
rename from sources/app/types.ts
rename to sources/app/api/types.ts
index b2a064a..64425ea 100644
--- a/sources/app/types.ts
+++ b/sources/app/api/types.ts
@@ -1,3 +1,7 @@
+import { FastifyBaseLogger, FastifyInstance } from "fastify";
+import { ZodTypeProvider } from "fastify-type-provider-zod";
+import { IncomingMessage, Server, ServerResponse } from "http";
+
 export interface GitHubProfile {
     id: number;
     login: string;
@@ -36,4 +40,12 @@ export interface GitHubProfile {
 
 
 export interface GitHubOrg {
-}
\ No newline at end of file
+}
+
+export type Fastify = FastifyInstance<
+    Server,
+    IncomingMessage,
+    ServerResponse,
+    FastifyBaseLogger,
+    ZodTypeProvider
+>;
\ No newline at end of file
diff --git a/sources/modules/auth.ts b/sources/app/auth/auth.ts
similarity index 100%
rename from sources/modules/auth.ts
rename to sources/app/auth/auth.ts
diff --git a/sources/app/metrics.ts b/sources/app/monitoring/metrics.ts
similarity index 97%
rename from sources/app/metrics.ts
rename to sources/app/monitoring/metrics.ts
index d36c6b3..41b9335 100644
--- a/sources/app/metrics.ts
+++ b/sources/app/monitoring/metrics.ts
@@ -1,6 +1,6 @@
 import fastify from 'fastify';
 import { db } from '@/storage/db';
-import { register } from '@/modules/metrics';
+import { register } from '@/app/monitoring/metrics2';
 import { log } from '@/utils/log';
 
 export async function createMetricsServer() {
diff --git a/sources/modules/metrics.ts b/sources/app/monitoring/metrics2.ts
similarity index 100%
rename from sources/modules/metrics.ts
rename to sources/app/monitoring/metrics2.ts
diff --git a/sources/modules/sessionCache.ts b/sources/app/presence/sessionCache.ts
similarity index 99%
rename from sources/modules/sessionCache.ts
rename to sources/app/presence/sessionCache.ts
index fa1f19d..c372860 100644
--- a/sources/modules/sessionCache.ts
+++ b/sources/app/presence/sessionCache.ts
@@ -1,6 +1,6 @@
 import { db } from "@/storage/db";
 import { log } from "@/utils/log";
-import { sessionCacheCounter, databaseUpdatesSkippedCounter } from "@/modules/metrics";
+import { sessionCacheCounter, databaseUpdatesSkippedCounter } from "@/app/monitoring/metrics2";
 
 interface SessionCacheEntry {
     validUntil: number;
diff --git a/sources/app/timeout.ts b/sources/app/presence/timeout.ts
similarity index 100%
rename from sources/app/timeout.ts
rename to sources/app/presence/timeout.ts
diff --git a/sources/main.ts b/sources/main.ts
index e636105..c5155f5 100644
--- a/sources/main.ts
+++ b/sources/main.ts
@@ -1,13 +1,13 @@
-import { startApi } from "@/app/api";
+import { startApi } from "@/app/api/api";
 import { log } from "@/utils/log";
 import { awaitShutdown, onShutdown } from "@/utils/shutdown";
 import { db } from './storage/db';
-import { startTimeout } from "./app/timeout";
-import { redis } from "./services/redis";
-import { startMetricsServer } from "@/app/metrics";
-import { activityCache } from "@/modules/sessionCache";
-import { auth } from "./modules/auth";
-import { startDatabaseMetricsUpdater } from "@/modules/metrics";
+import { startTimeout } from "./app/presence/timeout";
+import { redis } from "./storage/redis";
+import { startMetricsServer } from "@/app/monitoring/metrics";
+import { activityCache } from "@/app/presence/sessionCache";
+import { auth } from "./app/auth/auth";
+import { startDatabaseMetricsUpdater } from "@/app/monitoring/metrics2";
 import { initEncrypt } from "./modules/encrypt";
 import { initGithub } from "./modules/github";
 import { loadFiles } from "./storage/files";
diff --git a/sources/modules/eventRouter.ts b/sources/modules/eventRouter.ts
index 0f8bbb9..4c9d3f8 100644
--- a/sources/modules/eventRouter.ts
+++ b/sources/modules/eventRouter.ts
@@ -1,6 +1,6 @@
 import { Socket } from "socket.io";
 import { log } from "@/utils/log";
-import { GitHubProfile } from "@/app/types";
+import { GitHubProfile } from "@/app/api/types";
 import { AccountProfile } from "@/types";
 import { getPublicUrl } from "@/storage/files";
 
diff --git a/sources/services/pubsub.ts b/sources/services/pubsub.ts
deleted file mode 100644
index 0e57839..0000000
--- a/sources/services/pubsub.ts
+++ /dev/null
@@ -1,27 +0,0 @@
-import { EventEmitter } from 'events';
-
-export interface PubSubEvents {
-    'update': (accountId: string, update: {
-        id: string,
-        seq: number,
-        body: any,
-        createdAt: number
-    }) => void;
-    'update-ephemeral': (accountId: string, update: { type: 'activity', id: string, active: boolean, activeAt: number, thinking: boolean }) => void;
-}
-
-class PubSubService extends EventEmitter {
-    emit<K extends keyof PubSubEvents>(event: K, ...args: Parameters<PubSubEvents[K]>): boolean {
-        return super.emit(event, ...args);
-    }
-
-    on<K extends keyof PubSubEvents>(event: K, listener: PubSubEvents[K]): this {
-        return super.on(event, listener);
-    }
-
-    off<K extends keyof PubSubEvents>(event: K, listener: PubSubEvents[K]): this {
-        return super.off(event, listener);
-    }
-}
-
-export const pubsub = new PubSubService();
\ No newline at end of file
diff --git a/sources/services/queue/redisCleanup.ts b/sources/services/queue/redisCleanup.ts
deleted file mode 100644
index 085752c..0000000
--- a/sources/services/queue/redisCleanup.ts
+++ /dev/null
@@ -1,83 +0,0 @@
-import { redis } from '@/services/redis';
-import { delay } from '@/utils/delay';
-import { forever } from '@/utils/forever';
-import { log, warn } from '@/utils/log';
-import { shutdownSignal } from '@/utils/shutdown';
-
-const CLEANUP_INTERVAL = 60000; // 1 minute
-let started = false;
-
-// Start cleanup worker for all streams
-export async function startCleanupWorker() {
-    if (started) {
-        return;
-    }
-    started = true;
-    log('Starting Redis cleanup worker');
-
-    forever('redis-cleanup', async () => {
-        try {
-            const now = Date.now();
-
-            // Find all active_consumers:* keys
-            const keys = await redis.keys('active_consumers:*');
-
-            let totalCleaned = 0;
-
-            for (const key of keys) {
-                // Extract stream name from key: active_consumers:streamname
-                const stream = key.substring('active_consumers:'.length);
-
-                // Get all consumers with their expiration times
-                const consumers = await redis.hgetall(key);
-
-                const expiredConsumers: string[] = [];
-
-                // Check each consumer's expiration time
-                for (const [consumerGroup, expirationTime] of Object.entries(consumers)) {
-                    if (parseInt(expirationTime) < now) {
-                        expiredConsumers.push(consumerGroup);
-                    }
-                }
-
-                if (expiredConsumers.length === 0) {
-                    continue;
-                }
-
-                // Delete expired consumer groups
-                let cleanedCount = 0;
-                for (const consumerGroup of expiredConsumers) {
-                    try {
-                        await redis.xgroup('DESTROY', stream, consumerGroup);
-                        cleanedCount++;
-                    } catch (err: any) {
-                        // Group might already be deleted or doesn't exist
-                        if (!err.message?.includes('NOGROUP')) {
-                            warn(`Failed to cleanup group ${consumerGroup} from stream ${stream}:`, err);
-                        }
-                    }
-                }
-
-                // Remove all expired consumers from active list at once
-                if (expiredConsumers.length > 0) {
-                    await redis.hdel(key, ...expiredConsumers);
-                }
-
-                if (cleanedCount > 0) {
-                    log(`Cleaned up ${cleanedCount} expired consumer groups from stream: ${stream}`);
-                    totalCleaned += cleanedCount;
-                }
-            }
-
-            if (totalCleaned > 0) {
-                log(`Total cleaned up: ${totalCleaned} consumer groups across all streams`);
-            }
-        } catch (err) {
-            warn('Error during cleanup cycle:', err);
-        }
-
-        // Wait before next cleanup cycle
-        await delay(CLEANUP_INTERVAL, shutdownSignal);
-    });
-}
-
diff --git a/sources/services/queue/redisConsumer.ts b/sources/services/queue/redisConsumer.ts
deleted file mode 100644
index 4012ee5..0000000
--- a/sources/services/queue/redisConsumer.ts
+++ /dev/null
@@ -1,140 +0,0 @@
-import { redis } from '@/services/redis';
-import { forever } from '@/utils/forever';
-import { AsyncLock } from '@/utils/lock';
-import { warn } from '@/utils/log';
-import { LRUSet } from '@/utils/lru';
-import { randomUUID } from 'crypto';
-import Redis from 'ioredis';
-import { startCleanupWorker } from './redisCleanup';
-import { onShutdown, shutdownSignal } from '@/utils/shutdown';
-import { delay } from '@/utils/delay';
-
-const HEARTBEAT_INTERVAL = 30000; // 30 seconds
-const TRIM_INTERVAL = 30000; // 30 seconds
-
-export async function startConsumer(
-    stream: string,
-    maxSize: number,
-    handler: (messages: string[]) => void | Promise<void>
-) {
-    startCleanupWorker();
-    let wasCreated = false;
-    const consumerGroup = randomUUID();
-    const received = new LRUSet(maxSize); // Should me not longer than queue size
-    const client = new Redis(process.env.REDIS_URL!);
-    const activeConsumersKey = `active_consumers:${stream}`;
-    const lock = new AsyncLock();
-    let lastHeartbeat = 0;
-
-    //
-    // Start consumer group loop
-    //
-
-    forever('redis:' + stream, async () => {
-
-        //
-        // Heartbeat
-        //
-
-        if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL) {
-            lastHeartbeat = Date.now();
-            await client.hset(activeConsumersKey, consumerGroup, lastHeartbeat);
-        }
-
-        //
-        // Create consumer group at current position
-        //
-
-        if (!wasCreated) {
-            try {
-                await client.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');
-            } catch (err: any) {
-                // Ignore if group already exists
-                if (!err.message?.includes('BUSYGROUP')) {
-                    throw err;
-                }
-            }
-            wasCreated = true;
-        }
-
-        //
-        // Read messages
-        //
-
-        const results = await client.xreadgroup(
-            'GROUP', consumerGroup, 'consumer',
-            'COUNT', 100, // 100 messages
-            'BLOCK', 5000, // 5 seconds
-            'STREAMS', stream, '>'
-        ) as [string, [string, string[]][]][] | null;
-
-        if (!results || results.length === 0) {
-            return;
-        }
-
-        const [, messages] = results[0];
-        if (!messages || messages.length === 0) {
-            return;
-        }
-
-        // Extract ALL message IDs for acknowledgment
-        const allMessageIds: string[] = [];
-        const messageContents: string[] = [];
-
-        for (const [messageId, fields] of messages) {
-            // Always collect ID for acknowledgment
-            allMessageIds.push(messageId);
-
-            // Only process if not already seen
-            if (!received.has(messageId) && fields.length >= 2) {
-                messageContents.push(fields[1]);
-                received.add(messageId);
-            }
-        }
-
-        // Acknowledge ALL messages at once (including duplicates)
-        await redis.xack(stream, consumerGroup, ...allMessageIds);
-
-        // Only call handler if we have new messages to process
-        if (messageContents.length === 0) {
-            return;
-        }
-
-        // Guarantee order of messages
-        lock.inLock(async () => {
-            try {
-                await handler(messageContents);
-            } catch (err) {
-                warn(err);
-            }
-        });
-
-    });
-
-    //
-    // Start trimmer
-    //
-
-    forever('redis:' + stream + ':trimmer', async () => {
-        await redis.xtrim(stream, 'MAXLEN', '~', maxSize);
-        await delay(TRIM_INTERVAL, shutdownSignal);
-    });
-
-    //
-    // Clean up on shutdown
-    //
-
-    onShutdown('redis:' + stream, async () => {
-        try {
-            // Destroy consumer group FIRST
-            await redis.xgroup('DESTROY', stream, consumerGroup);
-            // Then remove from active consumers
-            await redis.hdel(activeConsumersKey, consumerGroup);
-            // Close the blocking client
-            client.disconnect();
-        } catch (err) {
-            // Ignore
-        }
-    });
-
-}
diff --git a/sources/services/queue/redisProducer.ts b/sources/services/queue/redisProducer.ts
deleted file mode 100644
index cd1b0f6..0000000
--- a/sources/services/queue/redisProducer.ts
+++ /dev/null
@@ -1,16 +0,0 @@
-import { redis } from "@/services/redis";
-
-export function createRedisProducer(stream: string) {
-    return async (messages: string[]) => {
-        if (messages.length === 0) {
-            return;
-        }
-
-        // Use pipeline for batch publishing
-        const pipeline = redis.pipeline();
-        for (const message of messages) {
-            pipeline.xadd(stream, '*', 'data', message);
-        }
-        await pipeline.exec();
-    }
-}
\ No newline at end of file
diff --git a/sources/services/queue/redisQueue.spec.ts b/sources/services/queue/redisQueue.spec.ts
deleted file mode 100644
index 489935e..0000000
--- a/sources/services/queue/redisQueue.spec.ts
+++ /dev/null
@@ -1,272 +0,0 @@
-import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
-import { Redis } from 'ioredis';
-import { startConsumer } from './redisConsumer';
-import { createRedisProducer } from './redisProducer';
-import { delay } from '@/utils/delay';
-
-// Mock the redis import
-vi.mock('@/services/redis', () => ({
-    redis: new Redis(process.env.REDIS_URL || 'redis://localhost:6379')
-}));
-
-// Mock forever to run immediately
-vi.mock('@/utils/forever', () => ({
-    forever: (name: string, fn: () => Promise<void>) => {
-        // Run the function in a loop with a small delay
-        const run = async () => {
-            while (!globalThis.__stopForever) {
-                await fn();
-                await new Promise(resolve => setTimeout(resolve, 10));
-            }
-        };
-        run().catch(() => { });
-    }
-}));
-
-// Mock onShutdown to collect callbacks
-const shutdownCallbacks: Map<string, () => Promise<void>> = new Map();
-vi.mock('@/utils/shutdown', () => ({
-    onShutdown: (name: string, callback: () => Promise<void>) => {
-        shutdownCallbacks.set(name, callback);
-    },
-    shutdownSignal: { aborted: false }
-}));
-
-describe('Redis Queue System', () => {
-    let redis: Redis;
-    let testStream: string;
-    let receivedMessages: string[][] = [];
-
-    beforeEach(async () => {
-        redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
-        testStream = `test-stream-${Date.now()}`;
-        receivedMessages = [];
-        globalThis.__stopForever = false;
-        shutdownCallbacks.clear();
-
-        // Clean up test stream if it exists
-        try {
-            await redis.del(testStream);
-            await redis.del(`active_consumers:${testStream}`);
-        } catch (err) {
-            // Ignore
-        }
-    });
-
-    afterEach(async () => {
-        globalThis.__stopForever = true;
-
-        // Call all shutdown callbacks
-        for (const callback of shutdownCallbacks.values()) {
-            await callback();
-        }
-
-        // Clean up
-        try {
-            await redis.del(testStream);
-            await redis.del(`active_consumers:${testStream}`);
-
-            // Clean up any consumer groups
-            const groups = await redis.xinfo('GROUPS', testStream).catch(() => []) as any[];
-            for (const groupInfo of groups) {
-                const groupName = groupInfo[1];
-                await redis.xgroup('DESTROY', testStream, groupName).catch(() => { });
-            }
-        } catch (err) {
-            // Ignore
-        }
-
-        redis.disconnect();
-    });
-
-    it('should produce and consume messages', async () => {
-        const producer = createRedisProducer(testStream);
-
-        // Start consumer
-        await startConsumer(testStream, 1000, async (messages) => {
-            receivedMessages.push(messages);
-        });
-
-        // Wait for consumer to be ready
-        await delay(100);
-
-        // Send messages - each becomes a separate stream entry
-        await producer(['message1', 'message2', 'message3']);
-
-        // Wait for messages to be consumed
-        await delay(200);
-
-        // Check received messages - should get all messages (possibly in multiple batches)
-        const allMessages = receivedMessages.flat();
-        expect(allMessages).toContain('message1');
-        expect(allMessages).toContain('message2');
-        expect(allMessages).toContain('message3');
-        expect(allMessages).toHaveLength(3);
-    });
-
-    it('should handle multiple consumers', async () => {
-        const producer = createRedisProducer(testStream);
-        const received1: string[][] = [];
-        const received2: string[][] = [];
-
-        // Start two consumers
-        await startConsumer(testStream, 1000, async (messages) => {
-            received1.push(messages);
-        });
-
-        await startConsumer(testStream, 1000, async (messages) => {
-            received2.push(messages);
-        });
-
-        // Wait for consumers to be ready
-        await delay(100);
-
-        // Send messages
-        await producer(['msg1', 'msg2']);
-
-        // Wait for messages to be consumed
-        await delay(200);
-
-        // Both consumers should receive all messages
-        const allMessages1 = received1.flat();
-        const allMessages2 = received2.flat();
-
-        expect(allMessages1).toContain('msg1');
-        expect(allMessages1).toContain('msg2');
-        expect(allMessages1).toHaveLength(2);
-
-        expect(allMessages2).toContain('msg1');
-        expect(allMessages2).toContain('msg2');
-        expect(allMessages2).toHaveLength(2);
-    });
-
-    it('should track active consumers', async () => {
-        // Start consumer
-        await startConsumer(testStream, 1000, async () => { });
-
-        // Wait for registration
-        await delay(100);
-
-        // Check active consumers
-        const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`);
-        expect(Object.keys(activeConsumers)).toHaveLength(1);
-
-        // Check that consumer has a timestamp
-        const consumerGroup = Object.keys(activeConsumers)[0];
-        const timestamp = parseInt(activeConsumers[consumerGroup]);
-        expect(timestamp).toBeGreaterThan(0);
-        expect(timestamp).toBeLessThanOrEqual(Date.now());
-    });
-
-    it('should clean up on shutdown', async () => {
-        // Start consumer
-        await startConsumer(testStream, 1000, async () => { });
-
-        // Wait for registration
-        await delay(100);
-
-        // Get consumer group
-        const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`);
-        const consumerGroup = Object.keys(activeConsumers)[0];
-
-        // Verify consumer group exists
-        const groups = (await redis.xinfo('GROUPS', testStream)) as any[];
-        expect(groups.length).toBeGreaterThan(0);
-
-        // Call shutdown
-        const shutdownCallback = shutdownCallbacks.get(`redis:${testStream}`);
-        expect(shutdownCallback).toBeDefined();
-        await shutdownCallback!();
-
-        // Check that consumer was removed from active list
-        const activeAfterShutdown = await redis.hgetall(`active_consumers:${testStream}`);
-        expect(Object.keys(activeAfterShutdown)).toHaveLength(0);
-
-        // Check that consumer group was destroyed
-        const groupsAfterShutdown = (await redis.xinfo('GROUPS', testStream).catch(() => [])) as any[];
-        const groupNames = groupsAfterShutdown.map((g: any) => g[1]);
-        expect(groupNames).not.toContain(consumerGroup);
-    });
-
-    it('should handle empty message batches', async () => {
-        const producer = createRedisProducer(testStream);
-
-        // Try to produce empty array
-        await producer([]);
-
-        // Should not throw error
-        expect(true).toBe(true);
-    });
-
-    // it('should handle empty message batches', async () => {
-    //     const producer = createRedisProducer(testStream, 1000);
-
-    //     // Try to produce empty array
-    //     await producer([]);
-
-    //     // Should not throw error
-    //     expect(true).toBe(true);
-    // });
-
-
-    it('should handle concurrent message processing', async () => {
-        const producer = createRedisProducer(testStream);
-        const processedMessages: string[] = [];
-
-        // Start consumer with delay to simulate processing
-        await startConsumer(testStream, 1000, async (messages) => {
-            await delay(50); // Simulate processing time
-            processedMessages.push(...messages);
-        });
-
-        // Wait for consumer to be ready
-        await delay(100);
-
-        // Send multiple batches quickly
-        await producer(['batch1-msg1', 'batch1-msg2']);
-        await producer(['batch2-msg1']);
-        await producer(['batch3-msg1', 'batch3-msg2', 'batch3-msg3']);
-
-        // Wait for all messages to be processed
-        await delay(1000);
-
-        // Should have processed all messages
-        expect(processedMessages).toHaveLength(6);
-        expect(processedMessages).toContain('batch1-msg1');
-        expect(processedMessages).toContain('batch2-msg1');
-        expect(processedMessages).toContain('batch3-msg3');
-    });
-
-    it('should update heartbeat periodically', async () => {
-        // Start consumer
-        await startConsumer(testStream, 1000, async () => { });
-
-        // Wait for initial registration
-        await delay(100);
-
-        // Get initial timestamp
-        const initial = await redis.hgetall(`active_consumers:${testStream}`);
-        const consumerGroup = Object.keys(initial)[0];
-        const initialTimestamp = parseInt(initial[consumerGroup]);
-
-        // Force some message reads to trigger heartbeat update
-        const producer = createRedisProducer(testStream);
-        await producer(['trigger']);
-
-        // Wait a bit
-        await delay(200);
-
-        // Check if heartbeat was updated
-        const updated = await redis.hget(`active_consumers:${testStream}`, consumerGroup);
-        const updatedTimestamp = parseInt(updated!);
-
-        // Timestamp should be valid
-        expect(updatedTimestamp).toBeGreaterThan(0);
-        expect(updatedTimestamp).toBeLessThanOrEqual(Date.now());
-    });
-});
-
-// Global cleanup
-declare global {
-    var __stopForever: boolean;
-}
\ No newline at end of file
diff --git a/sources/services/redis.ts b/sources/storage/redis.ts
similarity index 100%
rename from sources/services/redis.ts
rename to sources/storage/redis.ts
diff --git a/sources/services/seq.ts b/sources/storage/seq.ts
similarity index 100%
rename from sources/services/seq.ts
rename to sources/storage/seq.ts
diff --git a/sources/storage/types.ts b/sources/storage/types.ts
index 25a00b4..406d338 100644
--- a/sources/storage/types.ts
+++ b/sources/storage/types.ts
@@ -1,4 +1,4 @@
-import { GitHubProfile as GitHubProfileType, GitHubOrg as GitHubOrgType } from "../app/types";
+import { GitHubProfile as GitHubProfileType, GitHubOrg as GitHubOrgType } from "../app/api/types";
 import { ImageRef as ImageRefType } from "./files";
 declare global {
     namespace PrismaJson {
diff --git a/sources/types.ts b/sources/types.ts
index eab462f..420953a 100644
--- a/sources/types.ts
+++ b/sources/types.ts
@@ -1,4 +1,4 @@
-import { GitHubProfile } from "./app/types";
+import { GitHubProfile } from "./app/api/types";
 import { ImageRef } from "./storage/files";
 
 export type AccountProfile = {