feat: optimize credit distribution by batching user processing and enhancing database queries
This commit is contained in:
parent
564efbd3e2
commit
0be53d3251
@ -4,7 +4,8 @@ import { getDb } from '@/db';
|
||||
import { creditTransaction, payment, user, userCredit } from '@/db/schema';
|
||||
import { findPlanByPriceId } from '@/lib/price-plan';
|
||||
import { addDays, isAfter } from 'date-fns';
|
||||
import { and, asc, desc, eq, gt, isNull, not, or } from 'drizzle-orm';
|
||||
import { and, asc, desc, eq, gt, inArray, isNull, not, or } from 'drizzle-orm';
|
||||
import { sql } from 'drizzle-orm';
|
||||
import { CREDIT_TRANSACTION_TYPE } from './types';
|
||||
|
||||
/**
|
||||
@ -566,56 +567,110 @@ export async function distributeCreditsToAllUsers() {
|
||||
|
||||
const db = await getDb();
|
||||
|
||||
// Get all users with their current active payments/subscriptions
|
||||
const users = await db
|
||||
// Get all users with their current active payments/subscriptions in a single query
|
||||
// This uses a LEFT JOIN to get users and their latest active payment in one query
|
||||
const usersWithPayments = await db
|
||||
.select({
|
||||
userId: user.id,
|
||||
email: user.email,
|
||||
name: user.name,
|
||||
priceId: payment.priceId,
|
||||
paymentStatus: payment.status,
|
||||
paymentCreatedAt: payment.createdAt,
|
||||
})
|
||||
.from(user)
|
||||
.leftJoin(
|
||||
// Subquery to get the latest active payment for each user
|
||||
db
|
||||
.select({
|
||||
userId: payment.userId,
|
||||
priceId: payment.priceId,
|
||||
status: payment.status,
|
||||
createdAt: payment.createdAt,
|
||||
rowNumber:
|
||||
sql<number>`ROW_NUMBER() OVER (PARTITION BY ${payment.userId} ORDER BY ${payment.createdAt} DESC)`.as(
|
||||
'row_number'
|
||||
),
|
||||
})
|
||||
.from(payment)
|
||||
.where(or(eq(payment.status, 'active'), eq(payment.status, 'trialing')))
|
||||
.as('latest_payment'),
|
||||
and(
|
||||
eq(user.id, sql`latest_payment.user_id`),
|
||||
eq(sql`latest_payment.row_number`, 1)
|
||||
)
|
||||
)
|
||||
.where(or(isNull(user.banned), eq(user.banned, false)));
|
||||
console.log('distribute credits, users count:', users.length);
|
||||
|
||||
console.log('distribute credits, users count:', usersWithPayments.length);
|
||||
|
||||
let processedCount = 0;
|
||||
let errorCount = 0;
|
||||
|
||||
for (const userRecord of users) {
|
||||
try {
|
||||
// Get user's current active subscription/payment
|
||||
const activePayments = await db
|
||||
.select()
|
||||
.from(payment)
|
||||
.where(
|
||||
and(
|
||||
eq(payment.userId, userRecord.userId),
|
||||
or(eq(payment.status, 'active'), eq(payment.status, 'trialing'))
|
||||
)
|
||||
)
|
||||
.orderBy(desc(payment.createdAt));
|
||||
// Separate users by their plan type for batch processing
|
||||
const lifetimeUserIds: string[] = [];
|
||||
const freeUserIds: string[] = [];
|
||||
|
||||
if (activePayments.length > 0) {
|
||||
// User has active subscription - check what type
|
||||
const activePayment = activePayments[0];
|
||||
const pricePlan = findPlanByPriceId(activePayment.priceId);
|
||||
|
||||
if (pricePlan?.isLifetime) {
|
||||
// Lifetime user - add monthly credits
|
||||
await addLifetimeMonthlyCredits(userRecord.userId);
|
||||
}
|
||||
// Note: Subscription renewals are handled by Stripe webhooks, not here
|
||||
} else {
|
||||
// User has no active subscription - add free monthly credits if enabled
|
||||
await addMonthlyFreeCredits(userRecord.userId);
|
||||
usersWithPayments.forEach((userRecord) => {
|
||||
if (userRecord.priceId && userRecord.paymentStatus) {
|
||||
// User has active subscription - check what type
|
||||
const pricePlan = findPlanByPriceId(userRecord.priceId);
|
||||
if (pricePlan?.isLifetime) {
|
||||
lifetimeUserIds.push(userRecord.userId);
|
||||
}
|
||||
// Note: Subscription renewals are handled by Stripe webhooks, not here
|
||||
} else {
|
||||
// User has no active subscription - add free monthly credits if enabled
|
||||
freeUserIds.push(userRecord.userId);
|
||||
}
|
||||
});
|
||||
|
||||
processedCount++;
|
||||
console.log(
|
||||
`distribute credits, lifetime users: ${lifetimeUserIds.length}, free users: ${freeUserIds.length}`
|
||||
);
|
||||
|
||||
// Process lifetime users in batches
|
||||
const batchSize = 100;
|
||||
for (let i = 0; i < lifetimeUserIds.length; i += batchSize) {
|
||||
const batch = lifetimeUserIds.slice(i, i + batchSize);
|
||||
try {
|
||||
await batchAddLifetimeMonthlyCredits(batch);
|
||||
processedCount += batch.length;
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`distribute credits error, user: ${userRecord.userId}, error:`,
|
||||
`batchAddLifetimeMonthlyCredits error for batch ${i / batchSize + 1}:`,
|
||||
error
|
||||
);
|
||||
errorCount++;
|
||||
errorCount += batch.length;
|
||||
}
|
||||
|
||||
// Log progress for large datasets
|
||||
if (lifetimeUserIds.length > 1000) {
|
||||
console.log(
|
||||
`lifetime credits progress: ${Math.min(i + batchSize, lifetimeUserIds.length)}/${lifetimeUserIds.length}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Process free users in batches
|
||||
for (let i = 0; i < freeUserIds.length; i += batchSize) {
|
||||
const batch = freeUserIds.slice(i, i + batchSize);
|
||||
try {
|
||||
await batchAddMonthlyFreeCredits(batch);
|
||||
processedCount += batch.length;
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`batchAddMonthlyFreeCredits error for batch ${i / batchSize + 1}:`,
|
||||
error
|
||||
);
|
||||
errorCount += batch.length;
|
||||
}
|
||||
|
||||
// Log progress for large datasets
|
||||
if (freeUserIds.length > 1000) {
|
||||
console.log(
|
||||
`free credits progress: ${Math.min(i + batchSize, freeUserIds.length)}/${freeUserIds.length}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -624,3 +679,250 @@ export async function distributeCreditsToAllUsers() {
|
||||
);
|
||||
return { processedCount, errorCount };
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch add monthly free credits to multiple users
|
||||
* @param userIds - Array of user IDs
|
||||
*/
|
||||
export async function batchAddMonthlyFreeCredits(userIds: string[]) {
|
||||
if (userIds.length === 0) return;
|
||||
|
||||
const freePlan = Object.values(websiteConfig.price.plans).find(
|
||||
(plan) => plan.isFree && !plan.disabled
|
||||
);
|
||||
if (!freePlan) {
|
||||
console.log('batchAddMonthlyFreeCredits, no free plan found');
|
||||
return;
|
||||
}
|
||||
if (
|
||||
freePlan.disabled ||
|
||||
!freePlan.credits?.enable ||
|
||||
!freePlan.credits?.amount
|
||||
) {
|
||||
console.log(
|
||||
'batchAddMonthlyFreeCredits, plan disabled or credits disabled',
|
||||
freePlan.id
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const db = await getDb();
|
||||
const now = new Date();
|
||||
const credits = freePlan.credits.amount;
|
||||
const expireDays = freePlan.credits.expireDays;
|
||||
|
||||
// Use transaction for data consistency
|
||||
let processedCount = 0;
|
||||
await db.transaction(async (tx) => {
|
||||
// Get all user credit records in one query
|
||||
const userCredits = await tx
|
||||
.select({
|
||||
userId: userCredit.userId,
|
||||
lastRefreshAt: userCredit.lastRefreshAt,
|
||||
currentCredits: userCredit.currentCredits,
|
||||
})
|
||||
.from(userCredit)
|
||||
.where(inArray(userCredit.userId, userIds));
|
||||
|
||||
// Create a map for quick lookup
|
||||
const userCreditMap = new Map(
|
||||
userCredits.map((record) => [record.userId, record])
|
||||
);
|
||||
|
||||
// Filter users who can receive credits
|
||||
const eligibleUserIds = userIds.filter((userId) => {
|
||||
const record = userCreditMap.get(userId);
|
||||
if (!record?.lastRefreshAt) {
|
||||
return true; // never added credits before
|
||||
}
|
||||
const last = new Date(record.lastRefreshAt);
|
||||
// different month or year means new month
|
||||
return (
|
||||
now.getMonth() !== last.getMonth() ||
|
||||
now.getFullYear() !== last.getFullYear()
|
||||
);
|
||||
});
|
||||
|
||||
if (eligibleUserIds.length === 0) {
|
||||
console.log('batchAddMonthlyFreeCredits, no eligible users');
|
||||
return;
|
||||
}
|
||||
|
||||
processedCount = eligibleUserIds.length;
|
||||
const expirationDate = expireDays ? addDays(now, expireDays) : undefined;
|
||||
|
||||
// Batch insert credit transactions
|
||||
const transactions = eligibleUserIds.map((userId) => ({
|
||||
id: randomUUID(),
|
||||
userId,
|
||||
type: CREDIT_TRANSACTION_TYPE.MONTHLY_REFRESH,
|
||||
amount: credits,
|
||||
remainingAmount: credits,
|
||||
description: `Free monthly credits: ${credits} for ${now.getFullYear()}-${now.getMonth() + 1}`,
|
||||
expirationDate,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
}));
|
||||
|
||||
await tx.insert(creditTransaction).values(transactions);
|
||||
|
||||
// Prepare user credit updates
|
||||
const existingUserIds = eligibleUserIds.filter((userId) =>
|
||||
userCreditMap.has(userId)
|
||||
);
|
||||
const newUserIds = eligibleUserIds.filter(
|
||||
(userId) => !userCreditMap.has(userId)
|
||||
);
|
||||
|
||||
// Insert new user credit records
|
||||
if (newUserIds.length > 0) {
|
||||
const newRecords = newUserIds.map((userId) => ({
|
||||
id: randomUUID(),
|
||||
userId,
|
||||
currentCredits: credits,
|
||||
lastRefreshAt: now,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
}));
|
||||
await tx.insert(userCredit).values(newRecords);
|
||||
}
|
||||
|
||||
// Update existing user credit records
|
||||
if (existingUserIds.length > 0) {
|
||||
await tx
|
||||
.update(userCredit)
|
||||
.set({
|
||||
currentCredits: credits,
|
||||
lastRefreshAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(inArray(userCredit.userId, existingUserIds));
|
||||
}
|
||||
});
|
||||
|
||||
console.log(
|
||||
`batchAddMonthlyFreeCredits, ${credits} credits for ${processedCount} users, date: ${now.getFullYear()}-${now.getMonth() + 1}`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch add lifetime monthly credits to multiple users
|
||||
* @param userIds - Array of user IDs
|
||||
*/
|
||||
export async function batchAddLifetimeMonthlyCredits(userIds: string[]) {
|
||||
if (userIds.length === 0) return;
|
||||
|
||||
const lifetimePlan = Object.values(websiteConfig.price.plans).find(
|
||||
(plan) => plan.isLifetime && !plan.disabled
|
||||
);
|
||||
if (
|
||||
!lifetimePlan ||
|
||||
lifetimePlan.disabled ||
|
||||
!lifetimePlan.credits ||
|
||||
!lifetimePlan.credits.enable
|
||||
) {
|
||||
console.log(
|
||||
'batchAddLifetimeMonthlyCredits, plan disabled or credits disabled',
|
||||
lifetimePlan?.id
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const db = await getDb();
|
||||
const now = new Date();
|
||||
const credits = lifetimePlan.credits.amount;
|
||||
const expireDays = lifetimePlan.credits.expireDays;
|
||||
|
||||
// Use transaction for data consistency
|
||||
let processedCount = 0;
|
||||
await db.transaction(async (tx) => {
|
||||
// Get all user credit records in one query
|
||||
const userCredits = await tx
|
||||
.select({
|
||||
userId: userCredit.userId,
|
||||
lastRefreshAt: userCredit.lastRefreshAt,
|
||||
currentCredits: userCredit.currentCredits,
|
||||
})
|
||||
.from(userCredit)
|
||||
.where(inArray(userCredit.userId, userIds));
|
||||
|
||||
// Create a map for quick lookup
|
||||
const userCreditMap = new Map(
|
||||
userCredits.map((record) => [record.userId, record])
|
||||
);
|
||||
|
||||
// Filter users who can receive credits
|
||||
const eligibleUserIds = userIds.filter((userId) => {
|
||||
const record = userCreditMap.get(userId);
|
||||
if (!record?.lastRefreshAt) {
|
||||
return true; // never added credits before
|
||||
}
|
||||
const last = new Date(record.lastRefreshAt);
|
||||
// different month or year means new month
|
||||
return (
|
||||
now.getMonth() !== last.getMonth() ||
|
||||
now.getFullYear() !== last.getFullYear()
|
||||
);
|
||||
});
|
||||
|
||||
if (eligibleUserIds.length === 0) {
|
||||
console.log('batchAddLifetimeMonthlyCredits, no eligible users');
|
||||
return;
|
||||
}
|
||||
|
||||
processedCount = eligibleUserIds.length;
|
||||
const expirationDate = expireDays ? addDays(now, expireDays) : undefined;
|
||||
|
||||
// Batch insert credit transactions
|
||||
const transactions = eligibleUserIds.map((userId) => ({
|
||||
id: randomUUID(),
|
||||
userId,
|
||||
type: CREDIT_TRANSACTION_TYPE.LIFETIME_MONTHLY,
|
||||
amount: credits,
|
||||
remainingAmount: credits,
|
||||
description: `Lifetime monthly credits: ${credits} for ${now.getFullYear()}-${now.getMonth() + 1}`,
|
||||
expirationDate,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
}));
|
||||
|
||||
await tx.insert(creditTransaction).values(transactions);
|
||||
|
||||
// Prepare user credit updates
|
||||
const existingUserIds = eligibleUserIds.filter((userId) =>
|
||||
userCreditMap.has(userId)
|
||||
);
|
||||
const newUserIds = eligibleUserIds.filter(
|
||||
(userId) => !userCreditMap.has(userId)
|
||||
);
|
||||
|
||||
// Insert new user credit records
|
||||
if (newUserIds.length > 0) {
|
||||
const newRecords = newUserIds.map((userId) => ({
|
||||
id: randomUUID(),
|
||||
userId,
|
||||
currentCredits: credits,
|
||||
lastRefreshAt: now,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
}));
|
||||
await tx.insert(userCredit).values(newRecords);
|
||||
}
|
||||
|
||||
// Update existing user credit records
|
||||
if (existingUserIds.length > 0) {
|
||||
await tx
|
||||
.update(userCredit)
|
||||
.set({
|
||||
currentCredits: credits,
|
||||
lastRefreshAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(inArray(userCredit.userId, existingUserIds));
|
||||
}
|
||||
});
|
||||
|
||||
console.log(
|
||||
`batchAddLifetimeMonthlyCredits, ${credits} credits for ${processedCount} users, date: ${now.getFullYear()}-${now.getMonth() + 1}`
|
||||
);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user