feat: add update state
This commit is contained in:
parent
0ccdef14ff
commit
e2854faaa8
@ -672,10 +672,94 @@ export async function startApi() {
|
||||
pubsub.emit('update', userId, result.update);
|
||||
|
||||
// Send success response with new version via callback
|
||||
callback({ success: true, metadataVersion: result.newMetadataVersion, metadata: metadata });
|
||||
callback({ result: 'success', version: result.newMetadataVersion, metadata: metadata });
|
||||
|
||||
});
|
||||
|
||||
socket.on('update-state', async (data: any, callback: (response: any) => void) => {
|
||||
const { sid, agentState, expectedVersion } = data;
|
||||
|
||||
// Validate input
|
||||
if (!sid || (typeof agentState !== 'string' && agentState !== null) || typeof expectedVersion !== 'number') {
|
||||
if (callback) {
|
||||
callback({ result: 'error' });
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Start transaction to ensure consistency
|
||||
const result = await db.$transaction(async (tx) => {
|
||||
// Verify session belongs to user and lock it
|
||||
const session = await tx.session.findFirst({
|
||||
where: {
|
||||
id: sid,
|
||||
accountId: userId
|
||||
}
|
||||
});
|
||||
const user = await tx.account.findUnique({
|
||||
where: { id: userId }
|
||||
});
|
||||
if (!user || !session) {
|
||||
callback({ result: 'error' });
|
||||
return null;
|
||||
}
|
||||
|
||||
// Check version
|
||||
if (session.agentStateVersion !== expectedVersion) {
|
||||
callback({ result: 'version-mismatch', version: session.agentStateVersion, agentState: session.agentState });
|
||||
return null;
|
||||
}
|
||||
|
||||
// Get next sequence number
|
||||
const updSeq = user.seq + 1;
|
||||
const newAgentStateVersion = session.agentStateVersion + 1;
|
||||
|
||||
// Update session agent state
|
||||
await tx.session.update({
|
||||
where: { id: sid },
|
||||
data: {
|
||||
agentState: agentState,
|
||||
agentStateVersion: newAgentStateVersion
|
||||
}
|
||||
});
|
||||
|
||||
// Create update
|
||||
const updContent: PrismaJson.UpdateBody = {
|
||||
t: 'update-session',
|
||||
id: sid,
|
||||
agentState: {
|
||||
value: agentState,
|
||||
version: newAgentStateVersion
|
||||
}
|
||||
};
|
||||
|
||||
const update = await tx.update.create({
|
||||
data: {
|
||||
accountId: userId,
|
||||
seq: updSeq,
|
||||
content: updContent
|
||||
}
|
||||
});
|
||||
|
||||
// Update user sequence
|
||||
await tx.account.update({
|
||||
where: { id: userId },
|
||||
data: { seq: updSeq }
|
||||
});
|
||||
|
||||
return { update, newAgentStateVersion };
|
||||
});
|
||||
if (!result) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Emit update to connected sockets
|
||||
pubsub.emit('update', userId, result.update);
|
||||
|
||||
// Send success response with new version via callback
|
||||
callback({ result: 'success', version: result.newAgentStateVersion, agentState: agentState });
|
||||
});
|
||||
|
||||
socket.emit('auth', { success: true, user: userId });
|
||||
log({ module: 'websocket' }, `User connected: ${userId}`);
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user