refactor: implement batch processing for expired credits in credit cron job

This commit is contained in:
javayhu 2025-08-22 00:39:17 +08:00
parent 63a5e4f328
commit a6a5d92dc1
2 changed files with 190 additions and 5 deletions

View File

@ -149,8 +149,6 @@ export async function addCredits({
console.error('addCredits, invalid expire days', userId, expireDays);
throw new Error('Invalid expire days');
}
// Process expired credits first
await processExpiredCredits(userId);
// Update user credit balance
const db = await getDb();
const current = await db
@ -230,8 +228,6 @@ export async function consumeCredits({
console.error('consumeCredits, invalid amount', userId, amount);
throw new Error('Invalid amount');
}
// Process expired credits first
await processExpiredCredits(userId);
// Check balance
if (!(await hasEnoughCredits({ userId, requiredCredits: amount }))) {
console.error(
@ -304,6 +300,7 @@ export async function consumeCredits({
/**
* Process expired credits
* @param userId - User ID
* @deprecated This function is no longer used, see distribute.ts instead
*/
export async function processExpiredCredits(userId: string) {
const now = new Date();

View File

@ -4,7 +4,7 @@ import { creditTransaction, payment, user, userCredit } from '@/db/schema';
import { findPlanByPriceId, getAllPricePlans } from '@/lib/price-plan';
import { PlanIntervals } from '@/payment/types';
import { addDays } from 'date-fns';
import { and, eq, inArray, isNull, or, sql } from 'drizzle-orm';
import { and, eq, gt, inArray, isNull, lt, not, or, sql } from 'drizzle-orm';
import { CREDIT_TRANSACTION_TYPE } from './types';
/**
@ -14,6 +14,11 @@ import { CREDIT_TRANSACTION_TYPE } from './types';
export async function distributeCreditsToAllUsers() {
console.log('>>> distribute credits start');
// Process expired credits first before distributing new credits
console.log('Processing expired credits before distribution...');
const expiredResult = await batchProcessExpiredCredits();
console.log('Expired credits processed:', expiredResult);
const db = await getDb();
// Get all users with their current active payments/subscriptions in a single query
@ -602,3 +607,186 @@ export async function batchAddYearlyUsersMonthlyCredits(
`batchAddYearlyUsersMonthlyCredits completed, total processed: ${totalProcessedCount} users`
);
}
/**
* Batch process expired credits for all users
* This function is designed to be called by a cron job
*/
export async function batchProcessExpiredCredits() {
console.log('>>> batch process expired credits start');
const db = await getDb();
const now = new Date();
// Get all users who have credit transactions that can expire
const usersWithExpirableCredits = await db
.selectDistinct({
userId: creditTransaction.userId,
})
.from(creditTransaction)
.where(
and(
// Exclude usage and expire records (these are consumption/expiration logs)
not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.USAGE)),
not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.EXPIRE)),
// Only include transactions with expirationDate set
not(isNull(creditTransaction.expirationDate)),
// Only include transactions not yet processed for expiration
isNull(creditTransaction.expirationDateProcessedAt),
// Only include transactions with remaining amount > 0
gt(creditTransaction.remainingAmount, 0),
// Only include expired transactions
lt(creditTransaction.expirationDate, now)
)
);
console.log(
'batch process expired credits, users count:',
usersWithExpirableCredits.length
);
const usersCount = usersWithExpirableCredits.length;
let processedCount = 0;
let errorCount = 0;
let totalExpiredCredits = 0;
const batchSize = 100;
// Process users in batches
for (let i = 0; i < usersWithExpirableCredits.length; i += batchSize) {
const batch = usersWithExpirableCredits.slice(i, i + batchSize);
try {
const batchResult = await batchProcessExpiredCreditsForUsers(
batch.map((user) => user.userId)
);
processedCount += batchResult.processedCount;
totalExpiredCredits += batchResult.expiredCredits;
} catch (error) {
console.error(
`batchProcessExpiredCredits error for batch ${i / batchSize + 1}:`,
error
);
errorCount += batch.length;
}
// Log progress for large datasets
if (usersWithExpirableCredits.length > 1000) {
console.log(
`expired credits progress: ${Math.min(i + batchSize, usersWithExpirableCredits.length)}/${usersWithExpirableCredits.length}`
);
}
}
console.log(
`<<< batch process expired credits end, users: ${usersCount}, processed: ${processedCount}, errors: ${errorCount}, total expired credits: ${totalExpiredCredits}`
);
return { usersCount, processedCount, errorCount, totalExpiredCredits };
}
/**
* Batch process expired credits for a group of users
* @param userIds - Array of user IDs
*/
export async function batchProcessExpiredCreditsForUsers(userIds: string[]) {
if (userIds.length === 0) {
console.log('batchProcessExpiredCreditsForUsers, no users to process');
return { processedCount: 0, expiredCredits: 0 };
}
const db = await getDb();
const now = new Date();
let totalProcessedCount = 0;
let totalExpiredCredits = 0;
// Use transaction for data consistency
await db.transaction(async (tx) => {
for (const userId of userIds) {
// Get all credit transactions that can expire for this user
const transactions = await tx
.select()
.from(creditTransaction)
.where(
and(
eq(creditTransaction.userId, userId),
// Exclude usage and expire records (these are consumption/expiration logs)
not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.USAGE)),
not(eq(creditTransaction.type, CREDIT_TRANSACTION_TYPE.EXPIRE)),
// Only include transactions with expirationDate set
not(isNull(creditTransaction.expirationDate)),
// Only include transactions not yet processed for expiration
isNull(creditTransaction.expirationDateProcessedAt),
// Only include transactions with remaining amount > 0
gt(creditTransaction.remainingAmount, 0),
// Only include expired transactions
lt(creditTransaction.expirationDate, now)
)
);
let expiredTotal = 0;
// Process expired credit transactions
for (const transaction of transactions) {
const remain = transaction.remainingAmount || 0;
if (remain > 0) {
expiredTotal += remain;
await tx
.update(creditTransaction)
.set({
remainingAmount: 0,
expirationDateProcessedAt: now,
updatedAt: now,
})
.where(eq(creditTransaction.id, transaction.id));
}
}
if (expiredTotal > 0) {
// Deduct expired credits from balance
const current = await tx
.select()
.from(userCredit)
.where(eq(userCredit.userId, userId))
.limit(1);
const newBalance = Math.max(
0,
(current[0]?.currentCredits || 0) - expiredTotal
);
await tx
.update(userCredit)
.set({ currentCredits: newBalance, updatedAt: now })
.where(eq(userCredit.userId, userId));
// Write expire record
await tx.insert(creditTransaction).values({
id: randomUUID(),
userId,
type: CREDIT_TRANSACTION_TYPE.EXPIRE,
amount: -expiredTotal,
remainingAmount: null,
description: `Expire credits: ${expiredTotal}`,
createdAt: now,
updatedAt: now,
});
totalExpiredCredits += expiredTotal;
console.log(
`batchProcessExpiredCreditsForUsers, ${expiredTotal} credits expired for user ${userId}`
);
}
totalProcessedCount++;
}
});
console.log(
`batchProcessExpiredCreditsForUsers, processed ${totalProcessedCount} users, total expired credits: ${totalExpiredCredits}`
);
return {
processedCount: totalProcessedCount,
expiredCredits: totalExpiredCredits,
};
}