Compare commits

...

10 Commits

Author SHA1 Message Date
b594832c30 fix docker not run
Some checks failed
Build Develop Docker to Docker Hub / docker (push) Has been cancelled
2025-10-16 14:12:42 +08:00
26d42b4cf1 add docker build ci 2025-10-16 11:46:46 +08:00
Steve Korshakov
5ba780c8a2 fix: put -> post 2025-09-21 17:43:01 -07:00
Steve Korshakov
b2c72cc485 feat: add kv 2025-09-21 17:03:45 -07:00
Steve Korshakov
aeba1b6f1c fix: fix mssing feed routes 2025-09-20 22:26:42 -07:00
Steve Korshakov
014473a7ac feat: implement session deletion with cascade and socket notifications
- Add sessionDelete action to handle deletion of sessions and all related data
- Delete session messages, usage reports, and access keys in proper order
- Add delete-session event type to eventRouter for real-time notifications
- Add DELETE /v1/sessions/:sessionId endpoint with ownership verification
- Send socket notification to all user connections after successful deletion
- Add idempotency rule to CLAUDE.md for API operations

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2025-09-20 21:17:48 -07:00
Steve Korshakov
cfd7a7b783 ref: remove logs 2025-09-20 17:10:02 -07:00
Steve Korshakov
9822512aeb fix: use inTx wrapper instead of raw transactions for friend operations
Fixes "undefined reading push" error by using the inTx wrapper which properly
initializes the callbacks array needed by afterTx for notification sending.

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2025-09-20 17:00:05 -07:00
Steve Korshakov
c534331ce5 feat: send friend requests notifications 2025-09-20 14:55:07 -07:00
Steve Korshakov
0ce1bb4c9a feat: simple feed engine 2025-09-20 14:42:28 -07:00
27 changed files with 1272 additions and 37 deletions

43
.github/workflows/build-main-docker.yml vendored Normal file
View File

@ -0,0 +1,43 @@
name: Build Develop Docker to Docker Hub
on:
push:
branches:
- main
env:
REGISTRY: docker.io
IMAGE_NAME: ${{ secrets.DOCKERHUB_USERNAME }}/${{ github.event.repository.name }}
USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
jobs:
docker:
runs-on: ubuntu-latest
steps:
-
name: Checkout
uses: actions/checkout@v4
-
name: Login to ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ env.USERNAME }}
password: ${{ env.TOKEN }}
-
name: Set up QEMU
uses: docker/setup-qemu-action@v3
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
-
name: Build and push
uses: docker/build-push-action@v6
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
build-args: |
PUSH_RECENT=1
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest

View File

@ -160,6 +160,7 @@ The project has pending Prisma migrations that need to be applied:
- Routes are in `/sources/apps/api/routes`
- Use Fastify with Zod for type-safe route definitions
- Always validate inputs using Zod
- **Idempotency**: Design all operations to be idempotent - clients may retry requests automatically and the backend must handle multiple invocations of the same operation gracefully, producing the same result as a single invocation
## Docker Deployment
@ -276,4 +277,5 @@ tail -500 .logs/*.log | grep "applySessions.*active" | tail -10
- Do not add logging when not asked
- do not run non-transactional things (like uploadign files) in transactions
- After writing an action - add a documentation comment that explains logic, also keep it in sync.
- always use github usernames
- always use github usernames
- Always use privacyKit.decodeBase64 and privacyKit.encodeBase64 from privacy-kit instead of using buffer

View File

@ -37,9 +37,10 @@ COPY --from=builder /app/tsconfig.json ./tsconfig.json
COPY --from=builder /app/package.json ./package.json
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/sources ./sources
COPY --from=builder /app/prisma ./prisma
# Expose the port the app will run on
EXPOSE 3000
# Command to run the application
CMD ["yarn", "start"]
CMD ["yarn", "start"]

View File

@ -0,0 +1,27 @@
-- AlterTable
ALTER TABLE "Account" ADD COLUMN "feedSeq" BIGINT NOT NULL DEFAULT 0;
-- CreateTable
CREATE TABLE "UserFeedItem" (
"id" TEXT NOT NULL,
"userId" TEXT NOT NULL,
"counter" BIGINT NOT NULL,
"repeatKey" TEXT,
"body" JSONB NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "UserFeedItem_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "UserFeedItem_userId_counter_idx" ON "UserFeedItem"("userId", "counter" DESC);
-- CreateIndex
CREATE UNIQUE INDEX "UserFeedItem_userId_counter_key" ON "UserFeedItem"("userId", "counter");
-- CreateIndex
CREATE UNIQUE INDEX "UserFeedItem_userId_repeatKey_key" ON "UserFeedItem"("userId", "repeatKey");
-- AddForeignKey
ALTER TABLE "UserFeedItem" ADD CONSTRAINT "UserFeedItem_userId_fkey" FOREIGN KEY ("userId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "UserRelationship" ADD COLUMN "lastNotifiedAt" TIMESTAMP(3);

View File

@ -0,0 +1,21 @@
-- CreateTable
CREATE TABLE "UserKVStore" (
"id" TEXT NOT NULL,
"accountId" TEXT NOT NULL,
"key" TEXT NOT NULL,
"value" BYTEA,
"version" INTEGER NOT NULL DEFAULT 0,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "UserKVStore_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "UserKVStore_accountId_idx" ON "UserKVStore"("accountId");
-- CreateIndex
CREATE UNIQUE INDEX "UserKVStore_accountId_key_key" ON "UserKVStore"("accountId", "key");
-- AddForeignKey
ALTER TABLE "UserKVStore" ADD CONSTRAINT "UserKVStore_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@ -23,6 +23,7 @@ model Account {
id String @id @default(cuid())
publicKey String @unique
seq Int @default(0)
feedSeq BigInt @default(0)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
settings String?
@ -49,6 +50,8 @@ model Account {
RelationshipsTo UserRelationship[] @relation("RelationshipsTo")
Artifact Artifact[]
AccessKey AccessKey[]
UserFeedItem UserFeedItem[]
UserKVStore UserKVStore[]
}
model TerminalAuthRequest {
@ -306,16 +309,55 @@ enum RelationshipStatus {
}
model UserRelationship {
fromUserId String
fromUser Account @relation("RelationshipsFrom", fields: [fromUserId], references: [id], onDelete: Cascade)
toUserId String
toUser Account @relation("RelationshipsTo", fields: [toUserId], references: [id], onDelete: Cascade)
status RelationshipStatus @default(pending)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
acceptedAt DateTime?
fromUserId String
fromUser Account @relation("RelationshipsFrom", fields: [fromUserId], references: [id], onDelete: Cascade)
toUserId String
toUser Account @relation("RelationshipsTo", fields: [toUserId], references: [id], onDelete: Cascade)
status RelationshipStatus @default(pending)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
acceptedAt DateTime?
lastNotifiedAt DateTime?
@@id([fromUserId, toUserId])
@@index([toUserId, status])
@@index([fromUserId, status])
}
//
// Feed
//
model UserFeedItem {
id String @id @default(cuid())
userId String
user Account @relation(fields: [userId], references: [id], onDelete: Cascade)
counter BigInt
repeatKey String?
/// [FeedBody]
body Json
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([userId, counter])
@@unique([userId, repeatKey])
@@index([userId, counter(sort: Desc)])
}
//
// Key-Value Storage
//
model UserKVStore {
id String @id @default(cuid())
accountId String
account Account @relation(fields: [accountId], references: [id], onDelete: Cascade)
key String // Unencrypted for indexing
value Bytes? // Encrypted value, null when "deleted"
version Int @default(0)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([accountId, key])
@@index([accountId])
}

View File

@ -19,6 +19,8 @@ import { enableMonitoring } from "./utils/enableMonitoring";
import { enableErrorHandlers } from "./utils/enableErrorHandlers";
import { enableAuthentication } from "./utils/enableAuthentication";
import { userRoutes } from "./routes/userRoutes";
import { feedRoutes } from "./routes/feedRoutes";
import { kvRoutes } from "./routes/kvRoutes";
export async function startApi() {
@ -62,6 +64,8 @@ export async function startApi() {
versionRoutes(typed);
voiceRoutes(typed);
userRoutes(typed);
feedRoutes(typed);
kvRoutes(typed);
// Start HTTP
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005;

View File

@ -0,0 +1,40 @@
import { z } from "zod";
import { Fastify } from "../types";
import { FeedBodySchema } from "@/app/feed/types";
import { feedGet } from "@/app/feed/feedGet";
import { Context } from "@/context";
import { db } from "@/storage/db";
export function feedRoutes(app: Fastify) {
app.get('/v1/feed', {
preHandler: app.authenticate,
schema: {
querystring: z.object({
before: z.string().optional(),
after: z.string().optional(),
limit: z.coerce.number().int().min(1).max(200).default(50)
}).optional(),
response: {
200: z.object({
items: z.array(z.object({
id: z.string(),
body: FeedBodySchema,
repeatKey: z.string().nullable(),
cursor: z.string(),
createdAt: z.number()
})),
hasMore: z.boolean()
})
}
}
}, async (request, reply) => {
const items = await feedGet(db, Context.create(request.userId), {
cursor: {
before: request.query?.before,
after: request.query?.after
},
limit: request.query?.limit
});
return reply.send({ items: items.items, hasMore: items.hasMore });
});
}

View File

@ -0,0 +1,172 @@
import { z } from "zod";
import { Fastify } from "../types";
import { kvGet } from "@/app/kv/kvGet";
import { kvList } from "@/app/kv/kvList";
import { kvBulkGet } from "@/app/kv/kvBulkGet";
import { kvMutate } from "@/app/kv/kvMutate";
import { log } from "@/utils/log";
export function kvRoutes(app: Fastify) {
// GET /v1/kv/:key - Get single value
app.get('/v1/kv/:key', {
preHandler: app.authenticate,
schema: {
params: z.object({
key: z.string()
}),
response: {
200: z.object({
key: z.string(),
value: z.string(),
version: z.number()
}).nullable(),
404: z.object({
error: z.literal('Key not found')
}),
500: z.object({
error: z.literal('Failed to get value')
})
}
}
}, async (request, reply) => {
const userId = request.userId;
const { key } = request.params;
try {
const result = await kvGet({ uid: userId }, key);
if (!result) {
return reply.code(404).send({ error: 'Key not found' });
}
return reply.send(result);
} catch (error) {
log({ module: 'api', level: 'error' }, `Failed to get KV value: ${error}`);
return reply.code(500).send({ error: 'Failed to get value' });
}
});
// GET /v1/kv - List key-value pairs with optional prefix filter
app.get('/v1/kv', {
preHandler: app.authenticate,
schema: {
querystring: z.object({
prefix: z.string().optional(),
limit: z.coerce.number().int().min(1).max(1000).default(100)
}),
response: {
200: z.object({
items: z.array(z.object({
key: z.string(),
value: z.string(),
version: z.number()
}))
}),
500: z.object({
error: z.literal('Failed to list items')
})
}
}
}, async (request, reply) => {
const userId = request.userId;
const { prefix, limit } = request.query;
try {
const result = await kvList({ uid: userId }, { prefix, limit });
return reply.send(result);
} catch (error) {
log({ module: 'api', level: 'error' }, `Failed to list KV items: ${error}`);
return reply.code(500).send({ error: 'Failed to list items' });
}
});
// POST /v1/kv/bulk - Bulk get values
app.post('/v1/kv/bulk', {
preHandler: app.authenticate,
schema: {
body: z.object({
keys: z.array(z.string()).min(1).max(100)
}),
response: {
200: z.object({
values: z.array(z.object({
key: z.string(),
value: z.string(),
version: z.number()
}))
}),
500: z.object({
error: z.literal('Failed to get values')
})
}
}
}, async (request, reply) => {
const userId = request.userId;
const { keys } = request.body;
try {
const result = await kvBulkGet({ uid: userId }, keys);
return reply.send(result);
} catch (error) {
log({ module: 'api', level: 'error' }, `Failed to bulk get KV values: ${error}`);
return reply.code(500).send({ error: 'Failed to get values' });
}
});
// PUT /v1/kv - Atomic batch mutation
app.post('/v1/kv', {
preHandler: app.authenticate,
schema: {
body: z.object({
mutations: z.array(z.object({
key: z.string(),
value: z.string().nullable(),
version: z.number() // Always required, use -1 for new keys
})).min(1).max(100)
}),
response: {
200: z.object({
success: z.literal(true),
results: z.array(z.object({
key: z.string(),
version: z.number()
}))
}),
409: z.object({
success: z.literal(false),
errors: z.array(z.object({
key: z.string(),
error: z.literal('version-mismatch'),
version: z.number(),
value: z.string().nullable()
}))
}),
500: z.object({
error: z.literal('Failed to mutate values')
})
}
}
}, async (request, reply) => {
const userId = request.userId;
const { mutations } = request.body;
try {
const result = await kvMutate({ uid: userId }, mutations);
if (!result.success) {
return reply.code(409).send({
success: false as const,
errors: result.errors!
});
}
return reply.send({
success: true as const,
results: result.results!
});
} catch (error) {
log({ module: 'api', level: 'error' }, `Failed to mutate KV values: ${error}`);
return reply.code(500).send({ error: 'Failed to mutate values' });
}
});
}

View File

@ -6,6 +6,7 @@ import { Prisma } from "@prisma/client";
import { log } from "@/utils/log";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { allocateUserSeq } from "@/storage/seq";
import { sessionDelete } from "@/app/session/sessionDelete";
export function sessionRoutes(app: Fastify) {
@ -352,4 +353,25 @@ export function sessionRoutes(app: Fastify) {
}))
});
});
// Delete session
app.delete('/v1/sessions/:sessionId', {
schema: {
params: z.object({
sessionId: z.string()
})
},
preHandler: app.authenticate
}, async (request, reply) => {
const userId = request.userId;
const { sessionId } = request.params;
const deleted = await sessionDelete({ uid: userId }, sessionId);
if (!deleted) {
return reply.code(404).send({ error: 'Session not found or not owned by user' });
}
return reply.send({ success: true });
});
}

View File

@ -17,15 +17,15 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
// Check if method was already registered
const previousSocket = rpcListeners.get(method);
if (previousSocket && previousSocket !== socket) {
log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`);
// log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`);
}
// Register this socket as the listener for this method
rpcListeners.set(method, socket);
socket.emit('rpc-registered', { method });
log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`);
log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
// log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`);
// log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in rpc-register: ${error}`);
socket.emit('rpc-error', { type: 'register', error: 'Internal error' });
@ -44,16 +44,16 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
if (rpcListeners.get(method) === socket) {
rpcListeners.delete(method);
log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`);
// log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`);
if (rpcListeners.size === 0) {
rpcListeners.delete(userId);
log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`);
// log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`);
} else {
log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
// log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
}
} else {
log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`);
// log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`);
}
socket.emit('rpc-unregistered', { method });
@ -80,7 +80,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
const targetSocket = rpcListeners.get(method);
if (!targetSocket || !targetSocket.connected) {
log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`);
// log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`);
if (callback) {
callback({
ok: false,
@ -92,7 +92,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
// Don't allow calling your own socket
if (targetSocket === socket) {
log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`);
// log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`);
if (callback) {
callback({
ok: false,
@ -104,7 +104,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
// Log RPC call initiation
const startTime = Date.now();
log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`);
// log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`);
// Forward the RPC request to the target socket using emitWithAck
try {
@ -114,7 +114,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
});
const duration = Date.now() - startTime;
log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`);
// log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`);
// Forward the response back to the caller via callback
if (callback) {
@ -127,7 +127,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
} catch (error) {
const duration = Date.now() - startTime;
const errorMsg = error instanceof Error ? error.message : 'RPC call failed';
log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`);
// log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`);
// Timeout or error occurred
if (callback) {
@ -138,7 +138,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
}
}
} catch (error) {
log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`);
// log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`);
if (callback) {
callback({
ok: false,
@ -158,13 +158,13 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
}
if (methodsToRemove.length > 0) {
log({ module: 'websocket-rpc' }, `Cleaning up RPC methods on disconnect for socket ${socket.id}: ${methodsToRemove.join(', ')}`);
// log({ module: 'websocket-rpc' }, `Cleaning up RPC methods on disconnect for socket ${socket.id}: ${methodsToRemove.join(', ')}`);
methodsToRemove.forEach(method => rpcListeners.delete(method));
}
if (rpcListeners.size === 0) {
rpcListeners.delete(userId);
log({ module: 'websocket-rpc' }, `All RPC listeners removed for user ${userId}`);
// log({ module: 'websocket-rpc' }, `All RPC listeners removed for user ${userId}`);
}
});
}

View File

@ -130,11 +130,27 @@ export type UpdateEvent = {
} | {
type: 'delete-artifact';
artifactId: string;
} | {
type: 'delete-session';
sessionId: string;
} | {
type: 'relationship-updated';
uid: string;
status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected';
timestamp: number;
} | {
type: 'new-feed-post';
id: string;
body: any;
cursor: string;
createdAt: number;
} | {
type: 'kv-batch-update';
changes: Array<{
key: string;
value: string | null; // null indicates deletion
version: number; // -1 for deleted keys
}>;
};
// === EPHEMERAL EVENT TYPES (Transient) ===
@ -380,6 +396,18 @@ export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, u
};
}
export function buildDeleteSessionUpdate(sessionId: string, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'delete-session',
sid: sessionId
},
createdAt: Date.now()
};
}
export function buildUpdateAccountUpdate(userId: string, profile: Partial<AccountProfile>, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
@ -556,3 +584,39 @@ export function buildRelationshipUpdatedEvent(
createdAt: Date.now()
};
}
export function buildNewFeedPostUpdate(feedItem: {
id: string;
body: any;
cursor: string;
createdAt: number;
}, updateSeq: number, updateId: string): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'new-feed-post',
id: feedItem.id,
body: feedItem.body,
cursor: feedItem.cursor,
createdAt: feedItem.createdAt
},
createdAt: Date.now()
};
}
export function buildKVBatchUpdateUpdate(
changes: Array<{ key: string; value: string | null; version: number }>,
updateSeq: number,
updateId: string
): UpdatePayload {
return {
id: updateId,
seq: updateSeq,
body: {
t: 'kv-batch-update',
changes
},
createdAt: Date.now()
};
}

View File

@ -0,0 +1,55 @@
import { Context } from "@/context";
import { FeedOptions, FeedResult } from "./types";
import { Prisma } from "@prisma/client";
import { Tx } from "@/storage/inTx";
/**
* Fetch user's feed with pagination.
* Returns items in reverse chronological order (newest first).
* Supports cursor-based pagination using the counter field.
*/
export async function feedGet(
tx: Tx,
ctx: Context,
options?: FeedOptions
): Promise<FeedResult> {
const limit = options?.limit ?? 100;
const cursor = options?.cursor;
// Build where clause for cursor pagination
const where: Prisma.UserFeedItemWhereInput = { userId: ctx.uid };
if (cursor?.before !== undefined) {
if (cursor.before.startsWith('0-')) {
where.counter = { lt: parseInt(cursor.before.substring(2), 10) };
} else {
throw new Error('Invalid cursor format');
}
} else if (cursor?.after !== undefined) {
if (cursor.after.startsWith('0-')) {
where.counter = { gt: parseInt(cursor.after.substring(2), 10) };
} else {
throw new Error('Invalid cursor format');
}
}
// Fetch items + 1 to determine hasMore
const items = await tx.userFeedItem.findMany({
where,
orderBy: { counter: 'desc' },
take: limit + 1
});
// Check if there are more items
const hasMore = items.length > limit;
// Return only requested limit
return {
items: items.slice(0, limit).map(item => ({
...item,
createdAt: item.createdAt.getTime(),
cursor: '0-' + item.counter.toString(10)
})),
hasMore
};
}

View File

@ -0,0 +1,67 @@
import { Context } from "@/context";
import { FeedBody, UserFeedItem } from "./types";
import { afterTx, Tx } from "@/storage/inTx";
import { allocateUserSeq } from "@/storage/seq";
import { eventRouter, buildNewFeedPostUpdate } from "@/app/events/eventRouter";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
/**
* Add a post to user's feed.
* If repeatKey is provided and exists, the post will be updated in-place.
* Otherwise, a new post is created with an incremented counter.
*/
export async function feedPost(
tx: Tx,
ctx: Context,
body: FeedBody,
repeatKey?: string | null
): Promise<UserFeedItem> {
// Delete existing items with the same repeatKey
if (repeatKey) {
await tx.userFeedItem.deleteMany({
where: {
userId: ctx.uid,
repeatKey: repeatKey
}
});
}
// Allocate new counter
const user = await tx.account.update({
where: { id: ctx.uid },
select: { feedSeq: true },
data: { feedSeq: { increment: 1 } }
});
// Create new item
const item = await tx.userFeedItem.create({
data: {
counter: user.feedSeq,
userId: ctx.uid,
repeatKey: repeatKey,
body: body
}
});
const result = {
...item,
createdAt: item.createdAt.getTime(),
cursor: '0-' + item.counter.toString(10)
};
// Emit socket event after transaction completes
afterTx(tx, async () => {
const updateSeq = await allocateUserSeq(ctx.uid);
const updatePayload = buildNewFeedPostUpdate(result, updateSeq, randomKeyNaked(12));
eventRouter.emitUpdate({
userId: ctx.uid,
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
});
return result;
}

33
sources/app/feed/types.ts Normal file
View File

@ -0,0 +1,33 @@
import * as z from "zod";
export const FeedBodySchema = z.discriminatedUnion('kind', [
z.object({ kind: z.literal('friend_request'), uid: z.string() }),
z.object({ kind: z.literal('friend_accepted'), uid: z.string() }),
z.object({ kind: z.literal('text'), text: z.string() })
]);
export type FeedBody = z.infer<typeof FeedBodySchema>;
export interface UserFeedItem {
id: string;
userId: string;
repeatKey: string | null;
body: FeedBody;
createdAt: number;
cursor: string;
}
export interface FeedCursor {
before?: string;
after?: string;
}
export interface FeedOptions {
limit?: number;
cursor?: FeedCursor;
}
export interface FeedResult {
items: UserFeedItem[];
hasMore: boolean;
}

View File

@ -0,0 +1,41 @@
import { db } from "@/storage/db";
import * as privacyKit from "privacy-kit";
export interface KVBulkGetResult {
values: Array<{
key: string;
value: string;
version: number;
}>;
}
/**
* Get multiple key-value pairs for the authenticated user.
* Only returns existing keys with non-null values; missing or deleted keys are omitted.
*/
export async function kvBulkGet(
ctx: { uid: string },
keys: string[]
): Promise<KVBulkGetResult> {
const results = await db.userKVStore.findMany({
where: {
accountId: ctx.uid,
key: {
in: keys
},
value: {
not: null // Exclude deleted entries
}
}
});
return {
values: results
.filter(r => r.value !== null) // Extra safety check
.map(r => ({
key: r.key,
value: privacyKit.encodeBase64(r.value!),
version: r.version
}))
};
}

37
sources/app/kv/kvGet.ts Normal file
View File

@ -0,0 +1,37 @@
import { db } from "@/storage/db";
import * as privacyKit from "privacy-kit";
export type KVGetResult = {
key: string;
value: string;
version: number;
} | null;
/**
* Get a single key-value pair for the authenticated user.
* Returns null if the key doesn't exist or if the value is null (deleted).
*/
export async function kvGet(
ctx: { uid: string },
key: string
): Promise<KVGetResult> {
const result = await db.userKVStore.findUnique({
where: {
accountId_key: {
accountId: ctx.uid,
key
}
}
});
// Treat missing records and null values as "not found"
if (!result || result.value === null) {
return null;
}
return {
key: result.key,
value: privacyKit.encodeBase64(result.value),
version: result.version
};
}

56
sources/app/kv/kvList.ts Normal file
View File

@ -0,0 +1,56 @@
import { db } from "@/storage/db";
import * as privacyKit from "privacy-kit";
export interface KVListOptions {
prefix?: string;
limit?: number;
}
export interface KVListResult {
items: Array<{
key: string;
value: string;
version: number;
}>;
}
/**
* List all key-value pairs for the authenticated user, optionally filtered by prefix.
* Returns keys, values, and versions. Excludes entries with null values (deleted).
*/
export async function kvList(
ctx: { uid: string },
options?: KVListOptions
): Promise<KVListResult> {
const where: any = {
accountId: ctx.uid,
value: {
not: null // Exclude deleted entries (null values)
}
};
// Add prefix filter if specified
if (options?.prefix) {
where.key = {
startsWith: options.prefix
};
}
const results = await db.userKVStore.findMany({
where,
orderBy: {
key: 'asc'
},
take: options?.limit
});
return {
items: results
.filter(r => r.value !== null) // Extra safety check
.map(r => ({
key: r.key,
value: privacyKit.encodeBase64(r.value!),
version: r.version
}))
};
}

139
sources/app/kv/kvMutate.ts Normal file
View File

@ -0,0 +1,139 @@
import { db } from "@/storage/db";
import { inTx, afterTx } from "@/storage/inTx";
import { allocateUserSeq } from "@/storage/seq";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { eventRouter, buildKVBatchUpdateUpdate } from "@/app/events/eventRouter";
import * as privacyKit from "privacy-kit";
export interface KVMutation {
key: string;
value: string | null; // null = delete (sets value to null but keeps record)
version: number; // Always required, use -1 for new keys
}
export interface KVMutateResult {
success: boolean;
results?: Array<{
key: string;
version: number;
}>;
errors?: Array<{
key: string;
error: 'version-mismatch';
version: number;
value: string | null; // Current value (null if deleted)
}>;
}
/**
* Atomically mutate multiple key-value pairs.
* All mutations succeed or all fail.
* Version is always required for all operations (use -1 for new keys).
* Delete operations set value to null but keep the record with incremented version.
* Sends a single bundled update notification for all changes.
*/
export async function kvMutate(
ctx: { uid: string },
mutations: KVMutation[]
): Promise<KVMutateResult> {
return await inTx(async (tx) => {
const errors: KVMutateResult['errors'] = [];
// Pre-validate all mutations
for (const mutation of mutations) {
const existing = await tx.userKVStore.findUnique({
where: {
accountId_key: {
accountId: ctx.uid,
key: mutation.key
}
}
});
const currentVersion = existing?.version ?? -1;
// Version check is always required
if (currentVersion !== mutation.version) {
errors.push({
key: mutation.key,
error: 'version-mismatch',
version: currentVersion,
value: existing?.value ? privacyKit.encodeBase64(existing.value) : null
});
}
}
// If any errors, return all errors and abort
if (errors.length > 0) {
return { success: false, errors };
}
// Apply all mutations and collect results
const results: Array<{ key: string; version: number }> = [];
const changes: Array<{ key: string; value: string | null; version: number }> = [];
for (const mutation of mutations) {
if (mutation.version === -1) {
// Create new entry (must not exist)
const result = await tx.userKVStore.create({
data: {
accountId: ctx.uid,
key: mutation.key,
value: mutation.value ? new Uint8Array(Buffer.from(mutation.value, 'base64')) : null,
version: 0
}
});
results.push({
key: mutation.key,
version: result.version
});
changes.push({
key: mutation.key,
value: mutation.value,
version: result.version
});
} else {
// Update existing entry (including "delete" which sets value to null)
const newVersion = mutation.version + 1;
const result = await tx.userKVStore.update({
where: {
accountId_key: {
accountId: ctx.uid,
key: mutation.key
}
},
data: {
value: mutation.value ? privacyKit.decodeBase64(mutation.value) : null,
version: newVersion
}
});
results.push({
key: mutation.key,
version: result.version
});
changes.push({
key: mutation.key,
value: mutation.value,
version: result.version
});
}
}
// Send single bundled notification for all changes
afterTx(tx, async () => {
const updateSeq = await allocateUserSeq(ctx.uid);
eventRouter.emitUpdate({
userId: ctx.uid,
payload: buildKVBatchUpdateUpdate(changes, updateSeq, randomKeyNaked(12)),
recipientFilter: { type: 'user-scoped-only' }
});
});
return { success: true, results };
});
}

View File

@ -0,0 +1,108 @@
import { Context } from "@/context";
import { inTx, afterTx } from "@/storage/inTx";
import { eventRouter, buildDeleteSessionUpdate } from "@/app/events/eventRouter";
import { allocateUserSeq } from "@/storage/seq";
import { randomKeyNaked } from "@/utils/randomKeyNaked";
import { log } from "@/utils/log";
/**
* Delete a session and all its related data.
* Handles:
* - Deleting all session messages
* - Deleting all usage reports for the session
* - Deleting all access keys for the session
* - Deleting the session itself
* - Sending socket notification to all connected clients
*
* @param ctx - Context with user information
* @param sessionId - ID of the session to delete
* @returns true if deletion was successful, false if session not found or not owned by user
*/
export async function sessionDelete(ctx: Context, sessionId: string): Promise<boolean> {
return await inTx(async (tx) => {
// Verify session exists and belongs to the user
const session = await tx.session.findFirst({
where: {
id: sessionId,
accountId: ctx.uid
}
});
if (!session) {
log({
module: 'session-delete',
userId: ctx.uid,
sessionId
}, `Session not found or not owned by user`);
return false;
}
// Delete all related data
// Note: Order matters to avoid foreign key constraint violations
// 1. Delete session messages
const deletedMessages = await tx.sessionMessage.deleteMany({
where: { sessionId }
});
log({
module: 'session-delete',
userId: ctx.uid,
sessionId,
deletedCount: deletedMessages.count
}, `Deleted ${deletedMessages.count} session messages`);
// 2. Delete usage reports
const deletedReports = await tx.usageReport.deleteMany({
where: { sessionId }
});
log({
module: 'session-delete',
userId: ctx.uid,
sessionId,
deletedCount: deletedReports.count
}, `Deleted ${deletedReports.count} usage reports`);
// 3. Delete access keys
const deletedAccessKeys = await tx.accessKey.deleteMany({
where: { sessionId }
});
log({
module: 'session-delete',
userId: ctx.uid,
sessionId,
deletedCount: deletedAccessKeys.count
}, `Deleted ${deletedAccessKeys.count} access keys`);
// 4. Delete the session itself
await tx.session.delete({
where: { id: sessionId }
});
log({
module: 'session-delete',
userId: ctx.uid,
sessionId
}, `Session deleted successfully`);
// Send notification after transaction commits
afterTx(tx, async () => {
const updSeq = await allocateUserSeq(ctx.uid);
const updatePayload = buildDeleteSessionUpdate(sessionId, updSeq, randomKeyNaked(12));
log({
module: 'session-delete',
userId: ctx.uid,
sessionId,
updateType: 'delete-session',
updatePayload: JSON.stringify(updatePayload)
}, `Emitting delete-session update to all user connections`);
eventRouter.emitUpdate({
userId: ctx.uid,
payload: updatePayload,
recipientFilter: { type: 'all-user-authenticated-connections' }
});
});
return true;
});
}

View File

@ -1,10 +1,18 @@
import { Context } from "@/context";
import { buildUserProfile, UserProfile } from "./type";
import { db } from "@/storage/db";
import { inTx } from "@/storage/inTx";
import { RelationshipStatus } from "@prisma/client";
import { relationshipSet } from "./relationshipSet";
import { relationshipGet } from "./relationshipGet";
import { sendFriendRequestNotification, sendFriendshipEstablishedNotification } from "./friendNotification";
/**
* Add a friend or accept a friend request.
* Handles:
* - Accepting incoming friend requests (both users become friends)
* - Sending new friend requests
* - Sending appropriate notifications with 24-hour cooldown
*/
export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile | null> {
// Prevent self-friendship
if (ctx.uid === uid) {
@ -12,7 +20,7 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
}
// Update relationship status
return await db.$transaction(async (tx) => {
return await inTx(async (tx) => {
// Read current user objects
const currentUser = await tx.account.findUnique({
@ -40,6 +48,9 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.friend);
await relationshipSet(tx, currentUser.id, targetUser.id, RelationshipStatus.friend);
// Send friendship established notifications to both users
await sendFriendshipEstablishedNotification(tx, currentUser.id, targetUser.id);
// Return the target user profile
return buildUserProfile(targetUser, RelationshipStatus.friend);
}
@ -54,6 +65,9 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.pending);
}
// Send friend request notification to the receiver
await sendFriendRequestNotification(tx, targetUser.id, currentUser.id);
// Return the target user profile
return buildUserProfile(targetUser, RelationshipStatus.requested);
}

View File

@ -0,0 +1,61 @@
import { describe, it, expect, vi } from "vitest";
import { RelationshipStatus } from "@prisma/client";
// Mock the dependencies that require environment variables
vi.mock("@/storage/files", () => ({
getPublicUrl: vi.fn((path: string) => `https://example.com/${path}`)
}));
vi.mock("@/app/feed/feedPost", () => ({
feedPost: vi.fn()
}));
vi.mock("@/storage/inTx", () => ({
afterTx: vi.fn()
}));
// Import after mocking
import { shouldSendNotification } from "./friendNotification";
describe("friendNotification", () => {
describe("shouldSendNotification", () => {
it("should return true when lastNotifiedAt is null", () => {
const result = shouldSendNotification(null, RelationshipStatus.pending);
expect(result).toBe(true);
});
it("should return false for rejected relationships", () => {
const result = shouldSendNotification(null, RelationshipStatus.rejected);
expect(result).toBe(false);
});
it("should return false for rejected relationships even if 24 hours passed", () => {
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.rejected);
expect(result).toBe(false);
});
it("should return true when 24 hours have passed since last notification", () => {
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.pending);
expect(result).toBe(true);
});
it("should return false when less than 24 hours have passed", () => {
const tenHoursAgo = new Date(Date.now() - 10 * 60 * 60 * 1000);
const result = shouldSendNotification(tenHoursAgo, RelationshipStatus.pending);
expect(result).toBe(false);
});
it("should work for friend status", () => {
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.friend);
expect(result).toBe(true);
});
it("should work for requested status", () => {
const result = shouldSendNotification(null, RelationshipStatus.requested);
expect(result).toBe(true);
});
});
});

View File

@ -0,0 +1,170 @@
import { Prisma, RelationshipStatus } from "@prisma/client";
import { feedPost } from "@/app/feed/feedPost";
import { Context } from "@/context";
import { afterTx } from "@/storage/inTx";
/**
* Check if a notification should be sent based on the last notification time and relationship status.
* Returns true if:
* - No previous notification was sent (lastNotifiedAt is null)
* - OR 24 hours have passed since the last notification
* - AND the relationship is not rejected
*/
export function shouldSendNotification(
lastNotifiedAt: Date | null,
status: RelationshipStatus
): boolean {
// Don't send notifications for rejected relationships
if (status === RelationshipStatus.rejected) {
return false;
}
// If never notified, send notification
if (!lastNotifiedAt) {
return true;
}
// Check if 24 hours have passed since last notification
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
return lastNotifiedAt < twentyFourHoursAgo;
}
/**
* Send a friend request notification to the receiver and update lastNotifiedAt.
* This creates a feed item for the receiver about the incoming friend request.
*/
export async function sendFriendRequestNotification(
tx: Prisma.TransactionClient,
receiverUserId: string,
senderUserId: string
): Promise<void> {
// Check if we should send notification to receiver
const receiverRelationship = await tx.userRelationship.findUnique({
where: {
fromUserId_toUserId: {
fromUserId: receiverUserId,
toUserId: senderUserId
}
}
});
if (!receiverRelationship || !shouldSendNotification(
receiverRelationship.lastNotifiedAt,
receiverRelationship.status
)) {
return;
}
// Create feed notification for receiver
const receiverCtx = Context.create(receiverUserId);
await feedPost(
tx,
receiverCtx,
{
kind: 'friend_request',
uid: senderUserId
},
`friend_request_${senderUserId}` // repeatKey to avoid duplicates
);
// Update lastNotifiedAt for the receiver's relationship record
await tx.userRelationship.update({
where: {
fromUserId_toUserId: {
fromUserId: receiverUserId,
toUserId: senderUserId
}
},
data: {
lastNotifiedAt: new Date()
}
});
}
/**
* Send friendship established notifications to both users and update lastNotifiedAt.
* This creates feed items for both users about the new friendship.
*/
export async function sendFriendshipEstablishedNotification(
tx: Prisma.TransactionClient,
user1Id: string,
user2Id: string
): Promise<void> {
// Check and send notification to user1
const user1Relationship = await tx.userRelationship.findUnique({
where: {
fromUserId_toUserId: {
fromUserId: user1Id,
toUserId: user2Id
}
}
});
if (user1Relationship && shouldSendNotification(
user1Relationship.lastNotifiedAt,
user1Relationship.status
)) {
const user1Ctx = Context.create(user1Id);
await feedPost(
tx,
user1Ctx,
{
kind: 'friend_accepted',
uid: user2Id
},
`friend_accepted_${user2Id}` // repeatKey to avoid duplicates
);
// Update lastNotifiedAt for user1
await tx.userRelationship.update({
where: {
fromUserId_toUserId: {
fromUserId: user1Id,
toUserId: user2Id
}
},
data: {
lastNotifiedAt: new Date()
}
});
}
// Check and send notification to user2
const user2Relationship = await tx.userRelationship.findUnique({
where: {
fromUserId_toUserId: {
fromUserId: user2Id,
toUserId: user1Id
}
}
});
if (user2Relationship && shouldSendNotification(
user2Relationship.lastNotifiedAt,
user2Relationship.status
)) {
const user2Ctx = Context.create(user2Id);
await feedPost(
tx,
user2Ctx,
{
kind: 'friend_accepted',
uid: user1Id
},
`friend_accepted_${user1Id}` // repeatKey to avoid duplicates
);
// Update lastNotifiedAt for user2
await tx.userRelationship.update({
where: {
fromUserId_toUserId: {
fromUserId: user2Id,
toUserId: user1Id
}
},
data: {
lastNotifiedAt: new Date()
}
});
}
}

View File

@ -1,12 +1,12 @@
import { Context } from "@/context";
import { buildUserProfile, UserProfile } from "./type";
import { db } from "@/storage/db";
import { inTx } from "@/storage/inTx";
import { RelationshipStatus } from "@prisma/client";
import { relationshipSet } from "./relationshipSet";
import { relationshipGet } from "./relationshipGet";
export async function friendRemove(ctx: Context, uid: string): Promise<UserProfile | null> {
return await db.$transaction(async (tx) => {
return await inTx(async (tx) => {
// Read current user objects
const currentUser = await tx.account.findUnique({

View File

@ -1,7 +1,17 @@
import { Prisma } from "@prisma/client";
import { RelationshipStatus } from "@prisma/client";
export async function relationshipSet(tx: Prisma.TransactionClient, from: string, to: string, status: RelationshipStatus) {
export async function relationshipSet(tx: Prisma.TransactionClient, from: string, to: string, status: RelationshipStatus, lastNotifiedAt?: Date) {
// Get existing relationship to preserve lastNotifiedAt
const existing = await tx.userRelationship.findUnique({
where: {
fromUserId_toUserId: {
fromUserId: from,
toUserId: to
}
}
});
if (status === RelationshipStatus.friend) {
await tx.userRelationship.upsert({
where: {
@ -14,11 +24,14 @@ export async function relationshipSet(tx: Prisma.TransactionClient, from: string
fromUserId: from,
toUserId: to,
status,
acceptedAt: new Date()
acceptedAt: new Date(),
lastNotifiedAt: lastNotifiedAt || null
},
update: {
status,
acceptedAt: new Date()
acceptedAt: new Date(),
// Preserve existing lastNotifiedAt, only update if explicitly provided
lastNotifiedAt: lastNotifiedAt || existing?.lastNotifiedAt || undefined
}
});
} else {
@ -33,11 +46,14 @@ export async function relationshipSet(tx: Prisma.TransactionClient, from: string
fromUserId: from,
toUserId: to,
status,
acceptedAt: null
acceptedAt: null,
lastNotifiedAt: lastNotifiedAt || null
},
update: {
status,
acceptedAt: null
acceptedAt: null,
// Preserve existing lastNotifiedAt, only update if explicitly provided
lastNotifiedAt: lastNotifiedAt || existing?.lastNotifiedAt || undefined
}
});
}

View File

@ -11,6 +11,4 @@ export class Context {
private constructor(uid: string) {
this.uid = uid;
}
}
export type Tx = Prisma.TransactionClient | PrismaClient;
}