Compare commits
10 Commits
595e23967a
...
b594832c30
Author | SHA1 | Date | |
---|---|---|---|
b594832c30 | |||
26d42b4cf1 | |||
|
5ba780c8a2 | ||
|
b2c72cc485 | ||
|
aeba1b6f1c | ||
|
014473a7ac | ||
|
cfd7a7b783 | ||
|
9822512aeb | ||
|
c534331ce5 | ||
|
0ce1bb4c9a |
43
.github/workflows/build-main-docker.yml
vendored
Normal file
43
.github/workflows/build-main-docker.yml
vendored
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
name: Build Develop Docker to Docker Hub
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
|
|
||||||
|
env:
|
||||||
|
REGISTRY: docker.io
|
||||||
|
IMAGE_NAME: ${{ secrets.DOCKERHUB_USERNAME }}/${{ github.event.repository.name }}
|
||||||
|
USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||||
|
TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
docker:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
-
|
||||||
|
name: Checkout
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
-
|
||||||
|
name: Login to ${{ env.REGISTRY }}
|
||||||
|
if: github.event_name != 'pull_request'
|
||||||
|
uses: docker/login-action@v3
|
||||||
|
with:
|
||||||
|
registry: ${{ env.REGISTRY }}
|
||||||
|
username: ${{ env.USERNAME }}
|
||||||
|
password: ${{ env.TOKEN }}
|
||||||
|
-
|
||||||
|
name: Set up QEMU
|
||||||
|
uses: docker/setup-qemu-action@v3
|
||||||
|
-
|
||||||
|
name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v3
|
||||||
|
-
|
||||||
|
name: Build and push
|
||||||
|
uses: docker/build-push-action@v6
|
||||||
|
with:
|
||||||
|
context: .
|
||||||
|
push: ${{ github.event_name != 'pull_request' }}
|
||||||
|
build-args: |
|
||||||
|
PUSH_RECENT=1
|
||||||
|
tags: |
|
||||||
|
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
|
@ -160,6 +160,7 @@ The project has pending Prisma migrations that need to be applied:
|
|||||||
- Routes are in `/sources/apps/api/routes`
|
- Routes are in `/sources/apps/api/routes`
|
||||||
- Use Fastify with Zod for type-safe route definitions
|
- Use Fastify with Zod for type-safe route definitions
|
||||||
- Always validate inputs using Zod
|
- Always validate inputs using Zod
|
||||||
|
- **Idempotency**: Design all operations to be idempotent - clients may retry requests automatically and the backend must handle multiple invocations of the same operation gracefully, producing the same result as a single invocation
|
||||||
|
|
||||||
## Docker Deployment
|
## Docker Deployment
|
||||||
|
|
||||||
@ -276,4 +277,5 @@ tail -500 .logs/*.log | grep "applySessions.*active" | tail -10
|
|||||||
- Do not add logging when not asked
|
- Do not add logging when not asked
|
||||||
- do not run non-transactional things (like uploadign files) in transactions
|
- do not run non-transactional things (like uploadign files) in transactions
|
||||||
- After writing an action - add a documentation comment that explains logic, also keep it in sync.
|
- After writing an action - add a documentation comment that explains logic, also keep it in sync.
|
||||||
- always use github usernames
|
- always use github usernames
|
||||||
|
- Always use privacyKit.decodeBase64 and privacyKit.encodeBase64 from privacy-kit instead of using buffer
|
@ -37,9 +37,10 @@ COPY --from=builder /app/tsconfig.json ./tsconfig.json
|
|||||||
COPY --from=builder /app/package.json ./package.json
|
COPY --from=builder /app/package.json ./package.json
|
||||||
COPY --from=builder /app/node_modules ./node_modules
|
COPY --from=builder /app/node_modules ./node_modules
|
||||||
COPY --from=builder /app/sources ./sources
|
COPY --from=builder /app/sources ./sources
|
||||||
|
COPY --from=builder /app/prisma ./prisma
|
||||||
|
|
||||||
# Expose the port the app will run on
|
# Expose the port the app will run on
|
||||||
EXPOSE 3000
|
EXPOSE 3000
|
||||||
|
|
||||||
# Command to run the application
|
# Command to run the application
|
||||||
CMD ["yarn", "start"]
|
CMD ["yarn", "start"]
|
||||||
|
27
prisma/migrations/20250920213557_add_user_feed/migration.sql
Normal file
27
prisma/migrations/20250920213557_add_user_feed/migration.sql
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
-- AlterTable
|
||||||
|
ALTER TABLE "Account" ADD COLUMN "feedSeq" BIGINT NOT NULL DEFAULT 0;
|
||||||
|
|
||||||
|
-- CreateTable
|
||||||
|
CREATE TABLE "UserFeedItem" (
|
||||||
|
"id" TEXT NOT NULL,
|
||||||
|
"userId" TEXT NOT NULL,
|
||||||
|
"counter" BIGINT NOT NULL,
|
||||||
|
"repeatKey" TEXT,
|
||||||
|
"body" JSONB NOT NULL,
|
||||||
|
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||||
|
|
||||||
|
CONSTRAINT "UserFeedItem_pkey" PRIMARY KEY ("id")
|
||||||
|
);
|
||||||
|
|
||||||
|
-- CreateIndex
|
||||||
|
CREATE INDEX "UserFeedItem_userId_counter_idx" ON "UserFeedItem"("userId", "counter" DESC);
|
||||||
|
|
||||||
|
-- CreateIndex
|
||||||
|
CREATE UNIQUE INDEX "UserFeedItem_userId_counter_key" ON "UserFeedItem"("userId", "counter");
|
||||||
|
|
||||||
|
-- CreateIndex
|
||||||
|
CREATE UNIQUE INDEX "UserFeedItem_userId_repeatKey_key" ON "UserFeedItem"("userId", "repeatKey");
|
||||||
|
|
||||||
|
-- AddForeignKey
|
||||||
|
ALTER TABLE "UserFeedItem" ADD CONSTRAINT "UserFeedItem_userId_fkey" FOREIGN KEY ("userId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
@ -0,0 +1,2 @@
|
|||||||
|
-- AlterTable
|
||||||
|
ALTER TABLE "UserRelationship" ADD COLUMN "lastNotifiedAt" TIMESTAMP(3);
|
21
prisma/migrations/20250922000310_add_user_kv/migration.sql
Normal file
21
prisma/migrations/20250922000310_add_user_kv/migration.sql
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
-- CreateTable
|
||||||
|
CREATE TABLE "UserKVStore" (
|
||||||
|
"id" TEXT NOT NULL,
|
||||||
|
"accountId" TEXT NOT NULL,
|
||||||
|
"key" TEXT NOT NULL,
|
||||||
|
"value" BYTEA,
|
||||||
|
"version" INTEGER NOT NULL DEFAULT 0,
|
||||||
|
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||||
|
|
||||||
|
CONSTRAINT "UserKVStore_pkey" PRIMARY KEY ("id")
|
||||||
|
);
|
||||||
|
|
||||||
|
-- CreateIndex
|
||||||
|
CREATE INDEX "UserKVStore_accountId_idx" ON "UserKVStore"("accountId");
|
||||||
|
|
||||||
|
-- CreateIndex
|
||||||
|
CREATE UNIQUE INDEX "UserKVStore_accountId_key_key" ON "UserKVStore"("accountId", "key");
|
||||||
|
|
||||||
|
-- AddForeignKey
|
||||||
|
ALTER TABLE "UserKVStore" ADD CONSTRAINT "UserKVStore_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
@ -23,6 +23,7 @@ model Account {
|
|||||||
id String @id @default(cuid())
|
id String @id @default(cuid())
|
||||||
publicKey String @unique
|
publicKey String @unique
|
||||||
seq Int @default(0)
|
seq Int @default(0)
|
||||||
|
feedSeq BigInt @default(0)
|
||||||
createdAt DateTime @default(now())
|
createdAt DateTime @default(now())
|
||||||
updatedAt DateTime @updatedAt
|
updatedAt DateTime @updatedAt
|
||||||
settings String?
|
settings String?
|
||||||
@ -49,6 +50,8 @@ model Account {
|
|||||||
RelationshipsTo UserRelationship[] @relation("RelationshipsTo")
|
RelationshipsTo UserRelationship[] @relation("RelationshipsTo")
|
||||||
Artifact Artifact[]
|
Artifact Artifact[]
|
||||||
AccessKey AccessKey[]
|
AccessKey AccessKey[]
|
||||||
|
UserFeedItem UserFeedItem[]
|
||||||
|
UserKVStore UserKVStore[]
|
||||||
}
|
}
|
||||||
|
|
||||||
model TerminalAuthRequest {
|
model TerminalAuthRequest {
|
||||||
@ -306,16 +309,55 @@ enum RelationshipStatus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
model UserRelationship {
|
model UserRelationship {
|
||||||
fromUserId String
|
fromUserId String
|
||||||
fromUser Account @relation("RelationshipsFrom", fields: [fromUserId], references: [id], onDelete: Cascade)
|
fromUser Account @relation("RelationshipsFrom", fields: [fromUserId], references: [id], onDelete: Cascade)
|
||||||
toUserId String
|
toUserId String
|
||||||
toUser Account @relation("RelationshipsTo", fields: [toUserId], references: [id], onDelete: Cascade)
|
toUser Account @relation("RelationshipsTo", fields: [toUserId], references: [id], onDelete: Cascade)
|
||||||
status RelationshipStatus @default(pending)
|
status RelationshipStatus @default(pending)
|
||||||
createdAt DateTime @default(now())
|
createdAt DateTime @default(now())
|
||||||
updatedAt DateTime @updatedAt
|
updatedAt DateTime @updatedAt
|
||||||
acceptedAt DateTime?
|
acceptedAt DateTime?
|
||||||
|
lastNotifiedAt DateTime?
|
||||||
|
|
||||||
@@id([fromUserId, toUserId])
|
@@id([fromUserId, toUserId])
|
||||||
@@index([toUserId, status])
|
@@index([toUserId, status])
|
||||||
@@index([fromUserId, status])
|
@@index([fromUserId, status])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Feed
|
||||||
|
//
|
||||||
|
|
||||||
|
model UserFeedItem {
|
||||||
|
id String @id @default(cuid())
|
||||||
|
userId String
|
||||||
|
user Account @relation(fields: [userId], references: [id], onDelete: Cascade)
|
||||||
|
counter BigInt
|
||||||
|
repeatKey String?
|
||||||
|
/// [FeedBody]
|
||||||
|
body Json
|
||||||
|
createdAt DateTime @default(now())
|
||||||
|
updatedAt DateTime @updatedAt
|
||||||
|
|
||||||
|
@@unique([userId, counter])
|
||||||
|
@@unique([userId, repeatKey])
|
||||||
|
@@index([userId, counter(sort: Desc)])
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Key-Value Storage
|
||||||
|
//
|
||||||
|
|
||||||
|
model UserKVStore {
|
||||||
|
id String @id @default(cuid())
|
||||||
|
accountId String
|
||||||
|
account Account @relation(fields: [accountId], references: [id], onDelete: Cascade)
|
||||||
|
key String // Unencrypted for indexing
|
||||||
|
value Bytes? // Encrypted value, null when "deleted"
|
||||||
|
version Int @default(0)
|
||||||
|
createdAt DateTime @default(now())
|
||||||
|
updatedAt DateTime @updatedAt
|
||||||
|
|
||||||
|
@@unique([accountId, key])
|
||||||
|
@@index([accountId])
|
||||||
|
}
|
||||||
|
@ -19,6 +19,8 @@ import { enableMonitoring } from "./utils/enableMonitoring";
|
|||||||
import { enableErrorHandlers } from "./utils/enableErrorHandlers";
|
import { enableErrorHandlers } from "./utils/enableErrorHandlers";
|
||||||
import { enableAuthentication } from "./utils/enableAuthentication";
|
import { enableAuthentication } from "./utils/enableAuthentication";
|
||||||
import { userRoutes } from "./routes/userRoutes";
|
import { userRoutes } from "./routes/userRoutes";
|
||||||
|
import { feedRoutes } from "./routes/feedRoutes";
|
||||||
|
import { kvRoutes } from "./routes/kvRoutes";
|
||||||
|
|
||||||
export async function startApi() {
|
export async function startApi() {
|
||||||
|
|
||||||
@ -62,6 +64,8 @@ export async function startApi() {
|
|||||||
versionRoutes(typed);
|
versionRoutes(typed);
|
||||||
voiceRoutes(typed);
|
voiceRoutes(typed);
|
||||||
userRoutes(typed);
|
userRoutes(typed);
|
||||||
|
feedRoutes(typed);
|
||||||
|
kvRoutes(typed);
|
||||||
|
|
||||||
// Start HTTP
|
// Start HTTP
|
||||||
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005;
|
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3005;
|
||||||
|
40
sources/app/api/routes/feedRoutes.ts
Normal file
40
sources/app/api/routes/feedRoutes.ts
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import { z } from "zod";
|
||||||
|
import { Fastify } from "../types";
|
||||||
|
import { FeedBodySchema } from "@/app/feed/types";
|
||||||
|
import { feedGet } from "@/app/feed/feedGet";
|
||||||
|
import { Context } from "@/context";
|
||||||
|
import { db } from "@/storage/db";
|
||||||
|
|
||||||
|
export function feedRoutes(app: Fastify) {
|
||||||
|
app.get('/v1/feed', {
|
||||||
|
preHandler: app.authenticate,
|
||||||
|
schema: {
|
||||||
|
querystring: z.object({
|
||||||
|
before: z.string().optional(),
|
||||||
|
after: z.string().optional(),
|
||||||
|
limit: z.coerce.number().int().min(1).max(200).default(50)
|
||||||
|
}).optional(),
|
||||||
|
response: {
|
||||||
|
200: z.object({
|
||||||
|
items: z.array(z.object({
|
||||||
|
id: z.string(),
|
||||||
|
body: FeedBodySchema,
|
||||||
|
repeatKey: z.string().nullable(),
|
||||||
|
cursor: z.string(),
|
||||||
|
createdAt: z.number()
|
||||||
|
})),
|
||||||
|
hasMore: z.boolean()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, async (request, reply) => {
|
||||||
|
const items = await feedGet(db, Context.create(request.userId), {
|
||||||
|
cursor: {
|
||||||
|
before: request.query?.before,
|
||||||
|
after: request.query?.after
|
||||||
|
},
|
||||||
|
limit: request.query?.limit
|
||||||
|
});
|
||||||
|
return reply.send({ items: items.items, hasMore: items.hasMore });
|
||||||
|
});
|
||||||
|
}
|
172
sources/app/api/routes/kvRoutes.ts
Normal file
172
sources/app/api/routes/kvRoutes.ts
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
import { z } from "zod";
|
||||||
|
import { Fastify } from "../types";
|
||||||
|
import { kvGet } from "@/app/kv/kvGet";
|
||||||
|
import { kvList } from "@/app/kv/kvList";
|
||||||
|
import { kvBulkGet } from "@/app/kv/kvBulkGet";
|
||||||
|
import { kvMutate } from "@/app/kv/kvMutate";
|
||||||
|
import { log } from "@/utils/log";
|
||||||
|
|
||||||
|
export function kvRoutes(app: Fastify) {
|
||||||
|
// GET /v1/kv/:key - Get single value
|
||||||
|
app.get('/v1/kv/:key', {
|
||||||
|
preHandler: app.authenticate,
|
||||||
|
schema: {
|
||||||
|
params: z.object({
|
||||||
|
key: z.string()
|
||||||
|
}),
|
||||||
|
response: {
|
||||||
|
200: z.object({
|
||||||
|
key: z.string(),
|
||||||
|
value: z.string(),
|
||||||
|
version: z.number()
|
||||||
|
}).nullable(),
|
||||||
|
404: z.object({
|
||||||
|
error: z.literal('Key not found')
|
||||||
|
}),
|
||||||
|
500: z.object({
|
||||||
|
error: z.literal('Failed to get value')
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, async (request, reply) => {
|
||||||
|
const userId = request.userId;
|
||||||
|
const { key } = request.params;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await kvGet({ uid: userId }, key);
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
return reply.code(404).send({ error: 'Key not found' });
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.send(result);
|
||||||
|
} catch (error) {
|
||||||
|
log({ module: 'api', level: 'error' }, `Failed to get KV value: ${error}`);
|
||||||
|
return reply.code(500).send({ error: 'Failed to get value' });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// GET /v1/kv - List key-value pairs with optional prefix filter
|
||||||
|
app.get('/v1/kv', {
|
||||||
|
preHandler: app.authenticate,
|
||||||
|
schema: {
|
||||||
|
querystring: z.object({
|
||||||
|
prefix: z.string().optional(),
|
||||||
|
limit: z.coerce.number().int().min(1).max(1000).default(100)
|
||||||
|
}),
|
||||||
|
response: {
|
||||||
|
200: z.object({
|
||||||
|
items: z.array(z.object({
|
||||||
|
key: z.string(),
|
||||||
|
value: z.string(),
|
||||||
|
version: z.number()
|
||||||
|
}))
|
||||||
|
}),
|
||||||
|
500: z.object({
|
||||||
|
error: z.literal('Failed to list items')
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, async (request, reply) => {
|
||||||
|
const userId = request.userId;
|
||||||
|
const { prefix, limit } = request.query;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await kvList({ uid: userId }, { prefix, limit });
|
||||||
|
return reply.send(result);
|
||||||
|
} catch (error) {
|
||||||
|
log({ module: 'api', level: 'error' }, `Failed to list KV items: ${error}`);
|
||||||
|
return reply.code(500).send({ error: 'Failed to list items' });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// POST /v1/kv/bulk - Bulk get values
|
||||||
|
app.post('/v1/kv/bulk', {
|
||||||
|
preHandler: app.authenticate,
|
||||||
|
schema: {
|
||||||
|
body: z.object({
|
||||||
|
keys: z.array(z.string()).min(1).max(100)
|
||||||
|
}),
|
||||||
|
response: {
|
||||||
|
200: z.object({
|
||||||
|
values: z.array(z.object({
|
||||||
|
key: z.string(),
|
||||||
|
value: z.string(),
|
||||||
|
version: z.number()
|
||||||
|
}))
|
||||||
|
}),
|
||||||
|
500: z.object({
|
||||||
|
error: z.literal('Failed to get values')
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, async (request, reply) => {
|
||||||
|
const userId = request.userId;
|
||||||
|
const { keys } = request.body;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await kvBulkGet({ uid: userId }, keys);
|
||||||
|
return reply.send(result);
|
||||||
|
} catch (error) {
|
||||||
|
log({ module: 'api', level: 'error' }, `Failed to bulk get KV values: ${error}`);
|
||||||
|
return reply.code(500).send({ error: 'Failed to get values' });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// PUT /v1/kv - Atomic batch mutation
|
||||||
|
app.post('/v1/kv', {
|
||||||
|
preHandler: app.authenticate,
|
||||||
|
schema: {
|
||||||
|
body: z.object({
|
||||||
|
mutations: z.array(z.object({
|
||||||
|
key: z.string(),
|
||||||
|
value: z.string().nullable(),
|
||||||
|
version: z.number() // Always required, use -1 for new keys
|
||||||
|
})).min(1).max(100)
|
||||||
|
}),
|
||||||
|
response: {
|
||||||
|
200: z.object({
|
||||||
|
success: z.literal(true),
|
||||||
|
results: z.array(z.object({
|
||||||
|
key: z.string(),
|
||||||
|
version: z.number()
|
||||||
|
}))
|
||||||
|
}),
|
||||||
|
409: z.object({
|
||||||
|
success: z.literal(false),
|
||||||
|
errors: z.array(z.object({
|
||||||
|
key: z.string(),
|
||||||
|
error: z.literal('version-mismatch'),
|
||||||
|
version: z.number(),
|
||||||
|
value: z.string().nullable()
|
||||||
|
}))
|
||||||
|
}),
|
||||||
|
500: z.object({
|
||||||
|
error: z.literal('Failed to mutate values')
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, async (request, reply) => {
|
||||||
|
const userId = request.userId;
|
||||||
|
const { mutations } = request.body;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await kvMutate({ uid: userId }, mutations);
|
||||||
|
|
||||||
|
if (!result.success) {
|
||||||
|
return reply.code(409).send({
|
||||||
|
success: false as const,
|
||||||
|
errors: result.errors!
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.send({
|
||||||
|
success: true as const,
|
||||||
|
results: result.results!
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
log({ module: 'api', level: 'error' }, `Failed to mutate KV values: ${error}`);
|
||||||
|
return reply.code(500).send({ error: 'Failed to mutate values' });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
@ -6,6 +6,7 @@ import { Prisma } from "@prisma/client";
|
|||||||
import { log } from "@/utils/log";
|
import { log } from "@/utils/log";
|
||||||
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||||
import { allocateUserSeq } from "@/storage/seq";
|
import { allocateUserSeq } from "@/storage/seq";
|
||||||
|
import { sessionDelete } from "@/app/session/sessionDelete";
|
||||||
|
|
||||||
export function sessionRoutes(app: Fastify) {
|
export function sessionRoutes(app: Fastify) {
|
||||||
|
|
||||||
@ -352,4 +353,25 @@ export function sessionRoutes(app: Fastify) {
|
|||||||
}))
|
}))
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Delete session
|
||||||
|
app.delete('/v1/sessions/:sessionId', {
|
||||||
|
schema: {
|
||||||
|
params: z.object({
|
||||||
|
sessionId: z.string()
|
||||||
|
})
|
||||||
|
},
|
||||||
|
preHandler: app.authenticate
|
||||||
|
}, async (request, reply) => {
|
||||||
|
const userId = request.userId;
|
||||||
|
const { sessionId } = request.params;
|
||||||
|
|
||||||
|
const deleted = await sessionDelete({ uid: userId }, sessionId);
|
||||||
|
|
||||||
|
if (!deleted) {
|
||||||
|
return reply.code(404).send({ error: 'Session not found or not owned by user' });
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.send({ success: true });
|
||||||
|
});
|
||||||
}
|
}
|
@ -17,15 +17,15 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
// Check if method was already registered
|
// Check if method was already registered
|
||||||
const previousSocket = rpcListeners.get(method);
|
const previousSocket = rpcListeners.get(method);
|
||||||
if (previousSocket && previousSocket !== socket) {
|
if (previousSocket && previousSocket !== socket) {
|
||||||
log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`);
|
// log({ module: 'websocket-rpc' }, `RPC method ${method} re-registered: ${previousSocket.id} -> ${socket.id}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register this socket as the listener for this method
|
// Register this socket as the listener for this method
|
||||||
rpcListeners.set(method, socket);
|
rpcListeners.set(method, socket);
|
||||||
|
|
||||||
socket.emit('rpc-registered', { method });
|
socket.emit('rpc-registered', { method });
|
||||||
log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`);
|
// log({ module: 'websocket-rpc' }, `RPC method registered: ${method} on socket ${socket.id} (user: ${userId})`);
|
||||||
log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
// log({ module: 'websocket-rpc' }, `Active RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
log({ module: 'websocket', level: 'error' }, `Error in rpc-register: ${error}`);
|
log({ module: 'websocket', level: 'error' }, `Error in rpc-register: ${error}`);
|
||||||
socket.emit('rpc-error', { type: 'register', error: 'Internal error' });
|
socket.emit('rpc-error', { type: 'register', error: 'Internal error' });
|
||||||
@ -44,16 +44,16 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
|
|
||||||
if (rpcListeners.get(method) === socket) {
|
if (rpcListeners.get(method) === socket) {
|
||||||
rpcListeners.delete(method);
|
rpcListeners.delete(method);
|
||||||
log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`);
|
// log({ module: 'websocket-rpc' }, `RPC method unregistered: ${method} from socket ${socket.id} (user: ${userId})`);
|
||||||
|
|
||||||
if (rpcListeners.size === 0) {
|
if (rpcListeners.size === 0) {
|
||||||
rpcListeners.delete(userId);
|
rpcListeners.delete(userId);
|
||||||
log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`);
|
// log({ module: 'websocket-rpc' }, `All RPC methods unregistered for user ${userId}`);
|
||||||
} else {
|
} else {
|
||||||
log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
// log({ module: 'websocket-rpc' }, `Remaining RPC methods for user ${userId}: ${Array.from(rpcListeners.keys()).join(', ')}`);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`);
|
// log({ module: 'websocket-rpc' }, `RPC unregister ignored: ${method} not registered on socket ${socket.id}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.emit('rpc-unregistered', { method });
|
socket.emit('rpc-unregistered', { method });
|
||||||
@ -80,7 +80,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
|
|
||||||
const targetSocket = rpcListeners.get(method);
|
const targetSocket = rpcListeners.get(method);
|
||||||
if (!targetSocket || !targetSocket.connected) {
|
if (!targetSocket || !targetSocket.connected) {
|
||||||
log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`);
|
// log({ module: 'websocket-rpc' }, `RPC call failed: Method ${method} not available (disconnected or not registered)`);
|
||||||
if (callback) {
|
if (callback) {
|
||||||
callback({
|
callback({
|
||||||
ok: false,
|
ok: false,
|
||||||
@ -92,7 +92,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
|
|
||||||
// Don't allow calling your own socket
|
// Don't allow calling your own socket
|
||||||
if (targetSocket === socket) {
|
if (targetSocket === socket) {
|
||||||
log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`);
|
// log({ module: 'websocket-rpc' }, `RPC call failed: Attempted self-call on method ${method}`);
|
||||||
if (callback) {
|
if (callback) {
|
||||||
callback({
|
callback({
|
||||||
ok: false,
|
ok: false,
|
||||||
@ -104,7 +104,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
|
|
||||||
// Log RPC call initiation
|
// Log RPC call initiation
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`);
|
// log({ module: 'websocket-rpc' }, `RPC call initiated: ${socket.id} -> ${method} (target: ${targetSocket.id})`);
|
||||||
|
|
||||||
// Forward the RPC request to the target socket using emitWithAck
|
// Forward the RPC request to the target socket using emitWithAck
|
||||||
try {
|
try {
|
||||||
@ -114,7 +114,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
});
|
});
|
||||||
|
|
||||||
const duration = Date.now() - startTime;
|
const duration = Date.now() - startTime;
|
||||||
log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`);
|
// log({ module: 'websocket-rpc' }, `RPC call succeeded: ${method} (${duration}ms)`);
|
||||||
|
|
||||||
// Forward the response back to the caller via callback
|
// Forward the response back to the caller via callback
|
||||||
if (callback) {
|
if (callback) {
|
||||||
@ -127,7 +127,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
} catch (error) {
|
} catch (error) {
|
||||||
const duration = Date.now() - startTime;
|
const duration = Date.now() - startTime;
|
||||||
const errorMsg = error instanceof Error ? error.message : 'RPC call failed';
|
const errorMsg = error instanceof Error ? error.message : 'RPC call failed';
|
||||||
log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`);
|
// log({ module: 'websocket-rpc' }, `RPC call failed: ${method} - ${errorMsg} (${duration}ms)`);
|
||||||
|
|
||||||
// Timeout or error occurred
|
// Timeout or error occurred
|
||||||
if (callback) {
|
if (callback) {
|
||||||
@ -138,7 +138,7 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`);
|
// log({ module: 'websocket', level: 'error' }, `Error in rpc-call: ${error}`);
|
||||||
if (callback) {
|
if (callback) {
|
||||||
callback({
|
callback({
|
||||||
ok: false,
|
ok: false,
|
||||||
@ -158,13 +158,13 @@ export function rpcHandler(userId: string, socket: Socket, rpcListeners: Map<str
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (methodsToRemove.length > 0) {
|
if (methodsToRemove.length > 0) {
|
||||||
log({ module: 'websocket-rpc' }, `Cleaning up RPC methods on disconnect for socket ${socket.id}: ${methodsToRemove.join(', ')}`);
|
// log({ module: 'websocket-rpc' }, `Cleaning up RPC methods on disconnect for socket ${socket.id}: ${methodsToRemove.join(', ')}`);
|
||||||
methodsToRemove.forEach(method => rpcListeners.delete(method));
|
methodsToRemove.forEach(method => rpcListeners.delete(method));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rpcListeners.size === 0) {
|
if (rpcListeners.size === 0) {
|
||||||
rpcListeners.delete(userId);
|
rpcListeners.delete(userId);
|
||||||
log({ module: 'websocket-rpc' }, `All RPC listeners removed for user ${userId}`);
|
// log({ module: 'websocket-rpc' }, `All RPC listeners removed for user ${userId}`);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
@ -130,11 +130,27 @@ export type UpdateEvent = {
|
|||||||
} | {
|
} | {
|
||||||
type: 'delete-artifact';
|
type: 'delete-artifact';
|
||||||
artifactId: string;
|
artifactId: string;
|
||||||
|
} | {
|
||||||
|
type: 'delete-session';
|
||||||
|
sessionId: string;
|
||||||
} | {
|
} | {
|
||||||
type: 'relationship-updated';
|
type: 'relationship-updated';
|
||||||
uid: string;
|
uid: string;
|
||||||
status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected';
|
status: 'none' | 'requested' | 'pending' | 'friend' | 'rejected';
|
||||||
timestamp: number;
|
timestamp: number;
|
||||||
|
} | {
|
||||||
|
type: 'new-feed-post';
|
||||||
|
id: string;
|
||||||
|
body: any;
|
||||||
|
cursor: string;
|
||||||
|
createdAt: number;
|
||||||
|
} | {
|
||||||
|
type: 'kv-batch-update';
|
||||||
|
changes: Array<{
|
||||||
|
key: string;
|
||||||
|
value: string | null; // null indicates deletion
|
||||||
|
version: number; // -1 for deleted keys
|
||||||
|
}>;
|
||||||
};
|
};
|
||||||
|
|
||||||
// === EPHEMERAL EVENT TYPES (Transient) ===
|
// === EPHEMERAL EVENT TYPES (Transient) ===
|
||||||
@ -380,6 +396,18 @@ export function buildUpdateSessionUpdate(sessionId: string, updateSeq: number, u
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function buildDeleteSessionUpdate(sessionId: string, updateSeq: number, updateId: string): UpdatePayload {
|
||||||
|
return {
|
||||||
|
id: updateId,
|
||||||
|
seq: updateSeq,
|
||||||
|
body: {
|
||||||
|
t: 'delete-session',
|
||||||
|
sid: sessionId
|
||||||
|
},
|
||||||
|
createdAt: Date.now()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
export function buildUpdateAccountUpdate(userId: string, profile: Partial<AccountProfile>, updateSeq: number, updateId: string): UpdatePayload {
|
export function buildUpdateAccountUpdate(userId: string, profile: Partial<AccountProfile>, updateSeq: number, updateId: string): UpdatePayload {
|
||||||
return {
|
return {
|
||||||
id: updateId,
|
id: updateId,
|
||||||
@ -556,3 +584,39 @@ export function buildRelationshipUpdatedEvent(
|
|||||||
createdAt: Date.now()
|
createdAt: Date.now()
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function buildNewFeedPostUpdate(feedItem: {
|
||||||
|
id: string;
|
||||||
|
body: any;
|
||||||
|
cursor: string;
|
||||||
|
createdAt: number;
|
||||||
|
}, updateSeq: number, updateId: string): UpdatePayload {
|
||||||
|
return {
|
||||||
|
id: updateId,
|
||||||
|
seq: updateSeq,
|
||||||
|
body: {
|
||||||
|
t: 'new-feed-post',
|
||||||
|
id: feedItem.id,
|
||||||
|
body: feedItem.body,
|
||||||
|
cursor: feedItem.cursor,
|
||||||
|
createdAt: feedItem.createdAt
|
||||||
|
},
|
||||||
|
createdAt: Date.now()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildKVBatchUpdateUpdate(
|
||||||
|
changes: Array<{ key: string; value: string | null; version: number }>,
|
||||||
|
updateSeq: number,
|
||||||
|
updateId: string
|
||||||
|
): UpdatePayload {
|
||||||
|
return {
|
||||||
|
id: updateId,
|
||||||
|
seq: updateSeq,
|
||||||
|
body: {
|
||||||
|
t: 'kv-batch-update',
|
||||||
|
changes
|
||||||
|
},
|
||||||
|
createdAt: Date.now()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
55
sources/app/feed/feedGet.ts
Normal file
55
sources/app/feed/feedGet.ts
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
import { Context } from "@/context";
|
||||||
|
import { FeedOptions, FeedResult } from "./types";
|
||||||
|
import { Prisma } from "@prisma/client";
|
||||||
|
import { Tx } from "@/storage/inTx";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch user's feed with pagination.
|
||||||
|
* Returns items in reverse chronological order (newest first).
|
||||||
|
* Supports cursor-based pagination using the counter field.
|
||||||
|
*/
|
||||||
|
export async function feedGet(
|
||||||
|
tx: Tx,
|
||||||
|
ctx: Context,
|
||||||
|
options?: FeedOptions
|
||||||
|
): Promise<FeedResult> {
|
||||||
|
const limit = options?.limit ?? 100;
|
||||||
|
const cursor = options?.cursor;
|
||||||
|
|
||||||
|
// Build where clause for cursor pagination
|
||||||
|
const where: Prisma.UserFeedItemWhereInput = { userId: ctx.uid };
|
||||||
|
|
||||||
|
if (cursor?.before !== undefined) {
|
||||||
|
if (cursor.before.startsWith('0-')) {
|
||||||
|
where.counter = { lt: parseInt(cursor.before.substring(2), 10) };
|
||||||
|
} else {
|
||||||
|
throw new Error('Invalid cursor format');
|
||||||
|
}
|
||||||
|
} else if (cursor?.after !== undefined) {
|
||||||
|
if (cursor.after.startsWith('0-')) {
|
||||||
|
where.counter = { gt: parseInt(cursor.after.substring(2), 10) };
|
||||||
|
} else {
|
||||||
|
throw new Error('Invalid cursor format');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch items + 1 to determine hasMore
|
||||||
|
const items = await tx.userFeedItem.findMany({
|
||||||
|
where,
|
||||||
|
orderBy: { counter: 'desc' },
|
||||||
|
take: limit + 1
|
||||||
|
});
|
||||||
|
|
||||||
|
// Check if there are more items
|
||||||
|
const hasMore = items.length > limit;
|
||||||
|
|
||||||
|
// Return only requested limit
|
||||||
|
return {
|
||||||
|
items: items.slice(0, limit).map(item => ({
|
||||||
|
...item,
|
||||||
|
createdAt: item.createdAt.getTime(),
|
||||||
|
cursor: '0-' + item.counter.toString(10)
|
||||||
|
})),
|
||||||
|
hasMore
|
||||||
|
};
|
||||||
|
}
|
67
sources/app/feed/feedPost.ts
Normal file
67
sources/app/feed/feedPost.ts
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
import { Context } from "@/context";
|
||||||
|
import { FeedBody, UserFeedItem } from "./types";
|
||||||
|
import { afterTx, Tx } from "@/storage/inTx";
|
||||||
|
import { allocateUserSeq } from "@/storage/seq";
|
||||||
|
import { eventRouter, buildNewFeedPostUpdate } from "@/app/events/eventRouter";
|
||||||
|
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a post to user's feed.
|
||||||
|
* If repeatKey is provided and exists, the post will be updated in-place.
|
||||||
|
* Otherwise, a new post is created with an incremented counter.
|
||||||
|
*/
|
||||||
|
export async function feedPost(
|
||||||
|
tx: Tx,
|
||||||
|
ctx: Context,
|
||||||
|
body: FeedBody,
|
||||||
|
repeatKey?: string | null
|
||||||
|
): Promise<UserFeedItem> {
|
||||||
|
|
||||||
|
|
||||||
|
// Delete existing items with the same repeatKey
|
||||||
|
if (repeatKey) {
|
||||||
|
await tx.userFeedItem.deleteMany({
|
||||||
|
where: {
|
||||||
|
userId: ctx.uid,
|
||||||
|
repeatKey: repeatKey
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate new counter
|
||||||
|
const user = await tx.account.update({
|
||||||
|
where: { id: ctx.uid },
|
||||||
|
select: { feedSeq: true },
|
||||||
|
data: { feedSeq: { increment: 1 } }
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create new item
|
||||||
|
const item = await tx.userFeedItem.create({
|
||||||
|
data: {
|
||||||
|
counter: user.feedSeq,
|
||||||
|
userId: ctx.uid,
|
||||||
|
repeatKey: repeatKey,
|
||||||
|
body: body
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = {
|
||||||
|
...item,
|
||||||
|
createdAt: item.createdAt.getTime(),
|
||||||
|
cursor: '0-' + item.counter.toString(10)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Emit socket event after transaction completes
|
||||||
|
afterTx(tx, async () => {
|
||||||
|
const updateSeq = await allocateUserSeq(ctx.uid);
|
||||||
|
const updatePayload = buildNewFeedPostUpdate(result, updateSeq, randomKeyNaked(12));
|
||||||
|
|
||||||
|
eventRouter.emitUpdate({
|
||||||
|
userId: ctx.uid,
|
||||||
|
payload: updatePayload,
|
||||||
|
recipientFilter: { type: 'all-user-authenticated-connections' }
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
33
sources/app/feed/types.ts
Normal file
33
sources/app/feed/types.ts
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
import * as z from "zod";
|
||||||
|
|
||||||
|
export const FeedBodySchema = z.discriminatedUnion('kind', [
|
||||||
|
z.object({ kind: z.literal('friend_request'), uid: z.string() }),
|
||||||
|
z.object({ kind: z.literal('friend_accepted'), uid: z.string() }),
|
||||||
|
z.object({ kind: z.literal('text'), text: z.string() })
|
||||||
|
]);
|
||||||
|
|
||||||
|
export type FeedBody = z.infer<typeof FeedBodySchema>;
|
||||||
|
|
||||||
|
export interface UserFeedItem {
|
||||||
|
id: string;
|
||||||
|
userId: string;
|
||||||
|
repeatKey: string | null;
|
||||||
|
body: FeedBody;
|
||||||
|
createdAt: number;
|
||||||
|
cursor: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface FeedCursor {
|
||||||
|
before?: string;
|
||||||
|
after?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface FeedOptions {
|
||||||
|
limit?: number;
|
||||||
|
cursor?: FeedCursor;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface FeedResult {
|
||||||
|
items: UserFeedItem[];
|
||||||
|
hasMore: boolean;
|
||||||
|
}
|
41
sources/app/kv/kvBulkGet.ts
Normal file
41
sources/app/kv/kvBulkGet.ts
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
import { db } from "@/storage/db";
|
||||||
|
import * as privacyKit from "privacy-kit";
|
||||||
|
|
||||||
|
export interface KVBulkGetResult {
|
||||||
|
values: Array<{
|
||||||
|
key: string;
|
||||||
|
value: string;
|
||||||
|
version: number;
|
||||||
|
}>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get multiple key-value pairs for the authenticated user.
|
||||||
|
* Only returns existing keys with non-null values; missing or deleted keys are omitted.
|
||||||
|
*/
|
||||||
|
export async function kvBulkGet(
|
||||||
|
ctx: { uid: string },
|
||||||
|
keys: string[]
|
||||||
|
): Promise<KVBulkGetResult> {
|
||||||
|
const results = await db.userKVStore.findMany({
|
||||||
|
where: {
|
||||||
|
accountId: ctx.uid,
|
||||||
|
key: {
|
||||||
|
in: keys
|
||||||
|
},
|
||||||
|
value: {
|
||||||
|
not: null // Exclude deleted entries
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
values: results
|
||||||
|
.filter(r => r.value !== null) // Extra safety check
|
||||||
|
.map(r => ({
|
||||||
|
key: r.key,
|
||||||
|
value: privacyKit.encodeBase64(r.value!),
|
||||||
|
version: r.version
|
||||||
|
}))
|
||||||
|
};
|
||||||
|
}
|
37
sources/app/kv/kvGet.ts
Normal file
37
sources/app/kv/kvGet.ts
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
import { db } from "@/storage/db";
|
||||||
|
import * as privacyKit from "privacy-kit";
|
||||||
|
|
||||||
|
export type KVGetResult = {
|
||||||
|
key: string;
|
||||||
|
value: string;
|
||||||
|
version: number;
|
||||||
|
} | null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a single key-value pair for the authenticated user.
|
||||||
|
* Returns null if the key doesn't exist or if the value is null (deleted).
|
||||||
|
*/
|
||||||
|
export async function kvGet(
|
||||||
|
ctx: { uid: string },
|
||||||
|
key: string
|
||||||
|
): Promise<KVGetResult> {
|
||||||
|
const result = await db.userKVStore.findUnique({
|
||||||
|
where: {
|
||||||
|
accountId_key: {
|
||||||
|
accountId: ctx.uid,
|
||||||
|
key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Treat missing records and null values as "not found"
|
||||||
|
if (!result || result.value === null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
key: result.key,
|
||||||
|
value: privacyKit.encodeBase64(result.value),
|
||||||
|
version: result.version
|
||||||
|
};
|
||||||
|
}
|
56
sources/app/kv/kvList.ts
Normal file
56
sources/app/kv/kvList.ts
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
import { db } from "@/storage/db";
|
||||||
|
import * as privacyKit from "privacy-kit";
|
||||||
|
|
||||||
|
export interface KVListOptions {
|
||||||
|
prefix?: string;
|
||||||
|
limit?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface KVListResult {
|
||||||
|
items: Array<{
|
||||||
|
key: string;
|
||||||
|
value: string;
|
||||||
|
version: number;
|
||||||
|
}>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all key-value pairs for the authenticated user, optionally filtered by prefix.
|
||||||
|
* Returns keys, values, and versions. Excludes entries with null values (deleted).
|
||||||
|
*/
|
||||||
|
export async function kvList(
|
||||||
|
ctx: { uid: string },
|
||||||
|
options?: KVListOptions
|
||||||
|
): Promise<KVListResult> {
|
||||||
|
const where: any = {
|
||||||
|
accountId: ctx.uid,
|
||||||
|
value: {
|
||||||
|
not: null // Exclude deleted entries (null values)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Add prefix filter if specified
|
||||||
|
if (options?.prefix) {
|
||||||
|
where.key = {
|
||||||
|
startsWith: options.prefix
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const results = await db.userKVStore.findMany({
|
||||||
|
where,
|
||||||
|
orderBy: {
|
||||||
|
key: 'asc'
|
||||||
|
},
|
||||||
|
take: options?.limit
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
items: results
|
||||||
|
.filter(r => r.value !== null) // Extra safety check
|
||||||
|
.map(r => ({
|
||||||
|
key: r.key,
|
||||||
|
value: privacyKit.encodeBase64(r.value!),
|
||||||
|
version: r.version
|
||||||
|
}))
|
||||||
|
};
|
||||||
|
}
|
139
sources/app/kv/kvMutate.ts
Normal file
139
sources/app/kv/kvMutate.ts
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
import { db } from "@/storage/db";
|
||||||
|
import { inTx, afterTx } from "@/storage/inTx";
|
||||||
|
import { allocateUserSeq } from "@/storage/seq";
|
||||||
|
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||||
|
import { eventRouter, buildKVBatchUpdateUpdate } from "@/app/events/eventRouter";
|
||||||
|
import * as privacyKit from "privacy-kit";
|
||||||
|
|
||||||
|
export interface KVMutation {
|
||||||
|
key: string;
|
||||||
|
value: string | null; // null = delete (sets value to null but keeps record)
|
||||||
|
version: number; // Always required, use -1 for new keys
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface KVMutateResult {
|
||||||
|
success: boolean;
|
||||||
|
results?: Array<{
|
||||||
|
key: string;
|
||||||
|
version: number;
|
||||||
|
}>;
|
||||||
|
errors?: Array<{
|
||||||
|
key: string;
|
||||||
|
error: 'version-mismatch';
|
||||||
|
version: number;
|
||||||
|
value: string | null; // Current value (null if deleted)
|
||||||
|
}>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomically mutate multiple key-value pairs.
|
||||||
|
* All mutations succeed or all fail.
|
||||||
|
* Version is always required for all operations (use -1 for new keys).
|
||||||
|
* Delete operations set value to null but keep the record with incremented version.
|
||||||
|
* Sends a single bundled update notification for all changes.
|
||||||
|
*/
|
||||||
|
export async function kvMutate(
|
||||||
|
ctx: { uid: string },
|
||||||
|
mutations: KVMutation[]
|
||||||
|
): Promise<KVMutateResult> {
|
||||||
|
return await inTx(async (tx) => {
|
||||||
|
const errors: KVMutateResult['errors'] = [];
|
||||||
|
|
||||||
|
// Pre-validate all mutations
|
||||||
|
for (const mutation of mutations) {
|
||||||
|
const existing = await tx.userKVStore.findUnique({
|
||||||
|
where: {
|
||||||
|
accountId_key: {
|
||||||
|
accountId: ctx.uid,
|
||||||
|
key: mutation.key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const currentVersion = existing?.version ?? -1;
|
||||||
|
|
||||||
|
// Version check is always required
|
||||||
|
if (currentVersion !== mutation.version) {
|
||||||
|
errors.push({
|
||||||
|
key: mutation.key,
|
||||||
|
error: 'version-mismatch',
|
||||||
|
version: currentVersion,
|
||||||
|
value: existing?.value ? privacyKit.encodeBase64(existing.value) : null
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If any errors, return all errors and abort
|
||||||
|
if (errors.length > 0) {
|
||||||
|
return { success: false, errors };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply all mutations and collect results
|
||||||
|
const results: Array<{ key: string; version: number }> = [];
|
||||||
|
const changes: Array<{ key: string; value: string | null; version: number }> = [];
|
||||||
|
|
||||||
|
for (const mutation of mutations) {
|
||||||
|
if (mutation.version === -1) {
|
||||||
|
// Create new entry (must not exist)
|
||||||
|
const result = await tx.userKVStore.create({
|
||||||
|
data: {
|
||||||
|
accountId: ctx.uid,
|
||||||
|
key: mutation.key,
|
||||||
|
value: mutation.value ? new Uint8Array(Buffer.from(mutation.value, 'base64')) : null,
|
||||||
|
version: 0
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
results.push({
|
||||||
|
key: mutation.key,
|
||||||
|
version: result.version
|
||||||
|
});
|
||||||
|
|
||||||
|
changes.push({
|
||||||
|
key: mutation.key,
|
||||||
|
value: mutation.value,
|
||||||
|
version: result.version
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// Update existing entry (including "delete" which sets value to null)
|
||||||
|
const newVersion = mutation.version + 1;
|
||||||
|
|
||||||
|
const result = await tx.userKVStore.update({
|
||||||
|
where: {
|
||||||
|
accountId_key: {
|
||||||
|
accountId: ctx.uid,
|
||||||
|
key: mutation.key
|
||||||
|
}
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
value: mutation.value ? privacyKit.decodeBase64(mutation.value) : null,
|
||||||
|
version: newVersion
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
results.push({
|
||||||
|
key: mutation.key,
|
||||||
|
version: result.version
|
||||||
|
});
|
||||||
|
|
||||||
|
changes.push({
|
||||||
|
key: mutation.key,
|
||||||
|
value: mutation.value,
|
||||||
|
version: result.version
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send single bundled notification for all changes
|
||||||
|
afterTx(tx, async () => {
|
||||||
|
const updateSeq = await allocateUserSeq(ctx.uid);
|
||||||
|
eventRouter.emitUpdate({
|
||||||
|
userId: ctx.uid,
|
||||||
|
payload: buildKVBatchUpdateUpdate(changes, updateSeq, randomKeyNaked(12)),
|
||||||
|
recipientFilter: { type: 'user-scoped-only' }
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return { success: true, results };
|
||||||
|
});
|
||||||
|
}
|
108
sources/app/session/sessionDelete.ts
Normal file
108
sources/app/session/sessionDelete.ts
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
import { Context } from "@/context";
|
||||||
|
import { inTx, afterTx } from "@/storage/inTx";
|
||||||
|
import { eventRouter, buildDeleteSessionUpdate } from "@/app/events/eventRouter";
|
||||||
|
import { allocateUserSeq } from "@/storage/seq";
|
||||||
|
import { randomKeyNaked } from "@/utils/randomKeyNaked";
|
||||||
|
import { log } from "@/utils/log";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete a session and all its related data.
|
||||||
|
* Handles:
|
||||||
|
* - Deleting all session messages
|
||||||
|
* - Deleting all usage reports for the session
|
||||||
|
* - Deleting all access keys for the session
|
||||||
|
* - Deleting the session itself
|
||||||
|
* - Sending socket notification to all connected clients
|
||||||
|
*
|
||||||
|
* @param ctx - Context with user information
|
||||||
|
* @param sessionId - ID of the session to delete
|
||||||
|
* @returns true if deletion was successful, false if session not found or not owned by user
|
||||||
|
*/
|
||||||
|
export async function sessionDelete(ctx: Context, sessionId: string): Promise<boolean> {
|
||||||
|
return await inTx(async (tx) => {
|
||||||
|
// Verify session exists and belongs to the user
|
||||||
|
const session = await tx.session.findFirst({
|
||||||
|
where: {
|
||||||
|
id: sessionId,
|
||||||
|
accountId: ctx.uid
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!session) {
|
||||||
|
log({
|
||||||
|
module: 'session-delete',
|
||||||
|
userId: ctx.uid,
|
||||||
|
sessionId
|
||||||
|
}, `Session not found or not owned by user`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete all related data
|
||||||
|
// Note: Order matters to avoid foreign key constraint violations
|
||||||
|
|
||||||
|
// 1. Delete session messages
|
||||||
|
const deletedMessages = await tx.sessionMessage.deleteMany({
|
||||||
|
where: { sessionId }
|
||||||
|
});
|
||||||
|
log({
|
||||||
|
module: 'session-delete',
|
||||||
|
userId: ctx.uid,
|
||||||
|
sessionId,
|
||||||
|
deletedCount: deletedMessages.count
|
||||||
|
}, `Deleted ${deletedMessages.count} session messages`);
|
||||||
|
|
||||||
|
// 2. Delete usage reports
|
||||||
|
const deletedReports = await tx.usageReport.deleteMany({
|
||||||
|
where: { sessionId }
|
||||||
|
});
|
||||||
|
log({
|
||||||
|
module: 'session-delete',
|
||||||
|
userId: ctx.uid,
|
||||||
|
sessionId,
|
||||||
|
deletedCount: deletedReports.count
|
||||||
|
}, `Deleted ${deletedReports.count} usage reports`);
|
||||||
|
|
||||||
|
// 3. Delete access keys
|
||||||
|
const deletedAccessKeys = await tx.accessKey.deleteMany({
|
||||||
|
where: { sessionId }
|
||||||
|
});
|
||||||
|
log({
|
||||||
|
module: 'session-delete',
|
||||||
|
userId: ctx.uid,
|
||||||
|
sessionId,
|
||||||
|
deletedCount: deletedAccessKeys.count
|
||||||
|
}, `Deleted ${deletedAccessKeys.count} access keys`);
|
||||||
|
|
||||||
|
// 4. Delete the session itself
|
||||||
|
await tx.session.delete({
|
||||||
|
where: { id: sessionId }
|
||||||
|
});
|
||||||
|
log({
|
||||||
|
module: 'session-delete',
|
||||||
|
userId: ctx.uid,
|
||||||
|
sessionId
|
||||||
|
}, `Session deleted successfully`);
|
||||||
|
|
||||||
|
// Send notification after transaction commits
|
||||||
|
afterTx(tx, async () => {
|
||||||
|
const updSeq = await allocateUserSeq(ctx.uid);
|
||||||
|
const updatePayload = buildDeleteSessionUpdate(sessionId, updSeq, randomKeyNaked(12));
|
||||||
|
|
||||||
|
log({
|
||||||
|
module: 'session-delete',
|
||||||
|
userId: ctx.uid,
|
||||||
|
sessionId,
|
||||||
|
updateType: 'delete-session',
|
||||||
|
updatePayload: JSON.stringify(updatePayload)
|
||||||
|
}, `Emitting delete-session update to all user connections`);
|
||||||
|
|
||||||
|
eventRouter.emitUpdate({
|
||||||
|
userId: ctx.uid,
|
||||||
|
payload: updatePayload,
|
||||||
|
recipientFilter: { type: 'all-user-authenticated-connections' }
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
}
|
@ -1,10 +1,18 @@
|
|||||||
import { Context } from "@/context";
|
import { Context } from "@/context";
|
||||||
import { buildUserProfile, UserProfile } from "./type";
|
import { buildUserProfile, UserProfile } from "./type";
|
||||||
import { db } from "@/storage/db";
|
import { inTx } from "@/storage/inTx";
|
||||||
import { RelationshipStatus } from "@prisma/client";
|
import { RelationshipStatus } from "@prisma/client";
|
||||||
import { relationshipSet } from "./relationshipSet";
|
import { relationshipSet } from "./relationshipSet";
|
||||||
import { relationshipGet } from "./relationshipGet";
|
import { relationshipGet } from "./relationshipGet";
|
||||||
|
import { sendFriendRequestNotification, sendFriendshipEstablishedNotification } from "./friendNotification";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a friend or accept a friend request.
|
||||||
|
* Handles:
|
||||||
|
* - Accepting incoming friend requests (both users become friends)
|
||||||
|
* - Sending new friend requests
|
||||||
|
* - Sending appropriate notifications with 24-hour cooldown
|
||||||
|
*/
|
||||||
export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile | null> {
|
export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile | null> {
|
||||||
// Prevent self-friendship
|
// Prevent self-friendship
|
||||||
if (ctx.uid === uid) {
|
if (ctx.uid === uid) {
|
||||||
@ -12,7 +20,7 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update relationship status
|
// Update relationship status
|
||||||
return await db.$transaction(async (tx) => {
|
return await inTx(async (tx) => {
|
||||||
|
|
||||||
// Read current user objects
|
// Read current user objects
|
||||||
const currentUser = await tx.account.findUnique({
|
const currentUser = await tx.account.findUnique({
|
||||||
@ -40,6 +48,9 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
|
|||||||
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.friend);
|
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.friend);
|
||||||
await relationshipSet(tx, currentUser.id, targetUser.id, RelationshipStatus.friend);
|
await relationshipSet(tx, currentUser.id, targetUser.id, RelationshipStatus.friend);
|
||||||
|
|
||||||
|
// Send friendship established notifications to both users
|
||||||
|
await sendFriendshipEstablishedNotification(tx, currentUser.id, targetUser.id);
|
||||||
|
|
||||||
// Return the target user profile
|
// Return the target user profile
|
||||||
return buildUserProfile(targetUser, RelationshipStatus.friend);
|
return buildUserProfile(targetUser, RelationshipStatus.friend);
|
||||||
}
|
}
|
||||||
@ -54,6 +65,9 @@ export async function friendAdd(ctx: Context, uid: string): Promise<UserProfile
|
|||||||
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.pending);
|
await relationshipSet(tx, targetUser.id, currentUser.id, RelationshipStatus.pending);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send friend request notification to the receiver
|
||||||
|
await sendFriendRequestNotification(tx, targetUser.id, currentUser.id);
|
||||||
|
|
||||||
// Return the target user profile
|
// Return the target user profile
|
||||||
return buildUserProfile(targetUser, RelationshipStatus.requested);
|
return buildUserProfile(targetUser, RelationshipStatus.requested);
|
||||||
}
|
}
|
||||||
|
61
sources/app/social/friendNotification.spec.ts
Normal file
61
sources/app/social/friendNotification.spec.ts
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
import { describe, it, expect, vi } from "vitest";
|
||||||
|
import { RelationshipStatus } from "@prisma/client";
|
||||||
|
|
||||||
|
// Mock the dependencies that require environment variables
|
||||||
|
vi.mock("@/storage/files", () => ({
|
||||||
|
getPublicUrl: vi.fn((path: string) => `https://example.com/${path}`)
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/app/feed/feedPost", () => ({
|
||||||
|
feedPost: vi.fn()
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/storage/inTx", () => ({
|
||||||
|
afterTx: vi.fn()
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Import after mocking
|
||||||
|
import { shouldSendNotification } from "./friendNotification";
|
||||||
|
|
||||||
|
describe("friendNotification", () => {
|
||||||
|
describe("shouldSendNotification", () => {
|
||||||
|
it("should return true when lastNotifiedAt is null", () => {
|
||||||
|
const result = shouldSendNotification(null, RelationshipStatus.pending);
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should return false for rejected relationships", () => {
|
||||||
|
const result = shouldSendNotification(null, RelationshipStatus.rejected);
|
||||||
|
expect(result).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should return false for rejected relationships even if 24 hours passed", () => {
|
||||||
|
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
|
||||||
|
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.rejected);
|
||||||
|
expect(result).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should return true when 24 hours have passed since last notification", () => {
|
||||||
|
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
|
||||||
|
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.pending);
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should return false when less than 24 hours have passed", () => {
|
||||||
|
const tenHoursAgo = new Date(Date.now() - 10 * 60 * 60 * 1000);
|
||||||
|
const result = shouldSendNotification(tenHoursAgo, RelationshipStatus.pending);
|
||||||
|
expect(result).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should work for friend status", () => {
|
||||||
|
const twentyFiveHoursAgo = new Date(Date.now() - 25 * 60 * 60 * 1000);
|
||||||
|
const result = shouldSendNotification(twentyFiveHoursAgo, RelationshipStatus.friend);
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should work for requested status", () => {
|
||||||
|
const result = shouldSendNotification(null, RelationshipStatus.requested);
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
170
sources/app/social/friendNotification.ts
Normal file
170
sources/app/social/friendNotification.ts
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
import { Prisma, RelationshipStatus } from "@prisma/client";
|
||||||
|
import { feedPost } from "@/app/feed/feedPost";
|
||||||
|
import { Context } from "@/context";
|
||||||
|
import { afterTx } from "@/storage/inTx";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a notification should be sent based on the last notification time and relationship status.
|
||||||
|
* Returns true if:
|
||||||
|
* - No previous notification was sent (lastNotifiedAt is null)
|
||||||
|
* - OR 24 hours have passed since the last notification
|
||||||
|
* - AND the relationship is not rejected
|
||||||
|
*/
|
||||||
|
export function shouldSendNotification(
|
||||||
|
lastNotifiedAt: Date | null,
|
||||||
|
status: RelationshipStatus
|
||||||
|
): boolean {
|
||||||
|
// Don't send notifications for rejected relationships
|
||||||
|
if (status === RelationshipStatus.rejected) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If never notified, send notification
|
||||||
|
if (!lastNotifiedAt) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if 24 hours have passed since last notification
|
||||||
|
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
|
||||||
|
return lastNotifiedAt < twentyFourHoursAgo;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a friend request notification to the receiver and update lastNotifiedAt.
|
||||||
|
* This creates a feed item for the receiver about the incoming friend request.
|
||||||
|
*/
|
||||||
|
export async function sendFriendRequestNotification(
|
||||||
|
tx: Prisma.TransactionClient,
|
||||||
|
receiverUserId: string,
|
||||||
|
senderUserId: string
|
||||||
|
): Promise<void> {
|
||||||
|
// Check if we should send notification to receiver
|
||||||
|
const receiverRelationship = await tx.userRelationship.findUnique({
|
||||||
|
where: {
|
||||||
|
fromUserId_toUserId: {
|
||||||
|
fromUserId: receiverUserId,
|
||||||
|
toUserId: senderUserId
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!receiverRelationship || !shouldSendNotification(
|
||||||
|
receiverRelationship.lastNotifiedAt,
|
||||||
|
receiverRelationship.status
|
||||||
|
)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create feed notification for receiver
|
||||||
|
const receiverCtx = Context.create(receiverUserId);
|
||||||
|
await feedPost(
|
||||||
|
tx,
|
||||||
|
receiverCtx,
|
||||||
|
{
|
||||||
|
kind: 'friend_request',
|
||||||
|
uid: senderUserId
|
||||||
|
},
|
||||||
|
`friend_request_${senderUserId}` // repeatKey to avoid duplicates
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update lastNotifiedAt for the receiver's relationship record
|
||||||
|
await tx.userRelationship.update({
|
||||||
|
where: {
|
||||||
|
fromUserId_toUserId: {
|
||||||
|
fromUserId: receiverUserId,
|
||||||
|
toUserId: senderUserId
|
||||||
|
}
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
lastNotifiedAt: new Date()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send friendship established notifications to both users and update lastNotifiedAt.
|
||||||
|
* This creates feed items for both users about the new friendship.
|
||||||
|
*/
|
||||||
|
export async function sendFriendshipEstablishedNotification(
|
||||||
|
tx: Prisma.TransactionClient,
|
||||||
|
user1Id: string,
|
||||||
|
user2Id: string
|
||||||
|
): Promise<void> {
|
||||||
|
// Check and send notification to user1
|
||||||
|
const user1Relationship = await tx.userRelationship.findUnique({
|
||||||
|
where: {
|
||||||
|
fromUserId_toUserId: {
|
||||||
|
fromUserId: user1Id,
|
||||||
|
toUserId: user2Id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (user1Relationship && shouldSendNotification(
|
||||||
|
user1Relationship.lastNotifiedAt,
|
||||||
|
user1Relationship.status
|
||||||
|
)) {
|
||||||
|
const user1Ctx = Context.create(user1Id);
|
||||||
|
await feedPost(
|
||||||
|
tx,
|
||||||
|
user1Ctx,
|
||||||
|
{
|
||||||
|
kind: 'friend_accepted',
|
||||||
|
uid: user2Id
|
||||||
|
},
|
||||||
|
`friend_accepted_${user2Id}` // repeatKey to avoid duplicates
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update lastNotifiedAt for user1
|
||||||
|
await tx.userRelationship.update({
|
||||||
|
where: {
|
||||||
|
fromUserId_toUserId: {
|
||||||
|
fromUserId: user1Id,
|
||||||
|
toUserId: user2Id
|
||||||
|
}
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
lastNotifiedAt: new Date()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check and send notification to user2
|
||||||
|
const user2Relationship = await tx.userRelationship.findUnique({
|
||||||
|
where: {
|
||||||
|
fromUserId_toUserId: {
|
||||||
|
fromUserId: user2Id,
|
||||||
|
toUserId: user1Id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (user2Relationship && shouldSendNotification(
|
||||||
|
user2Relationship.lastNotifiedAt,
|
||||||
|
user2Relationship.status
|
||||||
|
)) {
|
||||||
|
const user2Ctx = Context.create(user2Id);
|
||||||
|
await feedPost(
|
||||||
|
tx,
|
||||||
|
user2Ctx,
|
||||||
|
{
|
||||||
|
kind: 'friend_accepted',
|
||||||
|
uid: user1Id
|
||||||
|
},
|
||||||
|
`friend_accepted_${user1Id}` // repeatKey to avoid duplicates
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update lastNotifiedAt for user2
|
||||||
|
await tx.userRelationship.update({
|
||||||
|
where: {
|
||||||
|
fromUserId_toUserId: {
|
||||||
|
fromUserId: user2Id,
|
||||||
|
toUserId: user1Id
|
||||||
|
}
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
lastNotifiedAt: new Date()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -1,12 +1,12 @@
|
|||||||
import { Context } from "@/context";
|
import { Context } from "@/context";
|
||||||
import { buildUserProfile, UserProfile } from "./type";
|
import { buildUserProfile, UserProfile } from "./type";
|
||||||
import { db } from "@/storage/db";
|
import { inTx } from "@/storage/inTx";
|
||||||
import { RelationshipStatus } from "@prisma/client";
|
import { RelationshipStatus } from "@prisma/client";
|
||||||
import { relationshipSet } from "./relationshipSet";
|
import { relationshipSet } from "./relationshipSet";
|
||||||
import { relationshipGet } from "./relationshipGet";
|
import { relationshipGet } from "./relationshipGet";
|
||||||
|
|
||||||
export async function friendRemove(ctx: Context, uid: string): Promise<UserProfile | null> {
|
export async function friendRemove(ctx: Context, uid: string): Promise<UserProfile | null> {
|
||||||
return await db.$transaction(async (tx) => {
|
return await inTx(async (tx) => {
|
||||||
|
|
||||||
// Read current user objects
|
// Read current user objects
|
||||||
const currentUser = await tx.account.findUnique({
|
const currentUser = await tx.account.findUnique({
|
||||||
|
@ -1,7 +1,17 @@
|
|||||||
import { Prisma } from "@prisma/client";
|
import { Prisma } from "@prisma/client";
|
||||||
import { RelationshipStatus } from "@prisma/client";
|
import { RelationshipStatus } from "@prisma/client";
|
||||||
|
|
||||||
export async function relationshipSet(tx: Prisma.TransactionClient, from: string, to: string, status: RelationshipStatus) {
|
export async function relationshipSet(tx: Prisma.TransactionClient, from: string, to: string, status: RelationshipStatus, lastNotifiedAt?: Date) {
|
||||||
|
// Get existing relationship to preserve lastNotifiedAt
|
||||||
|
const existing = await tx.userRelationship.findUnique({
|
||||||
|
where: {
|
||||||
|
fromUserId_toUserId: {
|
||||||
|
fromUserId: from,
|
||||||
|
toUserId: to
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if (status === RelationshipStatus.friend) {
|
if (status === RelationshipStatus.friend) {
|
||||||
await tx.userRelationship.upsert({
|
await tx.userRelationship.upsert({
|
||||||
where: {
|
where: {
|
||||||
@ -14,11 +24,14 @@ export async function relationshipSet(tx: Prisma.TransactionClient, from: string
|
|||||||
fromUserId: from,
|
fromUserId: from,
|
||||||
toUserId: to,
|
toUserId: to,
|
||||||
status,
|
status,
|
||||||
acceptedAt: new Date()
|
acceptedAt: new Date(),
|
||||||
|
lastNotifiedAt: lastNotifiedAt || null
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
status,
|
status,
|
||||||
acceptedAt: new Date()
|
acceptedAt: new Date(),
|
||||||
|
// Preserve existing lastNotifiedAt, only update if explicitly provided
|
||||||
|
lastNotifiedAt: lastNotifiedAt || existing?.lastNotifiedAt || undefined
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
@ -33,11 +46,14 @@ export async function relationshipSet(tx: Prisma.TransactionClient, from: string
|
|||||||
fromUserId: from,
|
fromUserId: from,
|
||||||
toUserId: to,
|
toUserId: to,
|
||||||
status,
|
status,
|
||||||
acceptedAt: null
|
acceptedAt: null,
|
||||||
|
lastNotifiedAt: lastNotifiedAt || null
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
status,
|
status,
|
||||||
acceptedAt: null
|
acceptedAt: null,
|
||||||
|
// Preserve existing lastNotifiedAt, only update if explicitly provided
|
||||||
|
lastNotifiedAt: lastNotifiedAt || existing?.lastNotifiedAt || undefined
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,4 @@ export class Context {
|
|||||||
private constructor(uid: string) {
|
private constructor(uid: string) {
|
||||||
this.uid = uid;
|
this.uid = uid;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export type Tx = Prisma.TransactionClient | PrismaClient;
|
|
Loading…
Reference in New Issue
Block a user