Compare commits
10 Commits
595e23967a
...
b594832c30
Author | SHA1 | Date | |
---|---|---|---|
b594832c30 | |||
26d42b4cf1 | |||
|
5ba780c8a2 | ||
|
b2c72cc485 | ||
|
aeba1b6f1c | ||
|
014473a7ac | ||
|
cfd7a7b783 | ||
|
9822512aeb | ||
|
c534331ce5 | ||
|
0ce1bb4c9a |
43
.github/workflows/build-main-docker.yml
vendored
Normal file
43
.github/workflows/build-main-docker.yml
vendored
Normal file
@ -0,0 +1,43 @@
|
||||
name: Build Develop Docker to Docker Hub
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
|
||||
env:
|
||||
REGISTRY: docker.io
|
||||
IMAGE_NAME: ${{ secrets.DOCKERHUB_USERNAME }}/${{ github.event.repository.name }}
|
||||
USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
|
||||
jobs:
|
||||
docker:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
-
|
||||
name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
-
|
||||
name: Login to ${{ env.REGISTRY }}
|
||||
if: github.event_name != 'pull_request'
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ env.USERNAME }}
|
||||
password: ${{ env.TOKEN }}
|
||||
-
|
||||
name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
-
|
||||
name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
-
|
||||
name: Build and push
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
build-args: |
|
||||
PUSH_RECENT=1
|
||||
tags: |
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
|
@ -160,6 +160,7 @@ The project has pending Prisma migrations that need to be applied:
|
||||
- Routes are in `/sources/apps/api/routes`
|
||||
- Use Fastify with Zod for type-safe route definitions
|
||||
- Always validate inputs using Zod
|
||||
- **Idempotency**: Design all operations to be idempotent - clients may retry requests automatically and the backend must handle multiple invocations of the same operation gracefully, producing the same result as a single invocation
|
||||
|
||||
## Docker Deployment
|
||||
|
||||
@ -276,4 +277,5 @@ tail -500 .logs/*.log | grep "applySessions.*active" | tail -10
|
||||
- Do not add logging when not asked
|
||||
- do not run non-transactional things (like uploadign files) in transactions
|
||||
- After writing an action - add a documentation comment that explains logic, also keep it in sync.
|
||||
- always use github usernames
|
||||
- always use github usernames
|
||||
- Always use privacyKit.decodeBase64 and privacyKit.encodeBase64 from privacy-kit instead of using buffer
|
@ -37,9 +37,10 @@ COPY --from=builder /app/tsconfig.json ./tsconfig.json
|
||||
COPY --from=builder /app/package.json ./package.json
|
||||
COPY --from=builder /app/node_modules ./node_modules
|
||||
COPY --from=builder /app/sources ./sources
|
||||
COPY --from=builder /app/prisma ./prisma
|
||||
|
||||
# Expose the port the app will run on
|
||||
EXPOSE 3000
|
||||
|
||||
# Command to run the application
|
||||
CMD ["yarn", "start"]
|
||||
CMD ["yarn", "start"]
|
||||
|
27
prisma/migrations/20250920213557_add_user_feed/migration.sql
Normal file
27
prisma/migrations/20250920213557_add_user_feed/migration.sql
Normal file
@ -0,0 +1,27 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE "Account" ADD COLUMN "feedSeq" BIGINT NOT NULL DEFAULT 0;
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "UserFeedItem" (
|
||||
"id" TEXT NOT NULL,
|
||||
"userId" TEXT NOT NULL,
|
||||
"counter" BIGINT NOT NULL,
|
||||
"repeatKey" TEXT,
|
||||
"body" JSONB NOT NULL,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
CONSTRAINT "UserFeedItem_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "UserFeedItem_userId_counter_idx" ON "UserFeedItem"("userId", "counter" DESC);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "UserFeedItem_userId_counter_key" ON "UserFeedItem"("userId", "counter");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "UserFeedItem_userId_repeatKey_key" ON "UserFeedItem"("userId", "repeatKey");
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "UserFeedItem" ADD CONSTRAINT "UserFeedItem_userId_fkey" FOREIGN KEY ("userId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
@ -0,0 +1,2 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE "UserRelationship" ADD COLUMN "lastNotifiedAt" TIMESTAMP(3);
|
21
prisma/migrations/20250922000310_add_user_kv/migration.sql
Normal file
21
prisma/migrations/20250922000310_add_user_kv/migration.sql
Normal file
@ -0,0 +1,21 @@
|
||||
-- CreateTable
|
||||
CREATE TABLE "UserKVStore" (
|
||||
"id" TEXT NOT NULL,
|
||||
"accountId" TEXT NOT NULL,
|
||||
"key" TEXT NOT NULL,
|
||||
"value" BYTEA,
|
||||
"version" INTEGER NOT NULL DEFAULT 0,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
CONSTRAINT "UserKVStore_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "UserKVStore_accountId_idx" ON "UserKVStore"("accountId");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "UserKVStore_accountId_key_key" ON "UserKVStore"("accountId", "key");
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "UserKVStore" ADD CONSTRAINT "UserKVStore_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
@ -23,6 +23,7 @@ model Account {
|
||||
id String @id @default(cuid())
|
||||
publicKey String @unique
|
||||
seq Int @default(0)
|
||||
feedSeq BigInt @default(0)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
settings String?
|
||||
@ -49,6 +50,8 @@ model Account {
|
||||
RelationshipsTo UserRelationship[] @relation("RelationshipsTo")
|
||||
Artifact Artifact[]
|
||||
AccessKey AccessKey[]
|
||||
UserFeedItem UserFeedItem[]
|
||||
UserKVStore UserKVStore[]
|
||||
}
|
||||
|
||||
model TerminalAuthRequest {
|
||||
@ -306,16 +309,55 @@ enum RelationshipStatus {
|
||||
}
|
||||
|
||||
model UserRelationship {
|
||||
fromUserId String
|
||||
fromUser Account @relation("RelationshipsFrom", fields: [fromUserId], references: [id], onDelete: Cascade)
|
||||
toUserId String
|
||||
toUser Account @relation("RelationshipsTo", fields: [toUserId], references: [id], onDelete: Cascade)
|
||||
status RelationshipStatus @default(pending)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
acceptedAt DateTime?
|
||||
fromUserId String
|
||||
fromUser Account @relation("RelationshipsFrom", fields: [fromUserId], references: [id], onDelete: Cascade)
|
||||
toUserId String
|
||||
toUser Account @relation("RelationshipsTo", fields: [toUserId], references: [id], onDelete: Cascade)
|
||||
status RelationshipStatus @default(pending)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
acceptedAt DateTime?
|
||||
lastNotifiedAt DateTime?
|
||||
|
||||
@@id([fromUserId, toUserId])
|
||||
@@index([toUserId, status])
|
||||
@@index([fromUserId, status])
|
||||
}
|
||||
|
||||
//
|
||||
// Feed
|
||||
//
|
||||
|
||||
model UserFeedItem {
|
||||
id String @id @default(cuid())
|
||||
userId String
|
||||
user Account @relation(fields: [userId], references: [id], onDelete: Cascade)
|
||||
counter BigInt
|
||||
repeatKey String?
|
||||
/// [FeedBody]
|
||||
body Json
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
@@unique([userId, counter])
|
||||
@@unique([userId, repeatKey])
|
||||
@@index([userId, counter(sort: Desc)])
|
||||
}
|
||||
|
||||
//
|
||||
// Key-Value Storage
|
||||
//
|
||||
|
||||
model UserKVStore {
|
||||
id String @id @default(cuid())
|
||||
accountId String
|
||||
account Account @relation(fields: [accountId], references: [id], onDelete: Cascade)
|
||||
key String // Unencrypted for indexing
|
||||
value Bytes? // Encrypted value, null when "deleted"
|
||||
version Int @default(0)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
@@unique([accountId, key])
|
||||
@@index([accountId])
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ import { enableMonitoring } from "./utils/enableMonitoring";
|
||||
import { enableErrorHandlers } from "./utils/enableErrorHandlers";
|
||||
import { enableAuthentication } from "./utils/enableAuthentication";
|
||||
import { userRoutes } from "./routes/userRoutes";
|
||||
import { feedRoutes } from "./routes/feedRoutes";
|
||||
import { kvRoutes } from "./routes/kvRoutes";
|
||||
|
||||
export async function startApi() {
|
||||
|
||||
@ -62,6 +64,8 @@ export async function startApi() {
|
||||
versionRoutes(typed);
|
||||
voiceRoutes(typed);
|
||||
userRoutes(typed);
|
||||
feedRoutes(typed);
|
||||
kvRoutes(typed);
|
||||
|
||||
// Start HTTP
|
||||
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005;
|
||||
|
40
sources/app/api/routes/feedRoutes.ts
Normal file
40
sources/app/api/routes/feedRoutes.ts
Normal file
@ -0,0 +1,40 @@
|
||||
import { z } from "zod";
|
||||
import { Fastify } from "../types";
|
||||
import { FeedBodySchema } from "@/app/feed/types";
|
||||
import { feedGet } from "@/app/feed/feedGet";
|
||||
import { Context } from "@/context";
|
||||
import { db } from "@/storage/db";
|
||||
|
||||
export function feedRoutes(app: Fastify) {
|
||||
app.get('/v1/feed', {
|
||||
preHandler: app.authenticate,
|
||||
schema: {
|
||||
querystring: z.object({
|
||||
before: z.string().optional(),
|
||||
after: z.string().optional(),
|
||||
limit: z.coerce.number().int().min(1).max(200).default(50)
|
||||
}).optional(),
|
||||
response: {
|
||||
200: z.object({
|
||||
items: z.array(z.object({
|
||||
id: z.string(),
|
||||
body: FeedBodySchema,
|
||||
repeatKey: z.string().nullable(),
|
||||
cursor: z.string(),
|
||||
createdAt: z.number()
|
||||
})),
|
||||
hasMore: z.boolean()
|
||||
})
|
||||
}
|
||||
}
|
||||
}, async (request, reply) => {
|
||||
const items = await feedGet(db, Context.create(request.userId), {
|
||||
cursor: {
|
||||
before: request.query?.before,
|
||||
after: request.query?.after
|
||||
},
|
||||
limit: request.query?.limit
|
||||
});
|
||||
return reply.send({ items: items.items, hasMore: items.hasMore });
|
||||
});
|
||||
}
|
172
sources/app/api/routes/kvRoutes.ts
Normal file
172
sources/app/api/routes/kvRoutes.ts
Normal file
@ -0,0 +1,172 @@
|
||||
import { z } from "zod";
|
||||
import { Fastify } from "../types";
|
||||
import { kvGet } from "@/app/kv/kvGet";
|
||||
import { kvList } from "@/app/kv/kvList";
|
||||
import { kvBulkGet } from "@/app/kv/kvBulkGet";
|
||||
import { kvMutate } from "@/app/kv/kvMutate";
|
||||
import { log } from "@/utils/log";
|
||||
|
||||
export function kvRoutes(app: Fastify) {
|
||||
// GET /v1/kv/:key - Get single value
|
||||
app.get('/v1/kv/:key', {
|
||||
preHandler: app.authenticate,
|
||||
schema: {
|
||||
params: z.object({
|
||||
key: z.string()
|
||||
}),
|
||||
response: {
|
||||
200: z.object({
|
||||
key: z.string(),
|
||||
value: z.string(),
|
||||
version: z.number()
|
||||
}).nullable(),
|
||||
404: z.object({
|
||||
error: z.literal('Key not found')
|
||||
}),
|
||||
500: z.object({
|
||||
error: z.literal('Failed to get value')
|
||||
})
|
||||
}
|
||||
}
|
||||
}, async (request, reply) => {
|
||||
const userId = request.userId;
|
||||
const { key } = request.params;
|
||||
|
||||
try {
|
||||
const result = await kvGet({ uid: userId }, key);
|
||||
|
||||
if (!result) {
|
||||
return reply.code(404).send({ error: 'Key not found' });
|
||||
}
|
||||
|
||||
return reply.send(result);
|
||||
} catch (error) {
|
||||
log({ module: 'api', level: 'error' }, `Failed to get KV value: ${error}`);
|
||||
return reply.code(500).send({ error: 'Failed to get value' });
|
||||
}
|
||||
});
|
||||
|
||||
// GET /v1/kv - List key-value pairs with optional prefix filter
|
||||
app.get('/v1/kv', {
|
||||
preHandler: app.authenticate,
|
||||
schema: {
|
||||
querystring: z.object({
|
||||
prefix: z.string().optional(),
|
||||
limit: z.coerce.number().int().min(1).max(1000).default(100)
|
||||
}),
|
||||
response: {
|
||||
200: z.object({
|
||||
items: z.array(z.object({
|
||||
key: z.string(),
|
||||
value: z.string(),
|
||||
version: z.number()
|
||||
}))
|
||||
}),
|
||||
500: z.object({
|
||||
error: z.literal('Failed to list items')
|
||||
})
|
||||
}
|
||||
}
|
||||
}, async (request, reply) => {
|
||||
const userId = request.userId;
|
||||
const { prefix, limit } = request.query;
|
||||
|
||||
try {
|
||||
const result = await kvList({ uid: userId }, { prefix, limit });
|
||||
return reply.send(result);
|
||||
} catch (error) {
|
||||
log({ module: 'api', level: 'error' }, `Failed to list KV items: ${error}`);
|
||||
return reply.code(500).send({ error: 'Failed to list items' });
|
||||
}
|
||||
});
|
||||
|
||||
// POST /v1/kv/bulk - Bulk get values
|
||||
app.post('/v1/kv/bulk', {
|
||||
preHandler: app.authenticate,
|
||||
schema: {
|
||||
body: z.object({
|
||||
keys: z.array(z.string()).min(1).max(100)
|
||||
}),
|
||||
response: {
|
||||
200: z.object({
|
||||
values: z.array(z.object({
|
||||
key: z.string(),
|
||||
value: z.string(),
|
||||
version: z.number()
|
||||
}))
|
||||
}),
|
||||
500: z.object({
|
||||
error: z.literal('Failed to get values')
|
||||
})
|
||||
}
|
||||
}
|
||||
}, async (request, reply) => {
|
||||
const userId = request.userId;
|
||||
const { keys } = request.body;
|
||||
|
||||
try {
|
||||
const result = await kvBulkGet({ uid: userId }, keys);
|
||||
return reply.send(result);
|
||||
} catch (error) {
|
||||
log({ module: 'api', level: 'error' }, `Failed to bulk get KV values: ${error}`);
|
||||
return reply.code(500).send({ error: 'Failed to get values' });
|
||||
}
|
||||
});
|
||||
|
||||
// PUT /v1/kv - Atomic batch mutation
|
||||
app.post('/v1/kv', {
|
||||
preHandler: app.authenticate,
|
||||
schema: {
|
||||
body: z.object({
|
||||
mutations: z.array(z.object({
|
||||
key: z.string(),
|
||||
value: z.string().nullable(),
|
||||
version: z.number() // Always required, use -1 for new keys
|
||||
})).min(1).max(100)
|
||||
}),
|
||||
response: {
|
||||
200: z.object({
|
||||
success: z.literal(true),
|
||||
results: z.array(z.object({
|
||||
key: z.string(),
|
||||
version: z.number()
|
||||
}))
|
||||
}),
|
||||
409: z.object({
|
||||
success: z.literal(false),
|
||||
errors: z.array(z.object({
|
||||
key: z.string(),
|
||||
error: z.literal('version-mismatch'),
|
||||
version: z.number(),
|
||||
value: z.string().nullable()
|
||||
}))
|
||||
}),
|
||||
500: z.object({
|
||||
error: z.literal('Failed to mutate values')
|
||||
})
|
||||
}
|
||||
}
|
||||
}, async (request, reply) => {
|
||||
const userId = request.userId;
|
||||
const { mutations } = request.body;
|
||||
|
||||
try {
|
||||
const result = await kvMutate({ uid: userId }, mutations);
|
||||
|
||||
if (!result.success) {
|
||||
return reply.code(409).send({
|
||||
success: false as const,
|
||||
errors: result.errors!
|
||||
});
|
||||
}
|
||||
|
||||
return reply.send({
|
||||
success: true as const,
|
||||
results: result.results!
|
||||
});
|
||||
} catch (error) {
|
||||
log({ module: 'api', level: 'error' }, `Failed to mutate KV values: ${error}`);
|
||||
return reply.code(500).send({ error: 'Failed to mutate values' });
|
||||
}
|
||||
});
|
||||
}
|
@ -6,6 +6,7 @@ import { Prisma } from "@prisma/client";
|
||||
import { log } from "@/utils/log";
|
||||
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||
import { allocateUserSeq } from "@/storage/seq";
|
||||
import { sessionDelete } from "@/app/session/sessionDelete";
|
||||
|
||||
export function sessionRoutes(app: Fastify) {
|
||||
|
||||
@ -352,4 +353,25 @@ export function sessionRoutes(app: Fastify) {
|
||||
}))
|
||||
});
|
||||
});
|
||||
|
||||
// Delete session
|
||||
app.delete('/v1/sessions/:sessionId', {
|
||||
schema: {
|
||||
params: z.object({
|
||||
sessionId: z.string()
|
||||
})
|
||||
},
|
||||
preHandler: app.authenticate
|
||||
}, async (request, reply) => {
|
||||
const userId = request.userId;
|
||||
const { sessionId } = request.params;
|
||||
|
||||
const deleted = await sessionDelete({ uid: userId }, sessionId);
|
||||
|
||||
if (!deleted) {
|
||||
return reply.code(404).send({ error: 'Session not found or not owned by user' });
|
||||
}
|
||||
|
||||
return reply.send({ success: true });
|
||||
});
|
||||
}
|
@ -17,15 +17,15 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
// Check if method was already registered
|
||||
const previousSocket = rpcListeners.get(method);
|
||||
if (previousSocket && previousSocket !== socket) {
|
||||
log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`);
|
||||
}
|
||||
|
||||
// Register this socket as the listener for this method
|
||||
rpcListeners.set(method, socket);
|
||||
|
||||
socket.emit('rpc-registered', { method });
|
||||
log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`);
|
||||
log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`);
|
||||
// log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
||||
} catch (error) {
|
||||
log({ module: 'websocket', level: 'error' }, `Error in rpc-register: ${error}`);
|
||||
socket.emit('rpc-error', { type: 'register', error: 'Internal error' });
|
||||
@ -44,16 +44,16 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
|
||||
if (rpcListeners.get(method) === socket) {
|
||||
rpcListeners.delete(method);
|
||||
log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`);
|
||||
|
||||
if (rpcListeners.size === 0) {
|
||||
rpcListeners.delete(userId);
|
||||
log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`);
|
||||
// log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`);
|
||||
} else {
|
||||
log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
||||
// log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
||||
}
|
||||
} else {
|
||||
log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`);
|
||||
}
|
||||
|
||||
socket.emit('rpc-unregistered', { method });
|
||||
@ -80,7 +80,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
|
||||
const targetSocket = rpcListeners.get(method);
|
||||
if (!targetSocket || !targetSocket.connected) {
|
||||
log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`);
|
||||
if (callback) {
|
||||
callback({
|
||||
ok: false,
|
||||
@ -92,7 +92,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
|
||||
// Don't allow calling your own socket
|
||||
if (targetSocket === socket) {
|
||||
log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`);
|
||||
if (callback) {
|
||||
callback({
|
||||
ok: false,
|
||||
@ -104,7 +104,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
|
||||
// Log RPC call initiation
|
||||
const startTime = Date.now();
|
||||
log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`);
|
||||
|
||||
// Forward the RPC request to the target socket using emitWithAck
|
||||
try {
|
||||
@ -114,7 +114,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
});
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`);
|
||||
|
||||
// Forward the response back to the caller via callback
|
||||
if (callback) {
|
||||
@ -127,7 +127,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
} catch (error) {
|
||||
const duration = Date.now() - startTime;
|
||||
const errorMsg = error instanceof Error ? error.message : 'RPC call failed';
|
||||
log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`);
|
||||
// log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`);
|
||||
|
||||
// Timeout or error occurred
|
||||
if (callback) {
|
||||
@ -138,7 +138,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`);
|
||||
// log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`);
|
||||
if (callback) {
|
||||
callback({
|
||||
ok: false,
|
||||
@ -158,13 +158,13 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
||||
}
|
||||
|
||||
if (methodsToRemove.length > 0) {
|
||||
log({ module: 'websocket-rpc' }, `Cleaning up RPC methods on disconnect for socket ${socket.id}: ${methodsToRemove.join(', ')}`);
|
||||
// log({ module: 'websocket-rpc' }, `Cleaning up RPC methods on disconnect for socket ${socket.id}: ${methodsToRemove.join(', ')}`);
|
||||
methodsToRemove.forEach(method => rpcListeners.delete(method));
|
||||
}
|
||||
|
||||
if (rpcListeners.size === 0) {
|
||||
rpcListeners.delete(userId);
|
||||
log({ module: 'websocket-rpc' }, `All RPC listeners removed for user ${userId}`);
|
||||
// log({ module: 'websocket-rpc' }, `All RPC listeners removed for user ${userId}`);
|
||||
}
|
||||
});
|
||||
}
|
@ -130,11 +130,27 @@ export type UpdateEvent = {
|
||||
} | {
|
||||
type: 'delete-artifact';
|
||||
artifactId: string;
|
||||
} | {
|
||||
type: 'delete-session';
|
||||
sessionId: string;
|
||||
} | {
|
||||
type: 'relationship-updated';
|
||||
uid: string;
|
||||
status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected';
|
||||
timestamp: number;
|
||||
} | {
|
||||
type: 'new-feed-post';
|
||||
id: string;
|
||||
body: any;
|
||||
cursor: string;
|
||||
createdAt: number;
|
||||
} | {
|
||||
type: 'kv-batch-update';
|
||||
changes: Array<{
|
||||
key: string;
|
||||
value: string | null; // null indicates deletion
|
||||
version: number; // -1 for deleted keys
|
||||
}>;
|
||||
};
|
||||
|
||||
// === EPHEMERAL EVENT TYPES (Transient) ===
|
||||
@ -380,6 +396,18 @@ export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, u
|
||||
};
|
||||
}
|
||||
|
||||
export function buildDeleteSessionUpdate(sessionId: string, updateSeq: number, updateId: string): UpdatePayload {
|
||||
return {
|
||||
id: updateId,
|
||||
seq: updateSeq,
|
||||
body: {
|
||||
t: 'delete-session',
|
||||
sid: sessionId
|
||||
},
|
||||
createdAt: Date.now()
|
||||
};
|
||||
}
|
||||
|
||||
export function buildUpdateAccountUpdate(userId: string, profile: Partial<AccountProfile>, updateSeq: number, updateId: string): UpdatePayload {
|
||||
return {
|
||||
id: updateId,
|
||||
@ -556,3 +584,39 @@ export function buildRelationshipUpdatedEvent(
|
||||
createdAt: Date.now()
|
||||
};
|
||||
}
|
||||
|
||||
export function buildNewFeedPostUpdate(feedItem: {
|
||||
id: string;
|
||||
body: any;
|
||||
cursor: string;
|
||||
createdAt: number;
|
||||
}, updateSeq: number, updateId: string): UpdatePayload {
|
||||
return {
|
||||
id: updateId,
|
||||
seq: updateSeq,
|
||||
body: {
|
||||
t: 'new-feed-post',
|
||||
id: feedItem.id,
|
||||
body: feedItem.body,
|
||||
cursor: feedItem.cursor,
|
||||
createdAt: feedItem.createdAt
|
||||
},
|
||||
createdAt: Date.now()
|
||||
};
|
||||
}
|
||||
|
||||
export function buildKVBatchUpdateUpdate(
|
||||
changes: Array<{ key: string; value: string | null; version: number }>,
|
||||
updateSeq: number,
|
||||
updateId: string
|
||||
): UpdatePayload {
|
||||
return {
|
||||
id: updateId,
|
||||
seq: updateSeq,
|
||||
body: {
|
||||
t: 'kv-batch-update',
|
||||
changes
|
||||
},
|
||||
createdAt: Date.now()
|
||||
};
|
||||
}
|
||||
|
55
sources/app/feed/feedGet.ts
Normal file
55
sources/app/feed/feedGet.ts
Normal file
@ -0,0 +1,55 @@
|
||||
import { Context } from "@/context";
|
||||
import { FeedOptions, FeedResult } from "./types";
|
||||
import { Prisma } from "@prisma/client";
|
||||
import { Tx } from "@/storage/inTx";
|
||||
|
||||
/**
|
||||
* Fetch user's feed with pagination.
|
||||
* Returns items in reverse chronological order (newest first).
|
||||
* Supports cursor-based pagination using the counter field.
|
||||
*/
|
||||
export async function feedGet(
|
||||
tx: Tx,
|
||||
ctx: Context,
|
||||
options?: FeedOptions
|
||||
): Promise<FeedResult> {
|
||||
const limit = options?.limit ?? 100;
|
||||
const cursor = options?.cursor;
|
||||
|
||||
// Build where clause for cursor pagination
|
||||
const where: Prisma.UserFeedItemWhereInput = { userId: ctx.uid };
|
||||
|
||||
if (cursor?.before !== undefined) {
|
||||
if (cursor.before.startsWith('0-')) {
|
||||
where.counter = { lt: parseInt(cursor.before.substring(2), 10) };
|
||||
} else {
|
||||
throw new Error('Invalid cursor format');
|
||||
}
|
||||
} else if (cursor?.after !== undefined) {
|
||||
if (cursor.after.startsWith('0-')) {
|
||||
where.counter = { gt: parseInt(cursor.after.substring(2), 10) };
|
||||
} else {
|
||||
throw new Error('Invalid cursor format');
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch items + 1 to determine hasMore
|
||||
const items = await tx.userFeedItem.findMany({
|
||||
where,
|
||||
orderBy: { counter: 'desc' },
|
||||
take: limit + 1
|
||||
});
|
||||
|
||||
// Check if there are more items
|
||||
const hasMore = items.length > limit;
|
||||
|
||||
// Return only requested limit
|
||||
return {
|
||||
items: items.slice(0, limit).map(item => ({
|
||||
...item,
|
||||
createdAt: item.createdAt.getTime(),
|
||||
cursor: '0-' + item.counter.toString(10)
|
||||
})),
|
||||
hasMore
|
||||
};
|
||||
}
|
67
sources/app/feed/feedPost.ts
Normal file
67
sources/app/feed/feedPost.ts
Normal file
@ -0,0 +1,67 @@
|
||||
import { Context } from "@/context";
|
||||
import { FeedBody, UserFeedItem } from "./types";
|
||||
import { afterTx, Tx } from "@/storage/inTx";
|
||||
import { allocateUserSeq } from "@/storage/seq";
|
||||
import { eventRouter, buildNewFeedPostUpdate } from "@/app/events/eventRouter";
|
||||
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||
|
||||
/**
|
||||
* Add a post to user's feed.
|
||||
* If repeatKey is provided and exists, the post will be updated in-place.
|
||||
* Otherwise, a new post is created with an incremented counter.
|
||||
*/
|
||||
export async function feedPost(
|
||||
tx: Tx,
|
||||
ctx: Context,
|
||||
body: FeedBody,
|
||||
repeatKey?: string | null
|
||||
): Promise<UserFeedItem> {
|
||||
|
||||
|
||||
// Delete existing items with the same repeatKey
|
||||
if (repeatKey) {
|
||||
await tx.userFeedItem.deleteMany({
|
||||
where: {
|
||||
userId: ctx.uid,
|
||||
repeatKey: repeatKey
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Allocate new counter
|
||||
const user = await tx.account.update({
|
||||
where: { id: ctx.uid },
|
||||
select: { feedSeq: true },
|
||||
data: { feedSeq: { increment: 1 } }
|
||||
});
|
||||
|
||||
// Create new item
|
||||
const item = await tx.userFeedItem.create({
|
||||
data: {
|
||||
counter: user.feedSeq,
|
||||
userId: ctx.uid,
|
||||
repeatKey: repeatKey,
|
||||
body: body
|
||||
}
|
||||
});
|
||||
|
||||
const result = {
|
||||
...item,
|
||||
createdAt: item.createdAt.getTime(),
|
||||
cursor: '0-' + item.counter.toString(10)
|
||||
};
|
||||
|
||||
// Emit socket event after transaction completes
|
||||
afterTx(tx, async () => {
|
||||
const updateSeq = await allocateUserSeq(ctx.uid);
|
||||
const updatePayload = buildNewFeedPostUpdate(result, updateSeq, randomKeyNaked(12));
|
||||
|
||||
eventRouter.emitUpdate({
|
||||
userId: ctx.uid,
|
||||
payload: updatePayload,
|
||||
recipientFilter: { type: 'all-user-authenticated-connections' }
|
||||
});
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
33
sources/app/feed/types.ts
Normal file
33
sources/app/feed/types.ts
Normal file
@ -0,0 +1,33 @@
|
||||
import * as z from "zod";
|
||||
|
||||
export const FeedBodySchema = z.discriminatedUnion('kind', [
|
||||
z.object({ kind: z.literal('friend_request'), uid: z.string() }),
|
||||
z.object({ kind: z.literal('friend_accepted'), uid: z.string() }),
|
||||
z.object({ kind: z.literal('text'), text: z.string() })
|
||||
]);
|
||||
|
||||
export type FeedBody = z.infer<typeof FeedBodySchema>;
|
||||
|
||||
export interface UserFeedItem {
|
||||
id: string;
|
||||
userId: string;
|
||||
repeatKey: string | null;
|
||||
body: FeedBody;
|
||||
createdAt: number;
|
||||
cursor: string;
|
||||
}
|
||||
|
||||
export interface FeedCursor {
|
||||
before?: string;
|
||||
after?: string;
|
||||
}
|
||||
|
||||
export interface FeedOptions {
|
||||
limit?: number;
|
||||
cursor?: FeedCursor;
|
||||
}
|
||||
|
||||
export interface FeedResult {
|
||||
items: UserFeedItem[];
|
||||
hasMore: boolean;
|
||||
}
|
41
sources/app/kv/kvBulkGet.ts
Normal file
41
sources/app/kv/kvBulkGet.ts
Normal file
@ -0,0 +1,41 @@
|
||||
import { db } from "@/storage/db";
|
||||
import * as privacyKit from "privacy-kit";
|
||||
|
||||
export interface KVBulkGetResult {
|
||||
values: Array<{
|
||||
key: string;
|
||||
value: string;
|
||||
version: number;
|
||||
}>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get multiple key-value pairs for the authenticated user.
|
||||
* Only returns existing keys with non-null values; missing or deleted keys are omitted.
|
||||
*/
|
||||
export async function kvBulkGet(
|
||||
ctx: { uid: string },
|
||||
keys: string[]
|
||||
): Promise<KVBulkGetResult> {
|
||||
const results = await db.userKVStore.findMany({
|
||||
where: {
|
||||
accountId: ctx.uid,
|
||||
key: {
|
||||
in: keys
|
||||
},
|
||||
value: {
|
||||
not: null // Exclude deleted entries
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
values: results
|
||||
.filter(r => r.value !== null) // Extra safety check
|
||||
.map(r => ({
|
||||
key: r.key,
|
||||
value: privacyKit.encodeBase64(r.value!),
|
||||
version: r.version
|
||||
}))
|
||||
};
|
||||
}
|
37
sources/app/kv/kvGet.ts
Normal file
37
sources/app/kv/kvGet.ts
Normal file
@ -0,0 +1,37 @@
|
||||
import { db } from "@/storage/db";
|
||||
import * as privacyKit from "privacy-kit";
|
||||
|
||||
export type KVGetResult = {
|
||||
key: string;
|
||||
value: string;
|
||||
version: number;
|
||||
} | null;
|
||||
|
||||
/**
|
||||
* Get a single key-value pair for the authenticated user.
|
||||
* Returns null if the key doesn't exist or if the value is null (deleted).
|
||||
*/
|
||||
export async function kvGet(
|
||||
ctx: { uid: string },
|
||||
key: string
|
||||
): Promise<KVGetResult> {
|
||||
const result = await db.userKVStore.findUnique({
|
||||
where: {
|
||||
accountId_key: {
|
||||
accountId: ctx.uid,
|
||||
key
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Treat missing records and null values as "not found"
|
||||
if (!result || result.value === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
key: result.key,
|
||||
value: privacyKit.encodeBase64(result.value),
|
||||
version: result.version
|
||||
};
|
||||
}
|
56
sources/app/kv/kvList.ts
Normal file
56
sources/app/kv/kvList.ts
Normal file
@ -0,0 +1,56 @@
|
||||
import { db } from "@/storage/db";
|
||||
import * as privacyKit from "privacy-kit";
|
||||
|
||||
export interface KVListOptions {
|
||||
prefix?: string;
|
||||
limit?: number;
|
||||
}
|
||||
|
||||
export interface KVListResult {
|
||||
items: Array<{
|
||||
key: string;
|
||||
value: string;
|
||||
version: number;
|
||||
}>;
|
||||
}
|
||||
|
||||
/**
|
||||
* List all key-value pairs for the authenticated user, optionally filtered by prefix.
|
||||
* Returns keys, values, and versions. Excludes entries with null values (deleted).
|
||||
*/
|
||||
export async function kvList(
|
||||
ctx: { uid: string },
|
||||
options?: KVListOptions
|
||||
): Promise<KVListResult> {
|
||||
const where: any = {
|
||||
accountId: ctx.uid,
|
||||
value: {
|
||||
not: null // Exclude deleted entries (null values)
|
||||
}
|
||||
};
|
||||
|
||||
// Add prefix filter if specified
|
||||
if (options?.prefix) {
|
||||
where.key = {
|
||||
startsWith: options.prefix
|
||||
};
|
||||
}
|
||||
|
||||
const results = await db.userKVStore.findMany({
|
||||
where,
|
||||
orderBy: {
|
||||
key: 'asc'
|
||||
},
|
||||
take: options?.limit
|
||||
});
|
||||
|
||||
return {
|
||||
items: results
|
||||
.filter(r => r.value !== null) // Extra safety check
|
||||
.map(r => ({
|
||||
key: r.key,
|
||||
value: privacyKit.encodeBase64(r.value!),
|
||||
version: r.version
|
||||
}))
|
||||
};
|
||||
}
|
139
sources/app/kv/kvMutate.ts
Normal file
139
sources/app/kv/kvMutate.ts
Normal file
@ -0,0 +1,139 @@
|
||||
import { db } from "@/storage/db";
|
||||
import { inTx, afterTx } from "@/storage/inTx";
|
||||
import { allocateUserSeq } from "@/storage/seq";
|
||||
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||
import { eventRouter, buildKVBatchUpdateUpdate } from "@/app/events/eventRouter";
|
||||
import * as privacyKit from "privacy-kit";
|
||||
|
||||
export interface KVMutation {
|
||||
key: string;
|
||||
value: string | null; // null = delete (sets value to null but keeps record)
|
||||
version: number; // Always required, use -1 for new keys
|
||||
}
|
||||
|
||||
export interface KVMutateResult {
|
||||
success: boolean;
|
||||
results?: Array<{
|
||||
key: string;
|
||||
version: number;
|
||||
}>;
|
||||
errors?: Array<{
|
||||
key: string;
|
||||
error: 'version-mismatch';
|
||||
version: number;
|
||||
value: string | null; // Current value (null if deleted)
|
||||
}>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically mutate multiple key-value pairs.
|
||||
* All mutations succeed or all fail.
|
||||
* Version is always required for all operations (use -1 for new keys).
|
||||
* Delete operations set value to null but keep the record with incremented version.
|
||||
* Sends a single bundled update notification for all changes.
|
||||
*/
|
||||
export async function kvMutate(
|
||||
ctx: { uid: string },
|
||||
mutations: KVMutation[]
|
||||
): Promise<KVMutateResult> {
|
||||
return await inTx(async (tx) => {
|
||||
const errors: KVMutateResult['errors'] = [];
|
||||
|
||||
// Pre-validate all mutations
|
||||
for (const mutation of mutations) {
|
||||
const existing = await tx.userKVStore.findUnique({
|
||||
where: {
|
||||
accountId_key: {
|
||||
accountId: ctx.uid,
|
||||
key: mutation.key
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const currentVersion = existing?.version ?? -1;
|
||||
|
||||
// Version check is always required
|
||||
if (currentVersion !== mutation.version) {
|
||||
errors.push({
|
||||
key: mutation.key,
|
||||
error: 'version-mismatch',
|
||||
version: currentVersion,
|
||||
value: existing?.value ? privacyKit.encodeBase64(existing.value) : null
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// If any errors, return all errors and abort
|
||||
if (errors.length > 0) {
|
||||
return { success: false, errors };
|
||||
}
|
||||
|
||||
// Apply all mutations and collect results
|
||||
const results: Array<{ key: string; version: number }> = [];
|
||||
const changes: Array<{ key: string; value: string | null; version: number }> = [];
|
||||
|
||||
for (const mutation of mutations) {
|
||||
if (mutation.version === -1) {
|
||||
// Create new entry (must not exist)
|
||||
const result = await tx.userKVStore.create({
|
||||
data: {
|
||||
accountId: ctx.uid,
|
||||
key: mutation.key,
|
||||
value: mutation.value ? new Uint8Array(Buffer.from(mutation.value, 'base64')) : null,
|
||||
version: 0
|
||||
}
|
||||
});
|
||||
|
||||
results.push({
|
||||
key: mutation.key,
|
||||
version: result.version
|
||||
});
|
||||
|
||||
changes.push({
|
||||
key: mutation.key,
|
||||
value: mutation.value,
|
||||
version: result.version
|
||||
});
|
||||
} else {
|
||||
// Update existing entry (including "delete" which sets value to null)
|
||||
const newVersion = mutation.version + 1;
|
||||
|
||||
const result = await tx.userKVStore.update({
|
||||
where: {
|
||||
accountId_key: {
|
||||
accountId: ctx.uid,
|
||||
key: mutation.key
|
||||
}
|
||||
},
|
||||
data: {
|
||||
value: mutation.value ? privacyKit.decodeBase64(mutation.value) : null,
|
||||
version: newVersion
|
||||
}
|
||||
});
|
||||
|
||||
results.push({
|
||||
key: mutation.key,
|
||||
version: result.version
|
||||
});
|
||||
|
||||
changes.push({
|
||||
key: mutation.key,
|
||||
value: mutation.value,
|
||||
version: result.version
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Send single bundled notification for all changes
|
||||
afterTx(tx, async () => {
|
||||
const updateSeq = await allocateUserSeq(ctx.uid);
|
||||
eventRouter.emitUpdate({
|
||||
userId: ctx.uid,
|
||||
payload: buildKVBatchUpdateUpdate(changes, updateSeq, randomKeyNaked(12)),
|
||||
recipientFilter: { type: 'user-scoped-only' }
|
||||
});
|
||||
});
|
||||
|
||||
return { success: true, results };
|
||||
});
|
||||
}
|
108
sources/app/session/sessionDelete.ts
Normal file
108
sources/app/session/sessionDelete.ts
Normal file
@ -0,0 +1,108 @@
|
||||
import { Context } from "@/context";
|
||||
import { inTx, afterTx } from "@/storage/inTx";
|
||||
import { eventRouter, buildDeleteSessionUpdate } from "@/app/events/eventRouter";
|
||||
import { allocateUserSeq } from "@/storage/seq";
|
||||
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||
import { log } from "@/utils/log";
|
||||
|
||||
/**
|
||||
* Delete a session and all its related data.
|
||||
* Handles:
|
||||
* - Deleting all session messages
|
||||
* - Deleting all usage reports for the session
|
||||
* - Deleting all access keys for the session
|
||||
* - Deleting the session itself
|
||||
* - Sending socket notification to all connected clients
|
||||
*
|
||||
* @param ctx - Context with user information
|
||||
* @param sessionId - ID of the session to delete
|
||||
* @returns true if deletion was successful, false if session not found or not owned by user
|
||||
*/
|
||||
export async function sessionDelete(ctx: Context, sessionId: string): Promise<boolean> {
|
||||
return await inTx(async (tx) => {
|
||||
// Verify session exists and belongs to the user
|
||||
const session = await tx.session.findFirst({
|
||||
where: {
|
||||
id: sessionId,
|
||||
accountId: ctx.uid
|
||||
}
|
||||
});
|
||||
|
||||
if (!session) {
|
||||
log({
|
||||
module: 'session-delete',
|
||||
userId: ctx.uid,
|
||||
sessionId
|
||||
}, `Session not found or not owned by user`);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Delete all related data
|
||||
// Note: Order matters to avoid foreign key constraint violations
|
||||
|
||||
// 1. Delete session messages
|
||||
const deletedMessages = await tx.sessionMessage.deleteMany({
|
||||
where: { sessionId }
|
||||
});
|
||||
log({
|
||||
module: 'session-delete',
|
||||
userId: ctx.uid,
|
||||
sessionId,
|
||||
deletedCount: deletedMessages.count
|
||||
}, `Deleted ${deletedMessages.count} session messages`);
|
||||
|
||||
// 2. Delete usage reports
|
||||
const deletedReports = await tx.usageReport.deleteMany({
|
||||
where: { sessionId }
|
||||
});
|
||||
log({
|
||||
module: 'session-delete',
|
||||
userId: ctx.uid,
|
||||
sessionId,
|
||||
deletedCount: deletedReports.count
|
||||
}, `Deleted ${deletedReports.count} usage reports`);
|
||||
|
||||
// 3. Delete access keys
|
||||
const deletedAccessKeys = await tx.accessKey.deleteMany({
|
||||
where: { sessionId }
|
||||
});
|
||||
log({
|
||||
module: 'session-delete',
|
||||
userId: ctx.uid,
|
||||
sessionId,
|
||||
deletedCount: deletedAccessKeys.count
|
||||
}, `Deleted ${deletedAccessKeys.count} access keys`);
|
||||
|
||||
// 4. Delete the session itself
|
||||
await tx.session.delete({
|
||||
where: { id: sessionId }
|
||||
});
|
||||
log({
|
||||
module: 'session-delete',
|
||||
userId: ctx.uid,
|
||||
sessionId
|
||||
}, `Session deleted successfully`);
|
||||
|
||||
// Send notification after transaction commits
|
||||
afterTx(tx, async () => {
|
||||
const updSeq = await allocateUserSeq(ctx.uid);
|
||||
const updatePayload = buildDeleteSessionUpdate(sessionId, updSeq, randomKeyNaked(12));
|
||||
|
||||
log({
|
||||
module: 'session-delete',
|
||||
userId: ctx.uid,
|
||||
sessionId,
|
||||
updateType: 'delete-session',
|
||||
updatePayload: JSON.stringify(updatePayload)
|
||||
}, `Emitting delete-session update to all user connections`);
|
||||
|
||||
eventRouter.emitUpdate({
|
||||
userId: ctx.uid,
|
||||
payload: updatePayload,
|
||||
recipientFilter: { type: 'all-user-authenticated-connections' }
|
||||
});
|
||||
});
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
@ -1,10 +1,18 @@
|
||||
import { Context } from "@/context";
|
||||
import { buildUserProfile, UserProfile } from "./type";
|
||||
import { db } from "@/storage/db";
|
||||
import { inTx } from "@/storage/inTx";
|
||||
import { RelationshipStatus } from "@prisma/client";
|
||||
import { relationshipSet } from "./relationshipSet";
|
||||
import { relationshipGet } from "./relationshipGet";
|
||||
import { sendFriendRequestNotification, sendFriendshipEstablishedNotification } from "./friendNotification";
|
||||
|
||||
/**
|
||||
* Add a friend or accept a friend request.
|
||||
* Handles:
|
||||
* - Accepting incoming friend requests (both users become friends)
|
||||
* - Sending new friend requests
|
||||
* - Sending appropriate notifications with 24-hour cooldown
|
||||
*/
|
||||
export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile | null> {
|
||||
// Prevent self-friendship
|
||||
if (ctx.uid === uid) {
|
||||
@ -12,7 +20,7 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
|
||||
}
|
||||
|
||||
// Update relationship status
|
||||
return await db.$transaction(async (tx) => {
|
||||
return await inTx(async (tx) => {
|
||||
|
||||
// Read current user objects
|
||||
const currentUser = await tx.account.findUnique({
|
||||
@ -40,6 +48,9 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
|
||||
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.friend);
|
||||
await relationshipSet(tx, currentUser.id, targetUser.id, RelationshipStatus.friend);
|
||||
|
||||
// Send friendship established notifications to both users
|
||||
await sendFriendshipEstablishedNotification(tx, currentUser.id, targetUser.id);
|
||||
|
||||
// Return the target user profile
|
||||
return buildUserProfile(targetUser, RelationshipStatus.friend);
|
||||
}
|
||||
@ -54,6 +65,9 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
|
||||
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.pending);
|
||||
}
|
||||
|
||||
// Send friend request notification to the receiver
|
||||
await sendFriendRequestNotification(tx, targetUser.id, currentUser.id);
|
||||
|
||||
// Return the target user profile
|
||||
return buildUserProfile(targetUser, RelationshipStatus.requested);
|
||||
}
|
||||
|
61
sources/app/social/friendNotification.spec.ts
Normal file
61
sources/app/social/friendNotification.spec.ts
Normal file
@ -0,0 +1,61 @@
|
||||
import { describe, it, expect, vi } from "vitest";
|
||||
import { RelationshipStatus } from "@prisma/client";
|
||||
|
||||
// Mock the dependencies that require environment variables
|
||||
vi.mock("@/storage/files", () => ({
|
||||
getPublicUrl: vi.fn((path: string) => `https://example.com/${path}`)
|
||||
}));
|
||||
|
||||
vi.mock("@/app/feed/feedPost", () => ({
|
||||
feedPost: vi.fn()
|
||||
}));
|
||||
|
||||
vi.mock("@/storage/inTx", () => ({
|
||||
afterTx: vi.fn()
|
||||
}));
|
||||
|
||||
// Import after mocking
|
||||
import { shouldSendNotification } from "./friendNotification";
|
||||
|
||||
describe("friendNotification", () => {
|
||||
describe("shouldSendNotification", () => {
|
||||
it("should return true when lastNotifiedAt is null", () => {
|
||||
const result = shouldSendNotification(null, RelationshipStatus.pending);
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
|
||||
it("should return false for rejected relationships", () => {
|
||||
const result = shouldSendNotification(null, RelationshipStatus.rejected);
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
it("should return false for rejected relationships even if 24 hours passed", () => {
|
||||
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
|
||||
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.rejected);
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
it("should return true when 24 hours have passed since last notification", () => {
|
||||
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
|
||||
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.pending);
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
|
||||
it("should return false when less than 24 hours have passed", () => {
|
||||
const tenHoursAgo = new Date(Date.now() - 10 * 60 * 60 * 1000);
|
||||
const result = shouldSendNotification(tenHoursAgo, RelationshipStatus.pending);
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
it("should work for friend status", () => {
|
||||
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
|
||||
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.friend);
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
|
||||
it("should work for requested status", () => {
|
||||
const result = shouldSendNotification(null, RelationshipStatus.requested);
|
||||
expect(result).toBe(true);
|
||||
});
|
||||
});
|
||||
});
|
170
sources/app/social/friendNotification.ts
Normal file
170
sources/app/social/friendNotification.ts
Normal file
@ -0,0 +1,170 @@
|
||||
import { Prisma, RelationshipStatus } from "@prisma/client";
|
||||
import { feedPost } from "@/app/feed/feedPost";
|
||||
import { Context } from "@/context";
|
||||
import { afterTx } from "@/storage/inTx";
|
||||
|
||||
/**
|
||||
* Check if a notification should be sent based on the last notification time and relationship status.
|
||||
* Returns true if:
|
||||
* - No previous notification was sent (lastNotifiedAt is null)
|
||||
* - OR 24 hours have passed since the last notification
|
||||
* - AND the relationship is not rejected
|
||||
*/
|
||||
export function shouldSendNotification(
|
||||
lastNotifiedAt: Date | null,
|
||||
status: RelationshipStatus
|
||||
): boolean {
|
||||
// Don't send notifications for rejected relationships
|
||||
if (status === RelationshipStatus.rejected) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If never notified, send notification
|
||||
if (!lastNotifiedAt) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check if 24 hours have passed since last notification
|
||||
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
|
||||
return lastNotifiedAt < twentyFourHoursAgo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a friend request notification to the receiver and update lastNotifiedAt.
|
||||
* This creates a feed item for the receiver about the incoming friend request.
|
||||
*/
|
||||
export async function sendFriendRequestNotification(
|
||||
tx: Prisma.TransactionClient,
|
||||
receiverUserId: string,
|
||||
senderUserId: string
|
||||
): Promise<void> {
|
||||
// Check if we should send notification to receiver
|
||||
const receiverRelationship = await tx.userRelationship.findUnique({
|
||||
where: {
|
||||
fromUserId_toUserId: {
|
||||
fromUserId: receiverUserId,
|
||||
toUserId: senderUserId
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (!receiverRelationship || !shouldSendNotification(
|
||||
receiverRelationship.lastNotifiedAt,
|
||||
receiverRelationship.status
|
||||
)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Create feed notification for receiver
|
||||
const receiverCtx = Context.create(receiverUserId);
|
||||
await feedPost(
|
||||
tx,
|
||||
receiverCtx,
|
||||
{
|
||||
kind: 'friend_request',
|
||||
uid: senderUserId
|
||||
},
|
||||
`friend_request_${senderUserId}` // repeatKey to avoid duplicates
|
||||
);
|
||||
|
||||
// Update lastNotifiedAt for the receiver's relationship record
|
||||
await tx.userRelationship.update({
|
||||
where: {
|
||||
fromUserId_toUserId: {
|
||||
fromUserId: receiverUserId,
|
||||
toUserId: senderUserId
|
||||
}
|
||||
},
|
||||
data: {
|
||||
lastNotifiedAt: new Date()
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send friendship established notifications to both users and update lastNotifiedAt.
|
||||
* This creates feed items for both users about the new friendship.
|
||||
*/
|
||||
export async function sendFriendshipEstablishedNotification(
|
||||
tx: Prisma.TransactionClient,
|
||||
user1Id: string,
|
||||
user2Id: string
|
||||
): Promise<void> {
|
||||
// Check and send notification to user1
|
||||
const user1Relationship = await tx.userRelationship.findUnique({
|
||||
where: {
|
||||
fromUserId_toUserId: {
|
||||
fromUserId: user1Id,
|
||||
toUserId: user2Id
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (user1Relationship && shouldSendNotification(
|
||||
user1Relationship.lastNotifiedAt,
|
||||
user1Relationship.status
|
||||
)) {
|
||||
const user1Ctx = Context.create(user1Id);
|
||||
await feedPost(
|
||||
tx,
|
||||
user1Ctx,
|
||||
{
|
||||
kind: 'friend_accepted',
|
||||
uid: user2Id
|
||||
},
|
||||
`friend_accepted_${user2Id}` // repeatKey to avoid duplicates
|
||||
);
|
||||
|
||||
// Update lastNotifiedAt for user1
|
||||
await tx.userRelationship.update({
|
||||
where: {
|
||||
fromUserId_toUserId: {
|
||||
fromUserId: user1Id,
|
||||
toUserId: user2Id
|
||||
}
|
||||
},
|
||||
data: {
|
||||
lastNotifiedAt: new Date()
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Check and send notification to user2
|
||||
const user2Relationship = await tx.userRelationship.findUnique({
|
||||
where: {
|
||||
fromUserId_toUserId: {
|
||||
fromUserId: user2Id,
|
||||
toUserId: user1Id
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (user2Relationship && shouldSendNotification(
|
||||
user2Relationship.lastNotifiedAt,
|
||||
user2Relationship.status
|
||||
)) {
|
||||
const user2Ctx = Context.create(user2Id);
|
||||
await feedPost(
|
||||
tx,
|
||||
user2Ctx,
|
||||
{
|
||||
kind: 'friend_accepted',
|
||||
uid: user1Id
|
||||
},
|
||||
`friend_accepted_${user1Id}` // repeatKey to avoid duplicates
|
||||
);
|
||||
|
||||
// Update lastNotifiedAt for user2
|
||||
await tx.userRelationship.update({
|
||||
where: {
|
||||
fromUserId_toUserId: {
|
||||
fromUserId: user2Id,
|
||||
toUserId: user1Id
|
||||
}
|
||||
},
|
||||
data: {
|
||||
lastNotifiedAt: new Date()
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -1,12 +1,12 @@
|
||||
import { Context } from "@/context";
|
||||
import { buildUserProfile, UserProfile } from "./type";
|
||||
import { db } from "@/storage/db";
|
||||
import { inTx } from "@/storage/inTx";
|
||||
import { RelationshipStatus } from "@prisma/client";
|
||||
import { relationshipSet } from "./relationshipSet";
|
||||
import { relationshipGet } from "./relationshipGet";
|
||||
|
||||
export async function friendRemove(ctx: Context, uid: string): Promise<UserProfile | null> {
|
||||
return await db.$transaction(async (tx) => {
|
||||
return await inTx(async (tx) => {
|
||||
|
||||
// Read current user objects
|
||||
const currentUser = await tx.account.findUnique({
|
||||
|
@ -1,7 +1,17 @@
|
||||
import { Prisma } from "@prisma/client";
|
||||
import { RelationshipStatus } from "@prisma/client";
|
||||
|
||||
export async function relationshipSet(tx: Prisma.TransactionClient, from: string, to: string, status: RelationshipStatus) {
|
||||
export async function relationshipSet(tx: Prisma.TransactionClient, from: string, to: string, status: RelationshipStatus, lastNotifiedAt?: Date) {
|
||||
// Get existing relationship to preserve lastNotifiedAt
|
||||
const existing = await tx.userRelationship.findUnique({
|
||||
where: {
|
||||
fromUserId_toUserId: {
|
||||
fromUserId: from,
|
||||
toUserId: to
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (status === RelationshipStatus.friend) {
|
||||
await tx.userRelationship.upsert({
|
||||
where: {
|
||||
@ -14,11 +24,14 @@ export async function relationshipSet(tx: Prisma.TransactionClient, from: string
|
||||
fromUserId: from,
|
||||
toUserId: to,
|
||||
status,
|
||||
acceptedAt: new Date()
|
||||
acceptedAt: new Date(),
|
||||
lastNotifiedAt: lastNotifiedAt || null
|
||||
},
|
||||
update: {
|
||||
status,
|
||||
acceptedAt: new Date()
|
||||
acceptedAt: new Date(),
|
||||
// Preserve existing lastNotifiedAt, only update if explicitly provided
|
||||
lastNotifiedAt: lastNotifiedAt || existing?.lastNotifiedAt || undefined
|
||||
}
|
||||
});
|
||||
} else {
|
||||
@ -33,11 +46,14 @@ export async function relationshipSet(tx: Prisma.TransactionClient, from: string
|
||||
fromUserId: from,
|
||||
toUserId: to,
|
||||
status,
|
||||
acceptedAt: null
|
||||
acceptedAt: null,
|
||||
lastNotifiedAt: lastNotifiedAt || null
|
||||
},
|
||||
update: {
|
||||
status,
|
||||
acceptedAt: null
|
||||
acceptedAt: null,
|
||||
// Preserve existing lastNotifiedAt, only update if explicitly provided
|
||||
lastNotifiedAt: lastNotifiedAt || existing?.lastNotifiedAt || undefined
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -11,6 +11,4 @@ export class Context {
|
||||
private constructor(uid: string) {
|
||||
this.uid = uid;
|
||||
}
|
||||
}
|
||||
|
||||
export type Tx = Prisma.TransactionClient | PrismaClient;
|
||||
}
|
Loading…
Reference in New Issue
Block a user