From a6a5d92dc1c3ecdf830db8c5996f7de8d9f07697 Mon Sep 17 00:00:00 2001 From: javayhu Date: Fri, 22 Aug 2025 00:39:17 +0800 Subject: [PATCH] refactor: implement batch processing for expired credits in credit cron job --- src/credits/credits.ts | 5 +- src/credits/distribute.ts | 190 +++++++++++++++++++++++++++++++++++++- 2 files changed, 190 insertions(+), 5 deletions(-) diff --git a/src/credits/credits.ts b/src/credits/credits.ts index d82b0a0..3d6e043 100644 --- a/src/credits/credits.ts +++ b/src/credits/credits.ts @@ -149,8 +149,6 @@ export async function addCredits({ console.error('addCredits, invalid expire days', userId, expireDays); throw new Error('Invalid expire days'); } - // Process expired credits first - await processExpiredCredits(userId); // Update user credit balance const db = await getDb(); const current = await db @@ -230,8 +228,6 @@ export async function consumeCredits({ console.error('consumeCredits, invalid amount', userId, amount); throw new Error('Invalid amount'); } - // Process expired credits first - await processExpiredCredits(userId); // Check balance if (!(await hasEnoughCredits({ userId, requiredCredits: amount }))) { console.error( @@ -304,6 +300,7 @@ export async function consumeCredits({ /** * Process expired credits * @param userId - User ID + * @deprecated This function is no longer used, see distribute.ts instead */ export async function processExpiredCredits(userId: string) { const now = new Date(); diff --git a/src/credits/distribute.ts b/src/credits/distribute.ts index 4465e40..9c8f13d 100644 --- a/src/credits/distribute.ts +++ b/src/credits/distribute.ts @@ -4,7 +4,7 @@ import { creditTransaction, payment, user, userCredit } from '@/db/schema'; import { findPlanByPriceId, getAllPricePlans } from '@/lib/price-plan'; import { PlanIntervals } from '@/payment/types'; import { addDays } from 'date-fns'; -import { and, eq, inArray, isNull, or, sql } from 'drizzle-orm'; +import { and, eq, gt, inArray, isNull, lt, not, or, sql } from 'drizzle-orm'; import { CREDIT_TRANSACTION_TYPE } from './types'; /** @@ -14,6 +14,11 @@ import { CREDIT_TRANSACTION_TYPE } from './types'; export async function distributeCreditsToAllUsers() { console.log('>>> distribute credits start'); + // Process expired credits first before distributing new credits + console.log('Processing expired credits before distribution...'); + const expiredResult = await batchProcessExpiredCredits(); + console.log('Expired credits processed:', expiredResult); + const db = await getDb(); // Get all users with their current active payments/subscriptions in a single query @@ -602,3 +607,186 @@ export async function batchAddYearlyUsersMonthlyCredits( `batchAddYearlyUsersMonthlyCredits completed, total processed: ${totalProcessedCount} users` ); } + +/** + * Batch process expired credits for all users + * This function is designed to be called by a cron job + */ +export async function batchProcessExpiredCredits() { + console.log('>>> batch process expired credits start'); + + const db = await getDb(); + const now = new Date(); + + // Get all users who have credit transactions that can expire + const usersWithExpirableCredits = await db + .selectDistinct({ + userId: creditTransaction.userId, + }) + .from(creditTransaction) + .where( + and( + // Exclude usage and expire records (these are consumption/expiration logs) + not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.USAGE)), + not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.EXPIRE)), + // Only include transactions with expirationDate set + not(isNull(creditTransaction.expirationDate)), + // Only include transactions not yet processed for expiration + isNull(creditTransaction.expirationDateProcessedAt), + // Only include transactions with remaining amount > 0 + gt(creditTransaction.remainingAmount, 0), + // Only include expired transactions + lt(creditTransaction.expirationDate, now) + ) + ); + + console.log( + 'batch process expired credits, users count:', + usersWithExpirableCredits.length + ); + + const usersCount = usersWithExpirableCredits.length; + let processedCount = 0; + let errorCount = 0; + let totalExpiredCredits = 0; + + const batchSize = 100; + + // Process users in batches + for (let i = 0; i < usersWithExpirableCredits.length; i += batchSize) { + const batch = usersWithExpirableCredits.slice(i, i + batchSize); + try { + const batchResult = await batchProcessExpiredCreditsForUsers( + batch.map((user) => user.userId) + ); + processedCount += batchResult.processedCount; + totalExpiredCredits += batchResult.expiredCredits; + } catch (error) { + console.error( + `batchProcessExpiredCredits error for batch ${i / batchSize + 1}:`, + error + ); + errorCount += batch.length; + } + + // Log progress for large datasets + if (usersWithExpirableCredits.length > 1000) { + console.log( + `expired credits progress: ${Math.min(i + batchSize, usersWithExpirableCredits.length)}/${usersWithExpirableCredits.length}` + ); + } + } + + console.log( + `<<< batch process expired credits end, users: ${usersCount}, processed: ${processedCount}, errors: ${errorCount}, total expired credits: ${totalExpiredCredits}` + ); + return { usersCount, processedCount, errorCount, totalExpiredCredits }; +} + +/** + * Batch process expired credits for a group of users + * @param userIds - Array of user IDs + */ +export async function batchProcessExpiredCreditsForUsers(userIds: string[]) { + if (userIds.length === 0) { + console.log('batchProcessExpiredCreditsForUsers, no users to process'); + return { processedCount: 0, expiredCredits: 0 }; + } + + const db = await getDb(); + const now = new Date(); + + let totalProcessedCount = 0; + let totalExpiredCredits = 0; + + // Use transaction for data consistency + await db.transaction(async (tx) => { + for (const userId of userIds) { + // Get all credit transactions that can expire for this user + const transactions = await tx + .select() + .from(creditTransaction) + .where( + and( + eq(creditTransaction.userId, userId), + // Exclude usage and expire records (these are consumption/expiration logs) + not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.USAGE)), + not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.EXPIRE)), + // Only include transactions with expirationDate set + not(isNull(creditTransaction.expirationDate)), + // Only include transactions not yet processed for expiration + isNull(creditTransaction.expirationDateProcessedAt), + // Only include transactions with remaining amount > 0 + gt(creditTransaction.remainingAmount, 0), + // Only include expired transactions + lt(creditTransaction.expirationDate, now) + ) + ); + + let expiredTotal = 0; + + // Process expired credit transactions + for (const transaction of transactions) { + const remain = transaction.remainingAmount || 0; + if (remain > 0) { + expiredTotal += remain; + await tx + .update(creditTransaction) + .set({ + remainingAmount: 0, + expirationDateProcessedAt: now, + updatedAt: now, + }) + .where(eq(creditTransaction.id, transaction.id)); + } + } + + if (expiredTotal > 0) { + // Deduct expired credits from balance + const current = await tx + .select() + .from(userCredit) + .where(eq(userCredit.userId, userId)) + .limit(1); + + const newBalance = Math.max( + 0, + (current[0]?.currentCredits || 0) - expiredTotal + ); + + await tx + .update(userCredit) + .set({ currentCredits: newBalance, updatedAt: now }) + .where(eq(userCredit.userId, userId)); + + // Write expire record + await tx.insert(creditTransaction).values({ + id: randomUUID(), + userId, + type: CREDIT_TRANSACTION_TYPE.EXPIRE, + amount: -expiredTotal, + remainingAmount: null, + description: `Expire credits: ${expiredTotal}`, + createdAt: now, + updatedAt: now, + }); + + totalExpiredCredits += expiredTotal; + console.log( + `batchProcessExpiredCreditsForUsers, ${expiredTotal} credits expired for user ${userId}` + ); + } + + totalProcessedCount++; + } + }); + + console.log( + `batchProcessExpiredCreditsForUsers, processed ${totalProcessedCount} users, total expired credits: ${totalExpiredCredits}` + ); + + return { + processedCount: totalProcessedCount, + expiredCredits: totalExpiredCredits, + }; +}