feat: simple feed engine

This commit is contained in:
Steve Korshakov 2025-09-20 14:42:28 -07:00
parent 595e23967a
commit 0ce1bb4c9a
8 changed files with 271 additions and 3 deletions

View File

@ -0,0 +1,27 @@
-- AlterTable
ALTER TABLE "Account" ADD COLUMN "feedSeq" BIGINT NOT NULL DEFAULT 0;
-- CreateTable
CREATE TABLE "UserFeedItem" (
"id" TEXT NOT NULL,
"userId" TEXT NOT NULL,
"counter" BIGINT NOT NULL,
"repeatKey" TEXT,
"body" JSONB NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "UserFeedItem_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "UserFeedItem_userId_counter_idx" ON "UserFeedItem"("userId", "counter" DESC);
-- CreateIndex
CREATE UNIQUE INDEX "UserFeedItem_userId_counter_key" ON "UserFeedItem"("userId", "counter");
-- CreateIndex
CREATE UNIQUE INDEX "UserFeedItem_userId_repeatKey_key" ON "UserFeedItem"("userId", "repeatKey");
-- AddForeignKey
ALTER TABLE "UserFeedItem" ADD CONSTRAINT "UserFeedItem_userId_fkey" FOREIGN KEY ("userId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@ -23,6 +23,7 @@ model Account {
id String @id @default(cuid()) id String @id @default(cuid())
publicKey String @unique publicKey String @unique
seq Int @default(0) seq Int @default(0)
feedSeq BigInt @default(0)
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
settings String? settings String?
@ -49,6 +50,7 @@ model Account {
RelationshipsTo UserRelationship[] @relation("RelationshipsTo") RelationshipsTo UserRelationship[] @relation("RelationshipsTo")
Artifact Artifact[] Artifact Artifact[]
AccessKey AccessKey[] AccessKey AccessKey[]
UserFeedItem UserFeedItem[]
} }
model TerminalAuthRequest { model TerminalAuthRequest {
@ -319,3 +321,23 @@ model UserRelationship {
@@index([toUserId, status]) @@index([toUserId, status])
@@index([fromUserId, status]) @@index([fromUserId, status])
} }
//
// Feed
//
model UserFeedItem {
id String @id @default(cuid())
userId String
user Account @relation(fields: [userId], references: [id], onDelete: Cascade)
counter BigInt
repeatKey String?
/// [FeedBody]
body Json
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([userId, counter])
@@unique([userId, repeatKey])
@@index([userId, counter(sort: Desc)])
}

View File

@ -0,0 +1,40 @@
import { z } from "zod";
import { Fastify } from "../types";
import { FeedBodySchema } from "@/app/feed/types";
import { feedGet } from "@/app/feed/feedGet";
import { Context } from "@/context";
import { db } from "@/storage/db";
export function feedRoutes(app: Fastify) {
app.get('/v1/feed', {
preHandler: app.authenticate,
schema: {
querystring: z.object({
before: z.string().optional(),
after: z.string().optional(),
limit: z.coerce.number().int().min(1).max(200).default(50)
}).optional(),
response: {
200: z.object({
items: z.array(z.object({
id: z.string(),
body: FeedBodySchema,
repeatKey: z.string().nullable(),
cursor: z.string(),
createdAt: z.number()
})),
hasMore: z.boolean()
})
}
}
}, async (request, reply) => {
const items = await feedGet(db, Context.create(request.userId), {
cursor: {
before: request.query?.before,
after: request.query?.after
},
limit: request.query?.limit
});
return reply.send({ items: items.items, hasMore: items.hasMore });
});
}

View File

@ -135,6 +135,12 @@ export type UpdateEvent = {
uid: string; uid: string;
status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected'; status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected';
timestamp: number; timestamp: number;
} | {
type: 'new-feed-post';
id: string;
body: any;
cursor: string;
createdAt: number;
}; };
// === EPHEMERAL EVENT TYPES (Transient) === // === EPHEMERAL EVENT TYPES (Transient) ===
@ -556,3 +562,23 @@ export function buildRelationshipUpdatedEvent(
createdAt: Date.now() createdAt: Date.now()
}; };
} }
export function buildNewFeedPostUpdate(feedItem: {
id: string;
body: any;
cursor: string;
createdAt: number;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-feed-post',
id: feedItem.id,
body: feedItem.body,
cursor: feedItem.cursor,
createdAt: feedItem.createdAt
},
createdAt: Date.now()
};
}

View File

@ -0,0 +1,55 @@
import { Context } from "@/context";
import { FeedOptions, FeedResult } from "./types";
import { Prisma } from "@prisma/client";
import { Tx } from "@/storage/inTx";
/**
* Fetch user's feed with pagination.
* Returns items in reverse chronological order (newest first).
* Supports cursor-based pagination using the counter field.
*/
export async function feedGet(
tx: Tx,
ctx: Context,
options?: FeedOptions
): Promise<FeedResult> {
const limit = options?.limit ?? 100;
const cursor = options?.cursor;
// Build where clause for cursor pagination
const where: Prisma.UserFeedItemWhereInput = { userId: ctx.uid };
if (cursor?.before !== undefined) {
if (cursor.before.startsWith('0-')) {
where.counter = { lt: parseInt(cursor.before.substring(2), 10) };
} else {
throw new Error('Invalid cursor format');
}
} else if (cursor?.after !== undefined) {
if (cursor.after.startsWith('0-')) {
where.counter = { gt: parseInt(cursor.after.substring(2), 10) };
} else {
throw new Error('Invalid cursor format');
}
}
// Fetch items + 1 to determine hasMore
const items = await tx.userFeedItem.findMany({
where,
orderBy: { counter: 'desc' },
take: limit + 1
});
// Check if there are more items
const hasMore = items.length > limit;
// Return only requested limit
return {
items: items.slice(0, limit).map(item => ({
...item,
createdAt: item.createdAt.getTime(),
cursor: '0-' + item.counter.toString(10)
})),
hasMore
};
}

View File

@ -0,0 +1,67 @@
import { Context } from "@/context";
import { FeedBody, UserFeedItem } from "./types";
import { afterTx, Tx } from "@/storage/inTx";
import { allocateUserSeq } from "@/storage/seq";
import { eventRouter, buildNewFeedPostUpdate } from "@/app/events/eventRouter";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
/**
* Add a post to user's feed.
* If repeatKey is provided and exists, the post will be updated in-place.
* Otherwise, a new post is created with an incremented counter.
*/
export async function feedPost(
tx: Tx,
ctx: Context,
body: FeedBody,
repeatKey?: string | null
): Promise<UserFeedItem> {
// Delete existing items with the same repeatKey
if (repeatKey) {
await tx.userFeedItem.deleteMany({
where: {
userId: ctx.uid,
repeatKey: repeatKey
}
});
}
// Allocate new counter
const user = await tx.account.update({
where: { id: ctx.uid },
select: { feedSeq: true },
data: { feedSeq: { increment: 1 } }
});
// Create new item
const item = await tx.userFeedItem.create({
data: {
counter: user.feedSeq,
userId: ctx.uid,
repeatKey: repeatKey,
body: body
}
});
const result = {
...item,
createdAt: item.createdAt.getTime(),
cursor: '0-' + item.counter.toString(10)
};
// Emit socket event after transaction completes
afterTx(tx, async () => {
const updateSeq = await allocateUserSeq(ctx.uid);
const updatePayload = buildNewFeedPostUpdate(result, updateSeq, randomKeyNaked(12));
eventRouter.emitUpdate({
userId: ctx.uid,
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
});
return result;
}

33
sources/app/feed/types.ts Normal file
View File

@ -0,0 +1,33 @@
import * as z from "zod";
export const FeedBodySchema = z.discriminatedUnion('kind', [
z.object({ kind: z.literal('friend_request'), uid: z.string() }),
z.object({ kind: z.literal('friend_accepted'), uid: z.string() }),
z.object({ kind: z.literal('text'), text: z.string() })
]);
export type FeedBody = z.infer<typeof FeedBodySchema>;
export interface UserFeedItem {
id: string;
userId: string;
repeatKey: string | null;
body: FeedBody;
createdAt: number;
cursor: string;
}
export interface FeedCursor {
before?: string;
after?: string;
}
export interface FeedOptions {
limit?: number;
cursor?: FeedCursor;
}
export interface FeedResult {
items: UserFeedItem[];
hasMore: boolean;
}

View File

@ -11,6 +11,4 @@ export class Context {
private constructor(uid: string) { private constructor(uid: string) {
this.uid = uid; this.uid = uid;
} }
} }
export type Tx = Prisma.TransactionClient | PrismaClient;