diff --git a/prisma/migrations/20250920213557_add_user_feed/migration.sql b/prisma/migrations/20250920213557_add_user_feed/migration.sql new file mode 100644 index 0000000..4ae72b9 --- /dev/null +++ b/prisma/migrations/20250920213557_add_user_feed/migration.sql @@ -0,0 +1,27 @@ +-- AlterTable +ALTER TABLE "Account" ADD COLUMN "feedSeq" BIGINT NOT NULL DEFAULT 0; + +-- CreateTable +CREATE TABLE "UserFeedItem" ( + "id" TEXT NOT NULL, + "userId" TEXT NOT NULL, + "counter" BIGINT NOT NULL, + "repeatKey" TEXT, + "body" JSONB NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "UserFeedItem_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "UserFeedItem_userId_counter_idx" ON "UserFeedItem"("userId", "counter" DESC); + +-- CreateIndex +CREATE UNIQUE INDEX "UserFeedItem_userId_counter_key" ON "UserFeedItem"("userId", "counter"); + +-- CreateIndex +CREATE UNIQUE INDEX "UserFeedItem_userId_repeatKey_key" ON "UserFeedItem"("userId", "repeatKey"); + +-- AddForeignKey +ALTER TABLE "UserFeedItem" ADD CONSTRAINT "UserFeedItem_userId_fkey" FOREIGN KEY ("userId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index afa7321..855e494 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -23,6 +23,7 @@ model Account { id String @id @default(cuid()) publicKey String @unique seq Int @default(0) + feedSeq BigInt @default(0) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt settings String? @@ -49,6 +50,7 @@ model Account { RelationshipsTo UserRelationship[] @relation("RelationshipsTo") Artifact Artifact[] AccessKey AccessKey[] + UserFeedItem UserFeedItem[] } model TerminalAuthRequest { @@ -319,3 +321,23 @@ model UserRelationship { @@index([toUserId, status]) @@index([fromUserId, status]) } + +// +// Feed +// + +model UserFeedItem { + id String @id @default(cuid()) + userId String + user Account @relation(fields: [userId], references: [id], onDelete: Cascade) + counter BigInt + repeatKey String? + /// [FeedBody] + body Json + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([userId, counter]) + @@unique([userId, repeatKey]) + @@index([userId, counter(sort: Desc)]) +} diff --git a/sources/app/api/routes/feedRoutes.ts b/sources/app/api/routes/feedRoutes.ts new file mode 100644 index 0000000..7589b02 --- /dev/null +++ b/sources/app/api/routes/feedRoutes.ts @@ -0,0 +1,40 @@ +import { z } from "zod"; +import { Fastify } from "../types"; +import { FeedBodySchema } from "@/app/feed/types"; +import { feedGet } from "@/app/feed/feedGet"; +import { Context } from "@/context"; +import { db } from "@/storage/db"; + +export function feedRoutes(app: Fastify) { + app.get('/v1/feed', { + preHandler: app.authenticate, + schema: { + querystring: z.object({ + before: z.string().optional(), + after: z.string().optional(), + limit: z.coerce.number().int().min(1).max(200).default(50) + }).optional(), + response: { + 200: z.object({ + items: z.array(z.object({ + id: z.string(), + body: FeedBodySchema, + repeatKey: z.string().nullable(), + cursor: z.string(), + createdAt: z.number() + })), + hasMore: z.boolean() + }) + } + } + }, async (request, reply) => { + const items = await feedGet(db, Context.create(request.userId), { + cursor: { + before: request.query?.before, + after: request.query?.after + }, + limit: request.query?.limit + }); + return reply.send({ items: items.items, hasMore: items.hasMore }); + }); +} \ No newline at end of file diff --git a/sources/app/events/eventRouter.ts b/sources/app/events/eventRouter.ts index 2dc6f67..ecf5437 100644 --- a/sources/app/events/eventRouter.ts +++ b/sources/app/events/eventRouter.ts @@ -135,6 +135,12 @@ export type UpdateEvent = { uid: string; status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected'; timestamp: number; +} | { + type: 'new-feed-post'; + id: string; + body: any; + cursor: string; + createdAt: number; }; // === EPHEMERAL EVENT TYPES (Transient) === @@ -556,3 +562,23 @@ export function buildRelationshipUpdatedEvent( createdAt: Date.now() }; } + +export function buildNewFeedPostUpdate(feedItem: { + id: string; + body: any; + cursor: string; + createdAt: number; +}, updateSeq: number, updateId: string): UpdatePayload { + return { + id: updateId, + seq: updateSeq, + body: { + t: 'new-feed-post', + id: feedItem.id, + body: feedItem.body, + cursor: feedItem.cursor, + createdAt: feedItem.createdAt + }, + createdAt: Date.now() + }; +} diff --git a/sources/app/feed/feedGet.ts b/sources/app/feed/feedGet.ts new file mode 100644 index 0000000..a5fbc04 --- /dev/null +++ b/sources/app/feed/feedGet.ts @@ -0,0 +1,55 @@ +import { Context } from "@/context"; +import { FeedOptions, FeedResult } from "./types"; +import { Prisma } from "@prisma/client"; +import { Tx } from "@/storage/inTx"; + +/** + * Fetch user's feed with pagination. + * Returns items in reverse chronological order (newest first). + * Supports cursor-based pagination using the counter field. + */ +export async function feedGet( + tx: Tx, + ctx: Context, + options?: FeedOptions +): Promise { + const limit = options?.limit ?? 100; + const cursor = options?.cursor; + + // Build where clause for cursor pagination + const where: Prisma.UserFeedItemWhereInput = { userId: ctx.uid }; + + if (cursor?.before !== undefined) { + if (cursor.before.startsWith('0-')) { + where.counter = { lt: parseInt(cursor.before.substring(2), 10) }; + } else { + throw new Error('Invalid cursor format'); + } + } else if (cursor?.after !== undefined) { + if (cursor.after.startsWith('0-')) { + where.counter = { gt: parseInt(cursor.after.substring(2), 10) }; + } else { + throw new Error('Invalid cursor format'); + } + } + + // Fetch items + 1 to determine hasMore + const items = await tx.userFeedItem.findMany({ + where, + orderBy: { counter: 'desc' }, + take: limit + 1 + }); + + // Check if there are more items + const hasMore = items.length > limit; + + // Return only requested limit + return { + items: items.slice(0, limit).map(item => ({ + ...item, + createdAt: item.createdAt.getTime(), + cursor: '0-' + item.counter.toString(10) + })), + hasMore + }; +} \ No newline at end of file diff --git a/sources/app/feed/feedPost.ts b/sources/app/feed/feedPost.ts new file mode 100644 index 0000000..6f3f06a --- /dev/null +++ b/sources/app/feed/feedPost.ts @@ -0,0 +1,67 @@ +import { Context } from "@/context"; +import { FeedBody, UserFeedItem } from "./types"; +import { afterTx, Tx } from "@/storage/inTx"; +import { allocateUserSeq } from "@/storage/seq"; +import { eventRouter, buildNewFeedPostUpdate } from "@/app/events/eventRouter"; +import { randomKeyNaked } from "@/utils/randomKeyNaked"; + +/** + * Add a post to user's feed. + * If repeatKey is provided and exists, the post will be updated in-place. + * Otherwise, a new post is created with an incremented counter. + */ +export async function feedPost( + tx: Tx, + ctx: Context, + body: FeedBody, + repeatKey?: string | null +): Promise { + + + // Delete existing items with the same repeatKey + if (repeatKey) { + await tx.userFeedItem.deleteMany({ + where: { + userId: ctx.uid, + repeatKey: repeatKey + } + }); + } + + // Allocate new counter + const user = await tx.account.update({ + where: { id: ctx.uid }, + select: { feedSeq: true }, + data: { feedSeq: { increment: 1 } } + }); + + // Create new item + const item = await tx.userFeedItem.create({ + data: { + counter: user.feedSeq, + userId: ctx.uid, + repeatKey: repeatKey, + body: body + } + }); + + const result = { + ...item, + createdAt: item.createdAt.getTime(), + cursor: '0-' + item.counter.toString(10) + }; + + // Emit socket event after transaction completes + afterTx(tx, async () => { + const updateSeq = await allocateUserSeq(ctx.uid); + const updatePayload = buildNewFeedPostUpdate(result, updateSeq, randomKeyNaked(12)); + + eventRouter.emitUpdate({ + userId: ctx.uid, + payload: updatePayload, + recipientFilter: { type: 'all-user-authenticated-connections' } + }); + }); + + return result; +} \ No newline at end of file diff --git a/sources/app/feed/types.ts b/sources/app/feed/types.ts new file mode 100644 index 0000000..4469a11 --- /dev/null +++ b/sources/app/feed/types.ts @@ -0,0 +1,33 @@ +import * as z from "zod"; + +export const FeedBodySchema = z.discriminatedUnion('kind', [ + z.object({ kind: z.literal('friend_request'), uid: z.string() }), + z.object({ kind: z.literal('friend_accepted'), uid: z.string() }), + z.object({ kind: z.literal('text'), text: z.string() }) +]); + +export type FeedBody = z.infer; + +export interface UserFeedItem { + id: string; + userId: string; + repeatKey: string | null; + body: FeedBody; + createdAt: number; + cursor: string; +} + +export interface FeedCursor { + before?: string; + after?: string; +} + +export interface FeedOptions { + limit?: number; + cursor?: FeedCursor; +} + +export interface FeedResult { + items: UserFeedItem[]; + hasMore: boolean; +} \ No newline at end of file diff --git a/sources/context.ts b/sources/context.ts index 576b942..7e2ab2f 100644 --- a/sources/context.ts +++ b/sources/context.ts @@ -11,6 +11,4 @@ export class Context { private constructor(uid: string) { this.uid = uid; } -} - -export type Tx = Prisma.TransactionClient | PrismaClient; \ No newline at end of file +} \ No newline at end of file