diff --git a/package.json b/package.json index d1e277d..ba407d9 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "build": "tsc --noEmit", "start": "tsx ./sources/main.ts", "dev": "tsx --env-file=.env.example ./sources/main.ts", - "test": "vitest", + "test": "vitest run", "migrate": "dotenv -e .env.example -- prisma migrate dev", "generate": "prisma generate", "postinstall": "prisma generate", diff --git a/sources/app/api.ts b/sources/app/api.ts index 33a0fda..941b8e5 100644 --- a/sources/app/api.ts +++ b/sources/app/api.ts @@ -7,6 +7,7 @@ import * as privacyKit from "privacy-kit"; import * as tweetnacl from "tweetnacl"; import { db } from "@/storage/db"; import { Account, Update } from "@prisma/client"; +import { onShutdown } from "@/utils/shutdown"; // Connection metadata types interface SessionScopedConnection { @@ -566,17 +567,17 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> }); // Aggregate data by time period - const aggregated = new Map; - cost: Record; - count: number; + const aggregated = new Map; + cost: Record; + count: number; timestamp: number; }>(); for (const report of reports) { const data = report.data as PrismaJson.UsageReportData; const date = new Date(report.createdAt); - + // Calculate timestamp based on groupBy let timestamp: number; if (actualGroupBy === 'hour') { @@ -1046,7 +1047,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> callback({ result: 'error' }); return null; } - + // Verify session belongs to user and lock it const session = await tx.session.findFirst({ where: { @@ -1142,7 +1143,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> callback({ result: 'error' }); return null; } - + // Verify session belongs to user and lock it const session = await tx.session.findFirst({ where: { @@ -1470,8 +1471,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> } if (callback) { - callback({ - success: true, + callback({ + success: true, reportId: report.id, createdAt: report.createdAt.getTime(), updatedAt: report.updatedAt.getTime() @@ -1491,6 +1492,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }> // End log('API ready on port http://localhost:' + port); - + + onShutdown('api', async () => { + await app.close(); + }); + onShutdown('api', async () => { + await io.close(); + }); + return { app, io }; } \ No newline at end of file diff --git a/sources/app/timeout.ts b/sources/app/timeout.ts index b463fbf..d0f36f1 100644 --- a/sources/app/timeout.ts +++ b/sources/app/timeout.ts @@ -1,6 +1,6 @@ import { pubsub } from "@/services/pubsub"; import { db } from "@/storage/db"; -import { backoff, delay } from "@/utils/time"; +import { backoff, delay } from "@/utils/delay"; export function startTimeout() { backoff(async () => { diff --git a/sources/main.ts b/sources/main.ts index 58e33c2..8fcb79f 100644 --- a/sources/main.ts +++ b/sources/main.ts @@ -1,6 +1,6 @@ import { startApi } from "@/app/api"; import { log } from "@/utils/log"; -import { awaitShutdown } from "@/utils/shutdown"; +import { awaitShutdown, onShutdown } from "@/utils/shutdown"; import { db } from './storage/db'; import { startTimeout } from "./app/timeout"; import { redis } from "./services/redis"; @@ -9,13 +9,16 @@ async function main() { // Storage await db.$connect(); + onShutdown('db', async () => { + await db.$disconnect(); + }); await redis.ping(); // // Start // - const { app, io } = await startApi(); + await startApi(); startTimeout(); // @@ -25,22 +28,11 @@ async function main() { log('Ready'); await awaitShutdown(); log('Shutting down...'); - - // Close Socket.io connections - io.close(() => { - log('Socket.io closed'); - }); - - // Close Fastify server - await app.close(); - log('Fastify server closed'); } -main().catch(async (e) => { +main().catch((e) => { console.error(e); - await db.$disconnect(); process.exit(1); -}).then(async () => { - log('Disconnecting from DB...'); - await db.$disconnect(); +}).then(() => { + process.exit(0); }); \ No newline at end of file diff --git a/sources/services/queue/redisCleanup.ts b/sources/services/queue/redisCleanup.ts new file mode 100644 index 0000000..085752c --- /dev/null +++ b/sources/services/queue/redisCleanup.ts @@ -0,0 +1,83 @@ +import { redis } from '@/services/redis'; +import { delay } from '@/utils/delay'; +import { forever } from '@/utils/forever'; +import { log, warn } from '@/utils/log'; +import { shutdownSignal } from '@/utils/shutdown'; + +const CLEANUP_INTERVAL = 60000; // 1 minute +let started = false; + +// Start cleanup worker for all streams +export async function startCleanupWorker() { + if (started) { + return; + } + started = true; + log('Starting Redis cleanup worker'); + + forever('redis-cleanup', async () => { + try { + const now = Date.now(); + + // Find all active_consumers:* keys + const keys = await redis.keys('active_consumers:*'); + + let totalCleaned = 0; + + for (const key of keys) { + // Extract stream name from key: active_consumers:streamname + const stream = key.substring('active_consumers:'.length); + + // Get all consumers with their expiration times + const consumers = await redis.hgetall(key); + + const expiredConsumers: string[] = []; + + // Check each consumer's expiration time + for (const [consumerGroup, expirationTime] of Object.entries(consumers)) { + if (parseInt(expirationTime) < now) { + expiredConsumers.push(consumerGroup); + } + } + + if (expiredConsumers.length === 0) { + continue; + } + + // Delete expired consumer groups + let cleanedCount = 0; + for (const consumerGroup of expiredConsumers) { + try { + await redis.xgroup('DESTROY', stream, consumerGroup); + cleanedCount++; + } catch (err: any) { + // Group might already be deleted or doesn't exist + if (!err.message?.includes('NOGROUP')) { + warn(`Failed to cleanup group ${consumerGroup} from stream ${stream}:`, err); + } + } + } + + // Remove all expired consumers from active list at once + if (expiredConsumers.length > 0) { + await redis.hdel(key, ...expiredConsumers); + } + + if (cleanedCount > 0) { + log(`Cleaned up ${cleanedCount} expired consumer groups from stream: ${stream}`); + totalCleaned += cleanedCount; + } + } + + if (totalCleaned > 0) { + log(`Total cleaned up: ${totalCleaned} consumer groups across all streams`); + } + } catch (err) { + warn('Error during cleanup cycle:', err); + } + + // Wait before next cleanup cycle + await delay(CLEANUP_INTERVAL, shutdownSignal); + }); +} + diff --git a/sources/services/queue/redisConsumer.ts b/sources/services/queue/redisConsumer.ts new file mode 100644 index 0000000..4012ee5 --- /dev/null +++ b/sources/services/queue/redisConsumer.ts @@ -0,0 +1,140 @@ +import { redis } from '@/services/redis'; +import { forever } from '@/utils/forever'; +import { AsyncLock } from '@/utils/lock'; +import { warn } from '@/utils/log'; +import { LRUSet } from '@/utils/lru'; +import { randomUUID } from 'crypto'; +import Redis from 'ioredis'; +import { startCleanupWorker } from './redisCleanup'; +import { onShutdown, shutdownSignal } from '@/utils/shutdown'; +import { delay } from '@/utils/delay'; + +const HEARTBEAT_INTERVAL = 30000; // 30 seconds +const TRIM_INTERVAL = 30000; // 30 seconds + +export async function startConsumer( + stream: string, + maxSize: number, + handler: (messages: string[]) => void | Promise +) { + startCleanupWorker(); + let wasCreated = false; + const consumerGroup = randomUUID(); + const received = new LRUSet(maxSize); // Should me not longer than queue size + const client = new Redis(process.env.REDIS_URL!); + const activeConsumersKey = `active_consumers:${stream}`; + const lock = new AsyncLock(); + let lastHeartbeat = 0; + + // + // Start consumer group loop + // + + forever('redis:' + stream, async () => { + + // + // Heartbeat + // + + if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL) { + lastHeartbeat = Date.now(); + await client.hset(activeConsumersKey, consumerGroup, lastHeartbeat); + } + + // + // Create consumer group at current position + // + + if (!wasCreated) { + try { + await client.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM'); + } catch (err: any) { + // Ignore if group already exists + if (!err.message?.includes('BUSYGROUP')) { + throw err; + } + } + wasCreated = true; + } + + // + // Read messages + // + + const results = await client.xreadgroup( + 'GROUP', consumerGroup, 'consumer', + 'COUNT', 100, // 100 messages + 'BLOCK', 5000, // 5 seconds + 'STREAMS', stream, '>' + ) as [string, [string, string[]][]][] | null; + + if (!results || results.length === 0) { + return; + } + + const [, messages] = results[0]; + if (!messages || messages.length === 0) { + return; + } + + // Extract ALL message IDs for acknowledgment + const allMessageIds: string[] = []; + const messageContents: string[] = []; + + for (const [messageId, fields] of messages) { + // Always collect ID for acknowledgment + allMessageIds.push(messageId); + + // Only process if not already seen + if (!received.has(messageId) && fields.length >= 2) { + messageContents.push(fields[1]); + received.add(messageId); + } + } + + // Acknowledge ALL messages at once (including duplicates) + await redis.xack(stream, consumerGroup, ...allMessageIds); + + // Only call handler if we have new messages to process + if (messageContents.length === 0) { + return; + } + + // Guarantee order of messages + lock.inLock(async () => { + try { + await handler(messageContents); + } catch (err) { + warn(err); + } + }); + + }); + + // + // Start trimmer + // + + forever('redis:' + stream + ':trimmer', async () => { + await redis.xtrim(stream, 'MAXLEN', '~', maxSize); + await delay(TRIM_INTERVAL, shutdownSignal); + }); + + // + // Clean up on shutdown + // + + onShutdown('redis:' + stream, async () => { + try { + // Destroy consumer group FIRST + await redis.xgroup('DESTROY', stream, consumerGroup); + // Then remove from active consumers + await redis.hdel(activeConsumersKey, consumerGroup); + // Close the blocking client + client.disconnect(); + } catch (err) { + // Ignore + } + }); + +} diff --git a/sources/services/queue/redisProducer.ts b/sources/services/queue/redisProducer.ts new file mode 100644 index 0000000..cd1b0f6 --- /dev/null +++ b/sources/services/queue/redisProducer.ts @@ -0,0 +1,16 @@ +import { redis } from "@/services/redis"; + +export function createRedisProducer(stream: string) { + return async (messages: string[]) => { + if (messages.length === 0) { + return; + } + + // Use pipeline for batch publishing + const pipeline = redis.pipeline(); + for (const message of messages) { + pipeline.xadd(stream, '*', 'data', message); + } + await pipeline.exec(); + } +} \ No newline at end of file diff --git a/sources/services/queue/redisQueue.spec.ts b/sources/services/queue/redisQueue.spec.ts new file mode 100644 index 0000000..489935e --- /dev/null +++ b/sources/services/queue/redisQueue.spec.ts @@ -0,0 +1,272 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { Redis } from 'ioredis'; +import { startConsumer } from './redisConsumer'; +import { createRedisProducer } from './redisProducer'; +import { delay } from '@/utils/delay'; + +// Mock the redis import +vi.mock('@/services/redis', () => ({ + redis: new Redis(process.env.REDIS_URL || 'redis://localhost:6379') +})); + +// Mock forever to run immediately +vi.mock('@/utils/forever', () => ({ + forever: (name: string, fn: () => Promise) => { + // Run the function in a loop with a small delay + const run = async () => { + while (!globalThis.__stopForever) { + await fn(); + await new Promise(resolve => setTimeout(resolve, 10)); + } + }; + run().catch(() => { }); + } +})); + +// Mock onShutdown to collect callbacks +const shutdownCallbacks: Map Promise> = new Map(); +vi.mock('@/utils/shutdown', () => ({ + onShutdown: (name: string, callback: () => Promise) => { + shutdownCallbacks.set(name, callback); + }, + shutdownSignal: { aborted: false } +})); + +describe('Redis Queue System', () => { + let redis: Redis; + let testStream: string; + let receivedMessages: string[][] = []; + + beforeEach(async () => { + redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379'); + testStream = `test-stream-${Date.now()}`; + receivedMessages = []; + globalThis.__stopForever = false; + shutdownCallbacks.clear(); + + // Clean up test stream if it exists + try { + await redis.del(testStream); + await redis.del(`active_consumers:${testStream}`); + } catch (err) { + // Ignore + } + }); + + afterEach(async () => { + globalThis.__stopForever = true; + + // Call all shutdown callbacks + for (const callback of shutdownCallbacks.values()) { + await callback(); + } + + // Clean up + try { + await redis.del(testStream); + await redis.del(`active_consumers:${testStream}`); + + // Clean up any consumer groups + const groups = await redis.xinfo('GROUPS', testStream).catch(() => []) as any[]; + for (const groupInfo of groups) { + const groupName = groupInfo[1]; + await redis.xgroup('DESTROY', testStream, groupName).catch(() => { }); + } + } catch (err) { + // Ignore + } + + redis.disconnect(); + }); + + it('should produce and consume messages', async () => { + const producer = createRedisProducer(testStream); + + // Start consumer + await startConsumer(testStream, 1000, async (messages) => { + receivedMessages.push(messages); + }); + + // Wait for consumer to be ready + await delay(100); + + // Send messages - each becomes a separate stream entry + await producer(['message1', 'message2', 'message3']); + + // Wait for messages to be consumed + await delay(200); + + // Check received messages - should get all messages (possibly in multiple batches) + const allMessages = receivedMessages.flat(); + expect(allMessages).toContain('message1'); + expect(allMessages).toContain('message2'); + expect(allMessages).toContain('message3'); + expect(allMessages).toHaveLength(3); + }); + + it('should handle multiple consumers', async () => { + const producer = createRedisProducer(testStream); + const received1: string[][] = []; + const received2: string[][] = []; + + // Start two consumers + await startConsumer(testStream, 1000, async (messages) => { + received1.push(messages); + }); + + await startConsumer(testStream, 1000, async (messages) => { + received2.push(messages); + }); + + // Wait for consumers to be ready + await delay(100); + + // Send messages + await producer(['msg1', 'msg2']); + + // Wait for messages to be consumed + await delay(200); + + // Both consumers should receive all messages + const allMessages1 = received1.flat(); + const allMessages2 = received2.flat(); + + expect(allMessages1).toContain('msg1'); + expect(allMessages1).toContain('msg2'); + expect(allMessages1).toHaveLength(2); + + expect(allMessages2).toContain('msg1'); + expect(allMessages2).toContain('msg2'); + expect(allMessages2).toHaveLength(2); + }); + + it('should track active consumers', async () => { + // Start consumer + await startConsumer(testStream, 1000, async () => { }); + + // Wait for registration + await delay(100); + + // Check active consumers + const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`); + expect(Object.keys(activeConsumers)).toHaveLength(1); + + // Check that consumer has a timestamp + const consumerGroup = Object.keys(activeConsumers)[0]; + const timestamp = parseInt(activeConsumers[consumerGroup]); + expect(timestamp).toBeGreaterThan(0); + expect(timestamp).toBeLessThanOrEqual(Date.now()); + }); + + it('should clean up on shutdown', async () => { + // Start consumer + await startConsumer(testStream, 1000, async () => { }); + + // Wait for registration + await delay(100); + + // Get consumer group + const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`); + const consumerGroup = Object.keys(activeConsumers)[0]; + + // Verify consumer group exists + const groups = (await redis.xinfo('GROUPS', testStream)) as any[]; + expect(groups.length).toBeGreaterThan(0); + + // Call shutdown + const shutdownCallback = shutdownCallbacks.get(`redis:${testStream}`); + expect(shutdownCallback).toBeDefined(); + await shutdownCallback!(); + + // Check that consumer was removed from active list + const activeAfterShutdown = await redis.hgetall(`active_consumers:${testStream}`); + expect(Object.keys(activeAfterShutdown)).toHaveLength(0); + + // Check that consumer group was destroyed + const groupsAfterShutdown = (await redis.xinfo('GROUPS', testStream).catch(() => [])) as any[]; + const groupNames = groupsAfterShutdown.map((g: any) => g[1]); + expect(groupNames).not.toContain(consumerGroup); + }); + + it('should handle empty message batches', async () => { + const producer = createRedisProducer(testStream); + + // Try to produce empty array + await producer([]); + + // Should not throw error + expect(true).toBe(true); + }); + + // it('should handle empty message batches', async () => { + // const producer = createRedisProducer(testStream, 1000); + + // // Try to produce empty array + // await producer([]); + + // // Should not throw error + // expect(true).toBe(true); + // }); + + + it('should handle concurrent message processing', async () => { + const producer = createRedisProducer(testStream); + const processedMessages: string[] = []; + + // Start consumer with delay to simulate processing + await startConsumer(testStream, 1000, async (messages) => { + await delay(50); // Simulate processing time + processedMessages.push(...messages); + }); + + // Wait for consumer to be ready + await delay(100); + + // Send multiple batches quickly + await producer(['batch1-msg1', 'batch1-msg2']); + await producer(['batch2-msg1']); + await producer(['batch3-msg1', 'batch3-msg2', 'batch3-msg3']); + + // Wait for all messages to be processed + await delay(1000); + + // Should have processed all messages + expect(processedMessages).toHaveLength(6); + expect(processedMessages).toContain('batch1-msg1'); + expect(processedMessages).toContain('batch2-msg1'); + expect(processedMessages).toContain('batch3-msg3'); + }); + + it('should update heartbeat periodically', async () => { + // Start consumer + await startConsumer(testStream, 1000, async () => { }); + + // Wait for initial registration + await delay(100); + + // Get initial timestamp + const initial = await redis.hgetall(`active_consumers:${testStream}`); + const consumerGroup = Object.keys(initial)[0]; + const initialTimestamp = parseInt(initial[consumerGroup]); + + // Force some message reads to trigger heartbeat update + const producer = createRedisProducer(testStream); + await producer(['trigger']); + + // Wait a bit + await delay(200); + + // Check if heartbeat was updated + const updated = await redis.hget(`active_consumers:${testStream}`, consumerGroup); + const updatedTimestamp = parseInt(updated!); + + // Timestamp should be valid + expect(updatedTimestamp).toBeGreaterThan(0); + expect(updatedTimestamp).toBeLessThanOrEqual(Date.now()); + }); +}); + +// Global cleanup +declare global { + var __stopForever: boolean; +} \ No newline at end of file diff --git a/sources/storage/inTx.ts b/sources/storage/inTx.ts index cc4b563..0f85e72 100644 --- a/sources/storage/inTx.ts +++ b/sources/storage/inTx.ts @@ -1,5 +1,5 @@ import { Prisma } from "@prisma/client"; -import { delay } from "@/utils/time"; +import { delay } from "@/utils/delay"; import { db } from "@/storage/db"; export type Tx = Prisma.TransactionClient; diff --git a/sources/utils/aborted.ts b/sources/utils/aborted.ts new file mode 100644 index 0000000..cae4bf6 --- /dev/null +++ b/sources/utils/aborted.ts @@ -0,0 +1,15 @@ +export class AbortedExeption extends Error { + constructor(message: string = "Operation aborted") { + super(message); + this.name = "AbortedExeption"; + + // This is needed to properly capture the stack trace in TypeScript + if (Error.captureStackTrace) { + Error.captureStackTrace(this, AbortedExeption); + } + } + + static isAborted(error: unknown): boolean { + return error instanceof AbortedExeption; + } +} \ No newline at end of file diff --git a/sources/utils/backoff.ts b/sources/utils/backoff.ts new file mode 100644 index 0000000..b725e57 --- /dev/null +++ b/sources/utils/backoff.ts @@ -0,0 +1,42 @@ +import { AbortedExeption } from "./aborted"; +import { delay } from "./delay"; +import { warn } from "./log"; + +function exponentialRandomizedBackoffDelay(failureCount: number, minDelay: number, maxDelay: number, factor = 0.5) { + const exponentialDelay = Math.min(maxDelay, minDelay * Math.pow(2, failureCount)); + const jitterRange = exponentialDelay * factor; + const randomJitter = (Math.random() * 2 - 1) * jitterRange; + const delayWithJitter = exponentialDelay + randomJitter; + return Math.floor(Math.max(minDelay, Math.min(maxDelay, delayWithJitter))); +} + +type BackoffFunc = (callback: () => Promise, signal?: AbortSignal) => Promise; + +export function createBackoff( + opts?: { + minDelay?: number, + maxDelay?: number, + factor?: number + }): BackoffFunc { + return async (callback: () => Promise, signal?: AbortSignal): Promise => { + let currentFailureCount = 0; + const minDelay = opts && opts.minDelay !== undefined ? opts.minDelay : 250; + const maxDelay = opts && opts.maxDelay !== undefined ? opts.maxDelay : 10000; + const factor = opts && opts.factor !== undefined ? opts.factor : 0.5; + while (true) { + try { + return await callback(); + } catch (e: any) { + // Check if error is due to abort + if (AbortedExeption.isAborted(e)) { + throw e; + } + warn(e); + let waitForRequest = exponentialRandomizedBackoffDelay(currentFailureCount, minDelay, maxDelay, factor); + await delay(waitForRequest, signal); + } + } + }; +} + +export let backoff = createBackoff(); \ No newline at end of file diff --git a/sources/utils/delay.ts b/sources/utils/delay.ts new file mode 100644 index 0000000..a1a7335 --- /dev/null +++ b/sources/utils/delay.ts @@ -0,0 +1,27 @@ +import { warn } from "./log"; + +export async function delay(ms: number, signal?: AbortSignal): Promise { + if (!signal) { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + if (signal.aborted) { + return; + } + + await new Promise((resolve) => { + const timeout = setTimeout(resolve, ms); + + const abortHandler = () => { + clearTimeout(timeout); + resolve(); + }; + + if (signal.aborted) { + clearTimeout(timeout); + resolve(); + } else { + signal.addEventListener('abort', abortHandler, { once: true }); + } + }); +} \ No newline at end of file diff --git a/sources/utils/forever.ts b/sources/utils/forever.ts new file mode 100644 index 0000000..7098a36 --- /dev/null +++ b/sources/utils/forever.ts @@ -0,0 +1,24 @@ +import { AbortedExeption } from "./aborted"; +import { backoff } from "./backoff"; +import { keepAlive, shutdownSignal } from "./shutdown"; + +export async function forever( + name: string, + callback: () => Promise +) { + keepAlive(name, async () => { + await backoff(async () => { + while (!shutdownSignal.aborted) { + try { + await callback(); + } catch (error) { + if (AbortedExeption.isAborted(error)) { + break; + } else { + throw error; + } + } + } + }); + }); +} \ No newline at end of file diff --git a/sources/utils/lock.ts b/sources/utils/lock.ts new file mode 100644 index 0000000..28a8fae --- /dev/null +++ b/sources/utils/lock.ts @@ -0,0 +1,40 @@ +export class AsyncLock { + private permits: number = 1; + private promiseResolverQueue: Array<(v: boolean) => void> = []; + + async inLock(func: () => Promise | T): Promise { + try { + await this.lock(); + return await func(); + } finally { + this.unlock(); + } + } + + private async lock() { + if (this.permits > 0) { + this.permits = this.permits - 1; + return; + } + await new Promise(resolve => this.promiseResolverQueue.push(resolve)); + } + + private unlock() { + this.permits += 1; + if (this.permits > 1 && this.promiseResolverQueue.length > 0) { + throw new Error('this.permits should never be > 0 when there is someone waiting.'); + } else if (this.permits === 1 && this.promiseResolverQueue.length > 0) { + // If there is someone else waiting, immediately consume the permit that was released + // at the beginning of this function and let the waiting function resume. + this.permits -= 1; + + const nextResolver = this.promiseResolverQueue.shift(); + // Resolve on the next tick + if (nextResolver) { + setTimeout(() => { + nextResolver(true); + }, 0); + } + } + } +} \ No newline at end of file diff --git a/sources/utils/lru.spec.ts b/sources/utils/lru.spec.ts new file mode 100644 index 0000000..d25dcaf --- /dev/null +++ b/sources/utils/lru.spec.ts @@ -0,0 +1,204 @@ +import { describe, it, expect } from 'vitest'; +import { LRUSet } from './lru'; + +describe('LRUSet', () => { + it('should throw error when maxSize is 0 or negative', () => { + expect(() => new LRUSet(0)).toThrow('LRUSet maxSize must be greater than 0'); + expect(() => new LRUSet(-1)).toThrow('LRUSet maxSize must be greater than 0'); + }); + + it('should create LRUSet with positive maxSize', () => { + const lru = new LRUSet(3); + expect(lru.size).toBe(0); + }); + + it('should add values to the set', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(2); + lru.add(3); + + expect(lru.size).toBe(3); + expect(lru.has(1)).toBe(true); + expect(lru.has(2)).toBe(true); + expect(lru.has(3)).toBe(true); + }); + + it('should not duplicate values', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(1); + lru.add(1); + + expect(lru.size).toBe(1); + expect(lru.has(1)).toBe(true); + }); + + it('should evict least recently used item when capacity exceeded', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(2); + lru.add(3); + lru.add(4); // Should evict 1 + + expect(lru.size).toBe(3); + expect(lru.has(1)).toBe(false); + expect(lru.has(2)).toBe(true); + expect(lru.has(3)).toBe(true); + expect(lru.has(4)).toBe(true); + }); + + it('should move accessed items to front', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(2); + lru.add(3); + + // Access 1, moving it to front + lru.has(1); + + // Add 4, should evict 2 (least recently used) + lru.add(4); + + expect(lru.has(1)).toBe(true); + expect(lru.has(2)).toBe(false); + expect(lru.has(3)).toBe(true); + expect(lru.has(4)).toBe(true); + }); + + it('should move re-added items to front', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(2); + lru.add(3); + + // Re-add 1, moving it to front + lru.add(1); + + // Add 4, should evict 2 (least recently used) + lru.add(4); + + expect(lru.has(1)).toBe(true); + expect(lru.has(2)).toBe(false); + expect(lru.has(3)).toBe(true); + expect(lru.has(4)).toBe(true); + }); + + it('should delete values', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(2); + lru.add(3); + + expect(lru.delete(2)).toBe(true); + expect(lru.size).toBe(2); + expect(lru.has(2)).toBe(false); + + expect(lru.delete(2)).toBe(false); // Already deleted + }); + + it('should handle delete of head node', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(2); + lru.add(3); // 3 is head + + expect(lru.delete(3)).toBe(true); + expect(lru.size).toBe(2); + expect(lru.toArray()).toEqual([2, 1]); + }); + + it('should handle delete of tail node', () => { + const lru = new LRUSet(3); + lru.add(1); // 1 is tail + lru.add(2); + lru.add(3); + + expect(lru.delete(1)).toBe(true); + expect(lru.size).toBe(2); + expect(lru.toArray()).toEqual([3, 2]); + }); + + it('should clear all values', () => { + const lru = new LRUSet(3); + lru.add(1); + lru.add(2); + lru.add(3); + + lru.clear(); + + expect(lru.size).toBe(0); + expect(lru.has(1)).toBe(false); + expect(lru.has(2)).toBe(false); + expect(lru.has(3)).toBe(false); + }); + + it('should iterate values in order from most to least recently used', () => { + const lru = new LRUSet(4); + lru.add(1); + lru.add(2); + lru.add(3); + lru.add(4); + + const values = Array.from(lru.values()); + expect(values).toEqual([4, 3, 2, 1]); + }); + + it('should convert to array in order from most to least recently used', () => { + const lru = new LRUSet(4); + lru.add(1); + lru.add(2); + lru.add(3); + lru.add(4); + + expect(lru.toArray()).toEqual([4, 3, 2, 1]); + }); + + it('should work with string values', () => { + const lru = new LRUSet(3); + lru.add('a'); + lru.add('b'); + lru.add('c'); + lru.add('d'); + + expect(lru.has('a')).toBe(false); + expect(lru.has('b')).toBe(true); + expect(lru.has('c')).toBe(true); + expect(lru.has('d')).toBe(true); + }); + + it('should work with object values', () => { + const lru = new LRUSet<{id: number}>(2); + const obj1 = {id: 1}; + const obj2 = {id: 2}; + const obj3 = {id: 3}; + + lru.add(obj1); + lru.add(obj2); + lru.add(obj3); + + expect(lru.has(obj1)).toBe(false); + expect(lru.has(obj2)).toBe(true); + expect(lru.has(obj3)).toBe(true); + }); + + it('should handle single item capacity', () => { + const lru = new LRUSet(1); + lru.add(1); + lru.add(2); + + expect(lru.size).toBe(1); + expect(lru.has(1)).toBe(false); + expect(lru.has(2)).toBe(true); + }); + + it('should handle operations on empty set', () => { + const lru = new LRUSet(3); + + expect(lru.size).toBe(0); + expect(lru.has(1)).toBe(false); + expect(lru.delete(1)).toBe(false); + expect(lru.toArray()).toEqual([]); + expect(Array.from(lru.values())).toEqual([]); + }); +}); \ No newline at end of file diff --git a/sources/utils/lru.ts b/sources/utils/lru.ts new file mode 100644 index 0000000..d681a00 --- /dev/null +++ b/sources/utils/lru.ts @@ -0,0 +1,111 @@ +class Node { + constructor( + public value: T, + public prev: Node | null = null, + public next: Node | null = null + ) {} +} + +export class LRUSet { + private readonly maxSize: number; + private readonly map: Map>; + private head: Node | null = null; + private tail: Node | null = null; + + constructor(maxSize: number) { + if (maxSize <= 0) { + throw new Error('LRUSet maxSize must be greater than 0'); + } + this.maxSize = maxSize; + this.map = new Map(); + } + + private moveToFront(node: Node): void { + if (node === this.head) return; + + // Remove from current position + if (node.prev) node.prev.next = node.next; + if (node.next) node.next.prev = node.prev; + if (node === this.tail) this.tail = node.prev; + + // Move to front + node.prev = null; + node.next = this.head; + if (this.head) this.head.prev = node; + this.head = node; + if (!this.tail) this.tail = node; + } + + add(value: T): void { + const existingNode = this.map.get(value); + + if (existingNode) { + // Move to front (most recently used) + this.moveToFront(existingNode); + return; + } + + // Create new node + const newNode = new Node(value); + this.map.set(value, newNode); + + // Add to front + newNode.next = this.head; + if (this.head) this.head.prev = newNode; + this.head = newNode; + if (!this.tail) this.tail = newNode; + + // Remove LRU if over capacity + if (this.map.size > this.maxSize) { + if (this.tail) { + this.map.delete(this.tail.value); + this.tail = this.tail.prev; + if (this.tail) this.tail.next = null; + } + } + } + + has(value: T): boolean { + const node = this.map.get(value); + if (node) { + this.moveToFront(node); + return true; + } + return false; + } + + delete(value: T): boolean { + const node = this.map.get(value); + if (!node) return false; + + // Remove from linked list + if (node.prev) node.prev.next = node.next; + if (node.next) node.next.prev = node.prev; + if (node === this.head) this.head = node.next; + if (node === this.tail) this.tail = node.prev; + + return this.map.delete(value); + } + + clear(): void { + this.map.clear(); + this.head = null; + this.tail = null; + } + + get size(): number { + return this.map.size; + } + + *values(): IterableIterator { + let current = this.head; + while (current) { + yield current.value; + current = current.next; + } + } + + toArray(): T[] { + return Array.from(this.values()); + } +} \ No newline at end of file diff --git a/sources/utils/shutdown.ts b/sources/utils/shutdown.ts index 194f257..5be5594 100644 --- a/sources/utils/shutdown.ts +++ b/sources/utils/shutdown.ts @@ -1,26 +1,37 @@ import { log } from "./log"; -let locks = 0; -let awaititers = new Array<() => void>(); -let shutdown = false; +const shutdownHandlers = new Map Promise>>(); +const shutdownController = new AbortController(); -export function isShutdown() { - return shutdown; +export const shutdownSignal = shutdownController.signal; + +export function onShutdown(name: string, callback: () => Promise): () => void { + if (shutdownSignal.aborted) { + // If already shutting down, execute immediately + callback(); + return () => {}; + } + + if (!shutdownHandlers.has(name)) { + shutdownHandlers.set(name, []); + } + const handlers = shutdownHandlers.get(name)!; + handlers.push(callback); + + // Return unsubscribe function + return () => { + const index = handlers.indexOf(callback); + if (index !== -1) { + handlers.splice(index, 1); + if (handlers.length === 0) { + shutdownHandlers.delete(name); + } + } + }; } -export function shutdownLock() { - let locked = true; - locks++; - return () => { - if (locked) { - locks--; - if (locks === 0) { - for (let iter of awaititers) { - iter(); - } - } - }; - } +export function isShutdown() { + return shutdownSignal.aborted; } export async function awaitShutdown() { @@ -34,10 +45,76 @@ export async function awaitShutdown() { resolve(); }); }); - shutdown = true; - if (locks > 0) { - await new Promise((resolve) => { - awaititers.push(resolve); + shutdownController.abort(); + + // Copy handlers to avoid race conditions + const handlersSnapshot = new Map Promise>>(); + for (const [name, handlers] of shutdownHandlers) { + handlersSnapshot.set(name, [...handlers]); + } + + // Execute all shutdown handlers concurrently + const allHandlers: Promise[] = []; + let totalHandlers = 0; + + for (const [name, handlers] of handlersSnapshot) { + totalHandlers += handlers.length; + log(`Starting ${handlers.length} shutdown handlers for: ${name}`); + + handlers.forEach((handler, index) => { + const handlerPromise = handler().then( + () => {}, + (error) => log(`Error in shutdown handler ${name}[${index}]:`, error) + ); + allHandlers.push(handlerPromise); }); } + + if (totalHandlers > 0) { + log(`Waiting for ${totalHandlers} shutdown handlers to complete...`); + const startTime = Date.now(); + await Promise.all(allHandlers); + const duration = Date.now() - startTime; + log(`All ${totalHandlers} shutdown handlers completed in ${duration}ms`); + } +} + +export async function keepAlive(name: string, callback: () => Promise): Promise { + let completed = false; + let result: T; + let error: any; + + const promise = new Promise((resolve) => { + const unsubscribe = onShutdown(`keepAlive:${name}`, async () => { + if (!completed) { + log(`Waiting for keepAlive operation to complete: ${name}`); + await promise; + } + }); + + // Run the callback + callback().then( + (res) => { + result = res; + completed = true; + unsubscribe(); + resolve(); + }, + (err) => { + error = err; + completed = true; + unsubscribe(); + resolve(); + } + ); + }); + + // Wait for completion + await promise; + + if (error) { + throw error; + } + + return result!; } \ No newline at end of file diff --git a/sources/utils/time.ts b/sources/utils/time.ts deleted file mode 100644 index 39fa04c..0000000 --- a/sources/utils/time.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { warn } from "./log"; - -export async function delay(ms: number) { - return new Promise(resolve => setTimeout(resolve, ms)); -} - -export function exponentialBackoffDelay(currentFailureCount: number, minDelay: number, maxDelay: number, maxFailureCount: number) { - let maxDelayRet = minDelay + ((maxDelay - minDelay) / maxFailureCount) * Math.max(currentFailureCount, maxFailureCount); - return Math.round(Math.random() * maxDelayRet); -} - -export type BackoffFunc = (callback: () => Promise) => Promise; - -export function createBackoff( - opts?: { - onError?: (e: any, failuresCount: number) => void, - minDelay?: number, - maxDelay?: number, - maxFailureCount?: number - }): BackoffFunc { - return async (callback: () => Promise): Promise => { - let currentFailureCount = 0; - const minDelay = opts && opts.minDelay !== undefined ? opts.minDelay : 250; - const maxDelay = opts && opts.maxDelay !== undefined ? opts.maxDelay : 1000; - const maxFailureCount = opts && opts.maxFailureCount !== undefined ? opts.maxFailureCount : 50; - while (true) { - try { - return await callback(); - } catch (e) { - if (currentFailureCount < maxFailureCount) { - currentFailureCount++; - } - if (opts && opts.onError) { - opts.onError(e, currentFailureCount); - } - let waitForRequest = exponentialBackoffDelay(currentFailureCount, minDelay, maxDelay, maxFailureCount); - await delay(waitForRequest); - } - } - }; -} - -export let backoff = createBackoff({ onError: (e) => { warn(e); } }); \ No newline at end of file