wip: working on redis queue

This commit is contained in:
Steve Korshakov 2025-07-26 01:16:22 -07:00
parent c8c83eaa8b
commit ae95f70372
18 changed files with 1102 additions and 94 deletions

View File

@ -9,7 +9,7 @@
"build": "tsc --noEmit",
"start": "tsx ./sources/main.ts",
"dev": "tsx --env-file=.env.example ./sources/main.ts",
"test": "vitest",
"test": "vitest run",
"migrate": "dotenv -e .env.example -- prisma migrate dev",
"generate": "prisma generate",
"postinstall": "prisma generate",

View File

@ -7,6 +7,7 @@ import * as privacyKit from "privacy-kit";
import * as tweetnacl from "tweetnacl";
import { db } from "@/storage/db";
import { Account, Update } from "@prisma/client";
import { onShutdown } from "@/utils/shutdown";
// Connection metadata types
interface SessionScopedConnection {
@ -566,17 +567,17 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
});
// Aggregate data by time period
const aggregated = new Map<string, {
tokens: Record<string, number>;
cost: Record<string, number>;
count: number;
const aggregated = new Map<string, {
tokens: Record<string, number>;
cost: Record<string, number>;
count: number;
timestamp: number;
}>();
for (const report of reports) {
const data = report.data as PrismaJson.UsageReportData;
const date = new Date(report.createdAt);
// Calculate timestamp based on groupBy
let timestamp: number;
if (actualGroupBy === 'hour') {
@ -1046,7 +1047,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
callback({ result: 'error' });
return null;
}
// Verify session belongs to user and lock it
const session = await tx.session.findFirst({
where: {
@ -1142,7 +1143,7 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
callback({ result: 'error' });
return null;
}
// Verify session belongs to user and lock it
const session = await tx.session.findFirst({
where: {
@ -1470,8 +1471,8 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
}
if (callback) {
callback({
success: true,
callback({
success: true,
reportId: report.id,
createdAt: report.createdAt.getTime(),
updatedAt: report.updatedAt.getTime()
@ -1491,6 +1492,13 @@ export async function startApi(): Promise<{ app: FastifyInstance; io: Server }>
// End
log('API ready on port http://localhost:' + port);
onShutdown('api', async () => {
await app.close();
});
onShutdown('api', async () => {
await io.close();
});
return { app, io };
}

View File

@ -1,6 +1,6 @@
import { pubsub } from "@/services/pubsub";
import { db } from "@/storage/db";
import { backoff, delay } from "@/utils/time";
import { backoff, delay } from "@/utils/delay";
export function startTimeout() {
backoff(async () => {

View File

@ -1,6 +1,6 @@
import { startApi } from "@/app/api";
import { log } from "@/utils/log";
import { awaitShutdown } from "@/utils/shutdown";
import { awaitShutdown, onShutdown } from "@/utils/shutdown";
import { db } from './storage/db';
import { startTimeout } from "./app/timeout";
import { redis } from "./services/redis";
@ -9,13 +9,16 @@ async function main() {
// Storage
await db.$connect();
onShutdown('db', async () => {
await db.$disconnect();
});
await redis.ping();
//
// Start
//
const { app, io } = await startApi();
await startApi();
startTimeout();
//
@ -25,22 +28,11 @@ async function main() {
log('Ready');
await awaitShutdown();
log('Shutting down...');
// Close Socket.io connections
io.close(() => {
log('Socket.io closed');
});
// Close Fastify server
await app.close();
log('Fastify server closed');
}
main().catch(async (e) => {
main().catch((e) => {
console.error(e);
await db.$disconnect();
process.exit(1);
}).then(async () => {
log('Disconnecting from DB...');
await db.$disconnect();
}).then(() => {
process.exit(0);
});

View File

@ -0,0 +1,83 @@
import { redis } from '@/services/redis';
import { delay } from '@/utils/delay';
import { forever } from '@/utils/forever';
import { log, warn } from '@/utils/log';
import { shutdownSignal } from '@/utils/shutdown';
const CLEANUP_INTERVAL = 60000; // 1 minute
let started = false;
// Start cleanup worker for all streams
export async function startCleanupWorker() {
if (started) {
return;
}
started = true;
log('Starting Redis cleanup worker');
forever('redis-cleanup', async () => {
try {
const now = Date.now();
// Find all active_consumers:* keys
const keys = await redis.keys('active_consumers:*');
let totalCleaned = 0;
for (const key of keys) {
// Extract stream name from key: active_consumers:streamname
const stream = key.substring('active_consumers:'.length);
// Get all consumers with their expiration times
const consumers = await redis.hgetall(key);
const expiredConsumers: string[] = [];
// Check each consumer's expiration time
for (const [consumerGroup, expirationTime] of Object.entries(consumers)) {
if (parseInt(expirationTime) < now) {
expiredConsumers.push(consumerGroup);
}
}
if (expiredConsumers.length === 0) {
continue;
}
// Delete expired consumer groups
let cleanedCount = 0;
for (const consumerGroup of expiredConsumers) {
try {
await redis.xgroup('DESTROY', stream, consumerGroup);
cleanedCount++;
} catch (err: any) {
// Group might already be deleted or doesn't exist
if (!err.message?.includes('NOGROUP')) {
warn(`Failed to cleanup group ${consumerGroup} from stream ${stream}:`, err);
}
}
}
// Remove all expired consumers from active list at once
if (expiredConsumers.length > 0) {
await redis.hdel(key, ...expiredConsumers);
}
if (cleanedCount > 0) {
log(`Cleaned up ${cleanedCount} expired consumer groups from stream: ${stream}`);
totalCleaned += cleanedCount;
}
}
if (totalCleaned > 0) {
log(`Total cleaned up: ${totalCleaned} consumer groups across all streams`);
}
} catch (err) {
warn('Error during cleanup cycle:', err);
}
// Wait before next cleanup cycle
await delay(CLEANUP_INTERVAL, shutdownSignal);
});
}

View File

@ -0,0 +1,140 @@
import { redis } from '@/services/redis';
import { forever } from '@/utils/forever';
import { AsyncLock } from '@/utils/lock';
import { warn } from '@/utils/log';
import { LRUSet } from '@/utils/lru';
import { randomUUID } from 'crypto';
import Redis from 'ioredis';
import { startCleanupWorker } from './redisCleanup';
import { onShutdown, shutdownSignal } from '@/utils/shutdown';
import { delay } from '@/utils/delay';
const HEARTBEAT_INTERVAL = 30000; // 30 seconds
const TRIM_INTERVAL = 30000; // 30 seconds
export async function startConsumer(
stream: string,
maxSize: number,
handler: (messages: string[]) => void | Promise<void>
) {
startCleanupWorker();
let wasCreated = false;
const consumerGroup = randomUUID();
const received = new LRUSet<string>(maxSize); // Should me not longer than queue size
const client = new Redis(process.env.REDIS_URL!);
const activeConsumersKey = `active_consumers:${stream}`;
const lock = new AsyncLock();
let lastHeartbeat = 0;
//
// Start consumer group loop
//
forever('redis:' + stream, async () => {
//
// Heartbeat
//
if (Date.now() - lastHeartbeat > HEARTBEAT_INTERVAL) {
lastHeartbeat = Date.now();
await client.hset(activeConsumersKey, consumerGroup, lastHeartbeat);
}
//
// Create consumer group at current position
//
if (!wasCreated) {
try {
await client.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');
} catch (err: any) {
// Ignore if group already exists
if (!err.message?.includes('BUSYGROUP')) {
throw err;
}
}
wasCreated = true;
}
//
// Read messages
//
const results = await client.xreadgroup(
'GROUP', consumerGroup, 'consumer',
'COUNT', 100, // 100 messages
'BLOCK', 5000, // 5 seconds
'STREAMS', stream, '>'
) as [string, [string, string[]][]][] | null;
if (!results || results.length === 0) {
return;
}
const [, messages] = results[0];
if (!messages || messages.length === 0) {
return;
}
// Extract ALL message IDs for acknowledgment
const allMessageIds: string[] = [];
const messageContents: string[] = [];
for (const [messageId, fields] of messages) {
// Always collect ID for acknowledgment
allMessageIds.push(messageId);
// Only process if not already seen
if (!received.has(messageId) && fields.length >= 2) {
messageContents.push(fields[1]);
received.add(messageId);
}
}
// Acknowledge ALL messages at once (including duplicates)
await redis.xack(stream, consumerGroup, ...allMessageIds);
// Only call handler if we have new messages to process
if (messageContents.length === 0) {
return;
}
// Guarantee order of messages
lock.inLock(async () => {
try {
await handler(messageContents);
} catch (err) {
warn(err);
}
});
});
//
// Start trimmer
//
forever('redis:' + stream + ':trimmer', async () => {
await redis.xtrim(stream, 'MAXLEN', '~', maxSize);
await delay(TRIM_INTERVAL, shutdownSignal);
});
//
// Clean up on shutdown
//
onShutdown('redis:' + stream, async () => {
try {
// Destroy consumer group FIRST
await redis.xgroup('DESTROY', stream, consumerGroup);
// Then remove from active consumers
await redis.hdel(activeConsumersKey, consumerGroup);
// Close the blocking client
client.disconnect();
} catch (err) {
// Ignore
}
});
}

View File

@ -0,0 +1,16 @@
import { redis } from "@/services/redis";
export function createRedisProducer(stream: string) {
return async (messages: string[]) => {
if (messages.length === 0) {
return;
}
// Use pipeline for batch publishing
const pipeline = redis.pipeline();
for (const message of messages) {
pipeline.xadd(stream, '*', 'data', message);
}
await pipeline.exec();
}
}

View File

@ -0,0 +1,272 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { Redis } from 'ioredis';
import { startConsumer } from './redisConsumer';
import { createRedisProducer } from './redisProducer';
import { delay } from '@/utils/delay';
// Mock the redis import
vi.mock('@/services/redis', () => ({
redis: new Redis(process.env.REDIS_URL || 'redis://localhost:6379')
}));
// Mock forever to run immediately
vi.mock('@/utils/forever', () => ({
forever: (name: string, fn: () => Promise<void>) => {
// Run the function in a loop with a small delay
const run = async () => {
while (!globalThis.__stopForever) {
await fn();
await new Promise(resolve => setTimeout(resolve, 10));
}
};
run().catch(() => { });
}
}));
// Mock onShutdown to collect callbacks
const shutdownCallbacks: Map<string, () => Promise<void>> = new Map();
vi.mock('@/utils/shutdown', () => ({
onShutdown: (name: string, callback: () => Promise<void>) => {
shutdownCallbacks.set(name, callback);
},
shutdownSignal: { aborted: false }
}));
describe('Redis Queue System', () => {
let redis: Redis;
let testStream: string;
let receivedMessages: string[][] = [];
beforeEach(async () => {
redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
testStream = `test-stream-${Date.now()}`;
receivedMessages = [];
globalThis.__stopForever = false;
shutdownCallbacks.clear();
// Clean up test stream if it exists
try {
await redis.del(testStream);
await redis.del(`active_consumers:${testStream}`);
} catch (err) {
// Ignore
}
});
afterEach(async () => {
globalThis.__stopForever = true;
// Call all shutdown callbacks
for (const callback of shutdownCallbacks.values()) {
await callback();
}
// Clean up
try {
await redis.del(testStream);
await redis.del(`active_consumers:${testStream}`);
// Clean up any consumer groups
const groups = await redis.xinfo('GROUPS', testStream).catch(() => []) as any[];
for (const groupInfo of groups) {
const groupName = groupInfo[1];
await redis.xgroup('DESTROY', testStream, groupName).catch(() => { });
}
} catch (err) {
// Ignore
}
redis.disconnect();
});
it('should produce and consume messages', async () => {
const producer = createRedisProducer(testStream);
// Start consumer
await startConsumer(testStream, 1000, async (messages) => {
receivedMessages.push(messages);
});
// Wait for consumer to be ready
await delay(100);
// Send messages - each becomes a separate stream entry
await producer(['message1', 'message2', 'message3']);
// Wait for messages to be consumed
await delay(200);
// Check received messages - should get all messages (possibly in multiple batches)
const allMessages = receivedMessages.flat();
expect(allMessages).toContain('message1');
expect(allMessages).toContain('message2');
expect(allMessages).toContain('message3');
expect(allMessages).toHaveLength(3);
});
it('should handle multiple consumers', async () => {
const producer = createRedisProducer(testStream);
const received1: string[][] = [];
const received2: string[][] = [];
// Start two consumers
await startConsumer(testStream, 1000, async (messages) => {
received1.push(messages);
});
await startConsumer(testStream, 1000, async (messages) => {
received2.push(messages);
});
// Wait for consumers to be ready
await delay(100);
// Send messages
await producer(['msg1', 'msg2']);
// Wait for messages to be consumed
await delay(200);
// Both consumers should receive all messages
const allMessages1 = received1.flat();
const allMessages2 = received2.flat();
expect(allMessages1).toContain('msg1');
expect(allMessages1).toContain('msg2');
expect(allMessages1).toHaveLength(2);
expect(allMessages2).toContain('msg1');
expect(allMessages2).toContain('msg2');
expect(allMessages2).toHaveLength(2);
});
it('should track active consumers', async () => {
// Start consumer
await startConsumer(testStream, 1000, async () => { });
// Wait for registration
await delay(100);
// Check active consumers
const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`);
expect(Object.keys(activeConsumers)).toHaveLength(1);
// Check that consumer has a timestamp
const consumerGroup = Object.keys(activeConsumers)[0];
const timestamp = parseInt(activeConsumers[consumerGroup]);
expect(timestamp).toBeGreaterThan(0);
expect(timestamp).toBeLessThanOrEqual(Date.now());
});
it('should clean up on shutdown', async () => {
// Start consumer
await startConsumer(testStream, 1000, async () => { });
// Wait for registration
await delay(100);
// Get consumer group
const activeConsumers = await redis.hgetall(`active_consumers:${testStream}`);
const consumerGroup = Object.keys(activeConsumers)[0];
// Verify consumer group exists
const groups = (await redis.xinfo('GROUPS', testStream)) as any[];
expect(groups.length).toBeGreaterThan(0);
// Call shutdown
const shutdownCallback = shutdownCallbacks.get(`redis:${testStream}`);
expect(shutdownCallback).toBeDefined();
await shutdownCallback!();
// Check that consumer was removed from active list
const activeAfterShutdown = await redis.hgetall(`active_consumers:${testStream}`);
expect(Object.keys(activeAfterShutdown)).toHaveLength(0);
// Check that consumer group was destroyed
const groupsAfterShutdown = (await redis.xinfo('GROUPS', testStream).catch(() => [])) as any[];
const groupNames = groupsAfterShutdown.map((g: any) => g[1]);
expect(groupNames).not.toContain(consumerGroup);
});
it('should handle empty message batches', async () => {
const producer = createRedisProducer(testStream);
// Try to produce empty array
await producer([]);
// Should not throw error
expect(true).toBe(true);
});
// it('should handle empty message batches', async () => {
// const producer = createRedisProducer(testStream, 1000);
// // Try to produce empty array
// await producer([]);
// // Should not throw error
// expect(true).toBe(true);
// });
it('should handle concurrent message processing', async () => {
const producer = createRedisProducer(testStream);
const processedMessages: string[] = [];
// Start consumer with delay to simulate processing
await startConsumer(testStream, 1000, async (messages) => {
await delay(50); // Simulate processing time
processedMessages.push(...messages);
});
// Wait for consumer to be ready
await delay(100);
// Send multiple batches quickly
await producer(['batch1-msg1', 'batch1-msg2']);
await producer(['batch2-msg1']);
await producer(['batch3-msg1', 'batch3-msg2', 'batch3-msg3']);
// Wait for all messages to be processed
await delay(1000);
// Should have processed all messages
expect(processedMessages).toHaveLength(6);
expect(processedMessages).toContain('batch1-msg1');
expect(processedMessages).toContain('batch2-msg1');
expect(processedMessages).toContain('batch3-msg3');
});
it('should update heartbeat periodically', async () => {
// Start consumer
await startConsumer(testStream, 1000, async () => { });
// Wait for initial registration
await delay(100);
// Get initial timestamp
const initial = await redis.hgetall(`active_consumers:${testStream}`);
const consumerGroup = Object.keys(initial)[0];
const initialTimestamp = parseInt(initial[consumerGroup]);
// Force some message reads to trigger heartbeat update
const producer = createRedisProducer(testStream);
await producer(['trigger']);
// Wait a bit
await delay(200);
// Check if heartbeat was updated
const updated = await redis.hget(`active_consumers:${testStream}`, consumerGroup);
const updatedTimestamp = parseInt(updated!);
// Timestamp should be valid
expect(updatedTimestamp).toBeGreaterThan(0);
expect(updatedTimestamp).toBeLessThanOrEqual(Date.now());
});
});
// Global cleanup
declare global {
var __stopForever: boolean;
}

View File

@ -1,5 +1,5 @@
import { Prisma } from "@prisma/client";
import { delay } from "@/utils/time";
import { delay } from "@/utils/delay";
import { db } from "@/storage/db";
export type Tx = Prisma.TransactionClient;

15
sources/utils/aborted.ts Normal file
View File

@ -0,0 +1,15 @@
export class AbortedExeption extends Error {
constructor(message: string = "Operation aborted") {
super(message);
this.name = "AbortedExeption";
// This is needed to properly capture the stack trace in TypeScript
if (Error.captureStackTrace) {
Error.captureStackTrace(this, AbortedExeption);
}
}
static isAborted(error: unknown): boolean {
return error instanceof AbortedExeption;
}
}

42
sources/utils/backoff.ts Normal file
View File

@ -0,0 +1,42 @@
import { AbortedExeption } from "./aborted";
import { delay } from "./delay";
import { warn } from "./log";
function exponentialRandomizedBackoffDelay(failureCount: number, minDelay: number, maxDelay: number, factor = 0.5) {
const exponentialDelay = Math.min(maxDelay, minDelay * Math.pow(2, failureCount));
const jitterRange = exponentialDelay * factor;
const randomJitter = (Math.random() * 2 - 1) * jitterRange;
const delayWithJitter = exponentialDelay + randomJitter;
return Math.floor(Math.max(minDelay, Math.min(maxDelay, delayWithJitter)));
}
type BackoffFunc = <T>(callback: () => Promise<T>, signal?: AbortSignal) => Promise<T>;
export function createBackoff(
opts?: {
minDelay?: number,
maxDelay?: number,
factor?: number
}): BackoffFunc {
return async <T>(callback: () => Promise<T>, signal?: AbortSignal): Promise<T> => {
let currentFailureCount = 0;
const minDelay = opts && opts.minDelay !== undefined ? opts.minDelay : 250;
const maxDelay = opts && opts.maxDelay !== undefined ? opts.maxDelay : 10000;
const factor = opts && opts.factor !== undefined ? opts.factor : 0.5;
while (true) {
try {
return await callback();
} catch (e: any) {
// Check if error is due to abort
if (AbortedExeption.isAborted(e)) {
throw e;
}
warn(e);
let waitForRequest = exponentialRandomizedBackoffDelay(currentFailureCount, minDelay, maxDelay, factor);
await delay(waitForRequest, signal);
}
}
};
}
export let backoff = createBackoff();

27
sources/utils/delay.ts Normal file
View File

@ -0,0 +1,27 @@
import { warn } from "./log";
export async function delay(ms: number, signal?: AbortSignal): Promise<void> {
if (!signal) {
return new Promise(resolve => setTimeout(resolve, ms));
}
if (signal.aborted) {
return;
}
await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, ms);
const abortHandler = () => {
clearTimeout(timeout);
resolve();
};
if (signal.aborted) {
clearTimeout(timeout);
resolve();
} else {
signal.addEventListener('abort', abortHandler, { once: true });
}
});
}

24
sources/utils/forever.ts Normal file
View File

@ -0,0 +1,24 @@
import { AbortedExeption } from "./aborted";
import { backoff } from "./backoff";
import { keepAlive, shutdownSignal } from "./shutdown";
export async function forever(
name: string,
callback: () => Promise<void>
) {
keepAlive(name, async () => {
await backoff(async () => {
while (!shutdownSignal.aborted) {
try {
await callback();
} catch (error) {
if (AbortedExeption.isAborted(error)) {
break;
} else {
throw error;
}
}
}
});
});
}

40
sources/utils/lock.ts Normal file
View File

@ -0,0 +1,40 @@
export class AsyncLock {
private permits: number = 1;
private promiseResolverQueue: Array<(v: boolean) => void> = [];
async inLock<T>(func: () => Promise<T> | T): Promise<T> {
try {
await this.lock();
return await func();
} finally {
this.unlock();
}
}
private async lock() {
if (this.permits > 0) {
this.permits = this.permits - 1;
return;
}
await new Promise<boolean>(resolve => this.promiseResolverQueue.push(resolve));
}
private unlock() {
this.permits += 1;
if (this.permits > 1 && this.promiseResolverQueue.length > 0) {
throw new Error('this.permits should never be > 0 when there is someone waiting.');
} else if (this.permits === 1 && this.promiseResolverQueue.length > 0) {
// If there is someone else waiting, immediately consume the permit that was released
// at the beginning of this function and let the waiting function resume.
this.permits -= 1;
const nextResolver = this.promiseResolverQueue.shift();
// Resolve on the next tick
if (nextResolver) {
setTimeout(() => {
nextResolver(true);
}, 0);
}
}
}
}

204
sources/utils/lru.spec.ts Normal file
View File

@ -0,0 +1,204 @@
import { describe, it, expect } from 'vitest';
import { LRUSet } from './lru';
describe('LRUSet', () => {
it('should throw error when maxSize is 0 or negative', () => {
expect(() => new LRUSet(0)).toThrow('LRUSet maxSize must be greater than 0');
expect(() => new LRUSet(-1)).toThrow('LRUSet maxSize must be greater than 0');
});
it('should create LRUSet with positive maxSize', () => {
const lru = new LRUSet(3);
expect(lru.size).toBe(0);
});
it('should add values to the set', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(2);
lru.add(3);
expect(lru.size).toBe(3);
expect(lru.has(1)).toBe(true);
expect(lru.has(2)).toBe(true);
expect(lru.has(3)).toBe(true);
});
it('should not duplicate values', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(1);
lru.add(1);
expect(lru.size).toBe(1);
expect(lru.has(1)).toBe(true);
});
it('should evict least recently used item when capacity exceeded', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(2);
lru.add(3);
lru.add(4); // Should evict 1
expect(lru.size).toBe(3);
expect(lru.has(1)).toBe(false);
expect(lru.has(2)).toBe(true);
expect(lru.has(3)).toBe(true);
expect(lru.has(4)).toBe(true);
});
it('should move accessed items to front', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(2);
lru.add(3);
// Access 1, moving it to front
lru.has(1);
// Add 4, should evict 2 (least recently used)
lru.add(4);
expect(lru.has(1)).toBe(true);
expect(lru.has(2)).toBe(false);
expect(lru.has(3)).toBe(true);
expect(lru.has(4)).toBe(true);
});
it('should move re-added items to front', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(2);
lru.add(3);
// Re-add 1, moving it to front
lru.add(1);
// Add 4, should evict 2 (least recently used)
lru.add(4);
expect(lru.has(1)).toBe(true);
expect(lru.has(2)).toBe(false);
expect(lru.has(3)).toBe(true);
expect(lru.has(4)).toBe(true);
});
it('should delete values', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(2);
lru.add(3);
expect(lru.delete(2)).toBe(true);
expect(lru.size).toBe(2);
expect(lru.has(2)).toBe(false);
expect(lru.delete(2)).toBe(false); // Already deleted
});
it('should handle delete of head node', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(2);
lru.add(3); // 3 is head
expect(lru.delete(3)).toBe(true);
expect(lru.size).toBe(2);
expect(lru.toArray()).toEqual([2, 1]);
});
it('should handle delete of tail node', () => {
const lru = new LRUSet<number>(3);
lru.add(1); // 1 is tail
lru.add(2);
lru.add(3);
expect(lru.delete(1)).toBe(true);
expect(lru.size).toBe(2);
expect(lru.toArray()).toEqual([3, 2]);
});
it('should clear all values', () => {
const lru = new LRUSet<number>(3);
lru.add(1);
lru.add(2);
lru.add(3);
lru.clear();
expect(lru.size).toBe(0);
expect(lru.has(1)).toBe(false);
expect(lru.has(2)).toBe(false);
expect(lru.has(3)).toBe(false);
});
it('should iterate values in order from most to least recently used', () => {
const lru = new LRUSet<number>(4);
lru.add(1);
lru.add(2);
lru.add(3);
lru.add(4);
const values = Array.from(lru.values());
expect(values).toEqual([4, 3, 2, 1]);
});
it('should convert to array in order from most to least recently used', () => {
const lru = new LRUSet<number>(4);
lru.add(1);
lru.add(2);
lru.add(3);
lru.add(4);
expect(lru.toArray()).toEqual([4, 3, 2, 1]);
});
it('should work with string values', () => {
const lru = new LRUSet<string>(3);
lru.add('a');
lru.add('b');
lru.add('c');
lru.add('d');
expect(lru.has('a')).toBe(false);
expect(lru.has('b')).toBe(true);
expect(lru.has('c')).toBe(true);
expect(lru.has('d')).toBe(true);
});
it('should work with object values', () => {
const lru = new LRUSet<{id: number}>(2);
const obj1 = {id: 1};
const obj2 = {id: 2};
const obj3 = {id: 3};
lru.add(obj1);
lru.add(obj2);
lru.add(obj3);
expect(lru.has(obj1)).toBe(false);
expect(lru.has(obj2)).toBe(true);
expect(lru.has(obj3)).toBe(true);
});
it('should handle single item capacity', () => {
const lru = new LRUSet<number>(1);
lru.add(1);
lru.add(2);
expect(lru.size).toBe(1);
expect(lru.has(1)).toBe(false);
expect(lru.has(2)).toBe(true);
});
it('should handle operations on empty set', () => {
const lru = new LRUSet<number>(3);
expect(lru.size).toBe(0);
expect(lru.has(1)).toBe(false);
expect(lru.delete(1)).toBe(false);
expect(lru.toArray()).toEqual([]);
expect(Array.from(lru.values())).toEqual([]);
});
});

111
sources/utils/lru.ts Normal file
View File

@ -0,0 +1,111 @@
class Node<T> {
constructor(
public value: T,
public prev: Node<T> | null = null,
public next: Node<T> | null = null
) {}
}
export class LRUSet<T> {
private readonly maxSize: number;
private readonly map: Map<T, Node<T>>;
private head: Node<T> | null = null;
private tail: Node<T> | null = null;
constructor(maxSize: number) {
if (maxSize <= 0) {
throw new Error('LRUSet maxSize must be greater than 0');
}
this.maxSize = maxSize;
this.map = new Map();
}
private moveToFront(node: Node<T>): void {
if (node === this.head) return;
// Remove from current position
if (node.prev) node.prev.next = node.next;
if (node.next) node.next.prev = node.prev;
if (node === this.tail) this.tail = node.prev;
// Move to front
node.prev = null;
node.next = this.head;
if (this.head) this.head.prev = node;
this.head = node;
if (!this.tail) this.tail = node;
}
add(value: T): void {
const existingNode = this.map.get(value);
if (existingNode) {
// Move to front (most recently used)
this.moveToFront(existingNode);
return;
}
// Create new node
const newNode = new Node(value);
this.map.set(value, newNode);
// Add to front
newNode.next = this.head;
if (this.head) this.head.prev = newNode;
this.head = newNode;
if (!this.tail) this.tail = newNode;
// Remove LRU if over capacity
if (this.map.size > this.maxSize) {
if (this.tail) {
this.map.delete(this.tail.value);
this.tail = this.tail.prev;
if (this.tail) this.tail.next = null;
}
}
}
has(value: T): boolean {
const node = this.map.get(value);
if (node) {
this.moveToFront(node);
return true;
}
return false;
}
delete(value: T): boolean {
const node = this.map.get(value);
if (!node) return false;
// Remove from linked list
if (node.prev) node.prev.next = node.next;
if (node.next) node.next.prev = node.prev;
if (node === this.head) this.head = node.next;
if (node === this.tail) this.tail = node.prev;
return this.map.delete(value);
}
clear(): void {
this.map.clear();
this.head = null;
this.tail = null;
}
get size(): number {
return this.map.size;
}
*values(): IterableIterator<T> {
let current = this.head;
while (current) {
yield current.value;
current = current.next;
}
}
toArray(): T[] {
return Array.from(this.values());
}
}

View File

@ -1,26 +1,37 @@
import { log } from "./log";
let locks = 0;
let awaititers = new Array<() => void>();
let shutdown = false;
const shutdownHandlers = new Map<string, Array<() => Promise<void>>>();
const shutdownController = new AbortController();
export function isShutdown() {
return shutdown;
export const shutdownSignal = shutdownController.signal;
export function onShutdown(name: string, callback: () => Promise<void>): () => void {
if (shutdownSignal.aborted) {
// If already shutting down, execute immediately
callback();
return () => {};
}
if (!shutdownHandlers.has(name)) {
shutdownHandlers.set(name, []);
}
const handlers = shutdownHandlers.get(name)!;
handlers.push(callback);
// Return unsubscribe function
return () => {
const index = handlers.indexOf(callback);
if (index !== -1) {
handlers.splice(index, 1);
if (handlers.length === 0) {
shutdownHandlers.delete(name);
}
}
};
}
export function shutdownLock() {
let locked = true;
locks++;
return () => {
if (locked) {
locks--;
if (locks === 0) {
for (let iter of awaititers) {
iter();
}
}
};
}
export function isShutdown() {
return shutdownSignal.aborted;
}
export async function awaitShutdown() {
@ -34,10 +45,76 @@ export async function awaitShutdown() {
resolve();
});
});
shutdown = true;
if (locks > 0) {
await new Promise<void>((resolve) => {
awaititers.push(resolve);
shutdownController.abort();
// Copy handlers to avoid race conditions
const handlersSnapshot = new Map<string, Array<() => Promise<void>>>();
for (const [name, handlers] of shutdownHandlers) {
handlersSnapshot.set(name, [...handlers]);
}
// Execute all shutdown handlers concurrently
const allHandlers: Promise<void>[] = [];
let totalHandlers = 0;
for (const [name, handlers] of handlersSnapshot) {
totalHandlers += handlers.length;
log(`Starting ${handlers.length} shutdown handlers for: ${name}`);
handlers.forEach((handler, index) => {
const handlerPromise = handler().then(
() => {},
(error) => log(`Error in shutdown handler ${name}[${index}]:`, error)
);
allHandlers.push(handlerPromise);
});
}
if (totalHandlers > 0) {
log(`Waiting for ${totalHandlers} shutdown handlers to complete...`);
const startTime = Date.now();
await Promise.all(allHandlers);
const duration = Date.now() - startTime;
log(`All ${totalHandlers} shutdown handlers completed in ${duration}ms`);
}
}
export async function keepAlive<T>(name: string, callback: () => Promise<T>): Promise<T> {
let completed = false;
let result: T;
let error: any;
const promise = new Promise<void>((resolve) => {
const unsubscribe = onShutdown(`keepAlive:${name}`, async () => {
if (!completed) {
log(`Waiting for keepAlive operation to complete: ${name}`);
await promise;
}
});
// Run the callback
callback().then(
(res) => {
result = res;
completed = true;
unsubscribe();
resolve();
},
(err) => {
error = err;
completed = true;
unsubscribe();
resolve();
}
);
});
// Wait for completion
await promise;
if (error) {
throw error;
}
return result!;
}

View File

@ -1,43 +0,0 @@
import { warn } from "./log";
export async function delay(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}
export function exponentialBackoffDelay(currentFailureCount: number, minDelay: number, maxDelay: number, maxFailureCount: number) {
let maxDelayRet = minDelay + ((maxDelay - minDelay) / maxFailureCount) * Math.max(currentFailureCount, maxFailureCount);
return Math.round(Math.random() * maxDelayRet);
}
export type BackoffFunc = <T>(callback: () => Promise<T>) => Promise<T>;
export function createBackoff(
opts?: {
onError?: (e: any, failuresCount: number) => void,
minDelay?: number,
maxDelay?: number,
maxFailureCount?: number
}): BackoffFunc {
return async <T>(callback: () => Promise<T>): Promise<T> => {
let currentFailureCount = 0;
const minDelay = opts && opts.minDelay !== undefined ? opts.minDelay : 250;
const maxDelay = opts && opts.maxDelay !== undefined ? opts.maxDelay : 1000;
const maxFailureCount = opts && opts.maxFailureCount !== undefined ? opts.maxFailureCount : 50;
while (true) {
try {
return await callback();
} catch (e) {
if (currentFailureCount < maxFailureCount) {
currentFailureCount++;
}
if (opts && opts.onError) {
opts.onError(e, currentFailureCount);
}
let waitForRequest = exponentialBackoffDelay(currentFailureCount, minDelay, maxDelay, maxFailureCount);
await delay(waitForRequest);
}
}
};
}
export let backoff = createBackoff({ onError: (e) => { warn(e); } });