chat-dev / server /wsHandler.js
incognitolm
Migration to PostgreSQL
bff1056
import OpenAI from 'openai';
import { safeSend, broadcastToUser } from './helpers.js';
import { LIGHTNING_BASE, PUBLIC_URL } from './config.js';
import { sessionStore, deviceSessionStore } from './sessionStore.js';
import { rateLimiter } from './rateLimiter.js';
import { initGuestRequestLimiter, consumeGuestRequest } from './guestRequestLimiter.js';
import {
verifySupabaseToken, getUserSettings, saveUserSettings,
getUserProfile, setUsername, getSubscriptionInfo,
getTierConfig, getUsageInfo,
} from './auth.js';
import { streamChat } from './chatStream.js';
import { mediaStore } from './mediaStore.js';
import { memoryStore } from './memoryStore.js';
import { chatTrashStore } from './chatTrashStore.js';
import { systemPromptStore } from './systemPromptStore.js';
import { getWebSearchUsage } from './webSearchUsageStore.js';
import crypto from 'crypto';
import path from 'path';
/**
* Message Structure: Tree-based with versioned tails
*
* Each message has versions, and each version has a complete tail of subsequent messages.
* Messages only exist within parent tails (no separate flat array).
*
* {
* id: "msg-123",
* role: "user" | "assistant",
* content: string | array,
* timestamp: number,
* versions: [
* {
* content: string | array,
* tail: [ // Full message objects
* { id, role, content, timestamp, versions: [...], currentVersionIdx, ... },
* ...
* ],
* timestamp: number
* },
* ...
* ],
* currentVersionIdx: 0,
* toolCalls?: [...]
* }
*/
// Maps each WebSocket to the AbortController of the chat stream currently running
// for it, so 'chat:stop' (or a newer request on the same socket) can cancel it.
const activeStreams = new Map();
// Keys that buildEntry() lifts out of its `extraFields` argument and writes onto the
// initial version record as well as onto the message itself.
const VERSION_META_FIELDS = ['toolCalls', 'responseEdits', 'responseSegments', 'error'];
// User-role instruction passed to streamChat when the client asks to "continue"
// an assistant message ('chat:assistantAction' with action === 'continue').
const CONTINUE_ASSISTANT_PROMPT =
'Continue your previous response exactly where it left off. Do not restart, summarize, or repeat the opening. Preserve the same formatting and only add the missing continuation.';
// Web-search limit handed to getWebSearchUsage() and, for guests and users whose
// plan key is 'free', to streamChat() as `webSearchLimit.limit`.
const FREE_WEB_SEARCH_LIMIT = 15;
/**
 * Cancels the chat stream registered for a socket, if there is one.
 *
 * The controller is aborted first and only then removed from `activeStreams`.
 *
 * @param {object} ws - WebSocket used as the key in `activeStreams`.
 * @returns {void}
 */
export function abortActiveStream(ws) {
  if (activeStreams.has(ws)) {
    const controller = activeStreams.get(ws);
    controller.abort();
    activeStreams.delete(ws);
  }
}
// Start guest request limiter initialisation at module load. The promise is not
// awaited; a rejection is only logged here and does not stop the module from loading.
initGuestRequestLimiter().catch(err => console.error('Failed to initialize guest request limiter:', err));
/**
 * Builds the key under which web-search usage is tracked for a connection.
 *
 * Signed-in clients get `user:<userId>`. Everyone else gets `guest:<id>`, where
 * the id is the first truthy value of `clientId`, `client.tempId`, 'anonymous'.
 *
 * @param {object|null|undefined} client - Connection state (may be missing).
 * @param {string} [clientId=''] - Client-supplied identifier.
 * @returns {string} The usage owner key.
 */
function usageOwnerKey(client, clientId = '') {
  const userId = client?.userId;
  if (userId) {
    return `user:${userId}`;
  }
  const guestId = clientId || client?.tempId || 'anonymous';
  return `guest:${guestId}`;
}
/**
 * Decides whether the free web-search limit applies to a connection.
 *
 * Clients without a `userId` always count as free. Signed-in clients count as
 * free only when the usage info reports the plan key 'free' (`plan_key` is
 * consulted before `planKey`).
 *
 * @param {object|null|undefined} client - Connection state.
 * @param {object|null|undefined} usageInfo - Usage payload carrying the plan key.
 * @returns {boolean} True when the free limit applies.
 */
function isFreeSearchPlan(client, usageInfo) {
  if (client?.userId) {
    const planKey = usageInfo?.plan_key || usageInfo?.planKey || null;
    return planKey === 'free';
  }
  return true;
}
/**
 * Assembles the usage payload sent to the client and used for plan checks.
 *
 * The account usage info and the web-search counter are independent lookups, so
 * they are started together instead of one after the other; this handler runs
 * on every chat request.
 *
 * @param {object|null|undefined} client - Connection state (`accessToken`, `userId`, `tempId`).
 * @param {string} [clientId=''] - Client-supplied identifier forwarded to both lookups.
 * @returns {Promise<object>} The usage info (or `{}` when none) with
 *   `usage.webSearchDaily` added.
 */
async function buildUsagePayload(client, clientId = '') {
  const [usageInfo, webSearchDaily] = await Promise.all([
    getUsageInfo(client?.accessToken, clientId),
    getWebSearchUsage(usageOwnerKey(client, clientId), FREE_WEB_SEARCH_LIMIT),
  ]);
  return {
    ...(usageInfo || {}),
    usage: {
      ...(usageInfo?.usage || {}),
      webSearchDaily,
    },
  };
}
/**
 * Entry point for every parsed WebSocket message.
 *
 * Steps, in order:
 *  1. Ignore sockets that are not registered in `wsClients`.
 *  2. Reject everything except 'ping' and 'turnstile:verify' until the
 *     connection has passed Turnstile verification.
 *  3. For signed-in sockets, re-validate the device session on every message
 *     (auth and ping messages are exempt) and force a logout when it is gone.
 *  4. Dispatch to the matching entry of `handlers`, or reply with an
 *     'Unknown: <type>' error.
 *
 * @param {object} ws - The socket the message arrived on.
 * @param {object} msg - Parsed client message; `msg.type` selects the handler.
 * @param {Map<object, object>} wsClients - Socket -> connection state.
 * @returns {Promise<*>} Whatever the selected handler (or `safeSend`) returns.
 */
export async function handleWsMessage(ws, msg, wsClients) {
  const client = wsClients.get(ws);
  if (!client) return;

  // A frame that did not parse to an object has no type; treat it as unknown
  // instead of throwing on `msg.type`.
  const type = msg?.type;

  // Require turnstile verification for most message types
  if (!client.verified && type !== 'ping' && type !== 'turnstile:verify') {
    return safeSend(ws, { type: 'error', message: 'turnstile:required' });
  }

  const bypassDeviceValidation = new Set([
    'ping',
    'turnstile:verify',
    'auth:login',
    'auth:guest',
    'auth:logout',
  ]);
  if (client.userId && client.deviceToken && !bypassDeviceValidation.has(type)) {
    const activeDeviceSession = await deviceSessionStore.validate(client.deviceToken);
    if (!activeDeviceSession) {
      const priorUserId = client.userId;
      Object.assign(client, { userId: null, authenticated: false, accessToken: null, deviceToken: null });
      sessionStore.markOffline(priorUserId, ws);
      return safeSend(ws, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' });
    }
  }

  // `type` comes from the client. Look it up as an OWN property only, so names
  // inherited from Object.prototype ('__proto__', 'constructor', 'toString', ...)
  // are reported as unknown instead of being invoked or crashing the dispatch.
  const handler = typeof type === 'string' && Object.hasOwn(handlers, type)
    ? handlers[type]
    : undefined;
  if (typeof handler === 'function') return handler(ws, msg, client, wsClients);
  safeSend(ws, { type: 'error', message: `Unknown: ${type}` });
}
/**
 * Short alias for `broadcastToUser`: forwards all four arguments unchanged and
 * discards the result.
 *
 * @param {Map<object, object>} wsClients - Socket -> connection state.
 * @param {string} userId - User whose sockets receive the message.
 * @param {object} data - Message to send.
 * @param {object} [excludeWs] - Socket to leave out.
 * @returns {void}
 */
function bcast(wsClients, userId, data, excludeWs) {
  const target = { wsClients, userId };
  broadcastToUser(target.wsClients, target.userId, data, excludeWs);
}
const handlers = {
'ping': (ws) => { safeSend(ws, { type: 'pong' }); },
'turnstile:verify': async (ws, msg, client) => {
try {
const token = msg?.token;
const secret = process.env.TURNSTILE_SECRET_KEY;
if (!token || !secret) return safeSend(ws, { type: 'turnstile:error', message: 'Missing token or server not configured' });
const params = new URLSearchParams(); params.append('secret', secret); params.append('response', token);
if (client.ip) params.append('remoteip', client.ip);
const r = await fetch('https://challenges.cloudflare.com/turnstile/v0/siteverify', { method: 'POST', body: params });
const j = await r.json();
if (j?.success) { client.verified = true; return safeSend(ws, { type: 'turnstile:ok' }); }
return safeSend(ws, { type: 'turnstile:error', message: 'Verification failed' });
} catch (e) { console.error('ws turnstile verify', e); return safeSend(ws, { type: 'turnstile:error', message: 'Server error' }); }
},
// Signs the socket in with a Supabase access token.
// Flow: verify the token -> pick or create a device session -> record the identity
// on `client` -> hand the guest temp id to sessionStore.transferTempToUser
// (presumably moves guest sessions onto the account — confirm in sessionStore) ->
// load sessions/settings/profile/subscription in parallel -> reply 'auth:ok'.
// Replies 'auth:error' when the token is missing or does not verify.
'auth:login': async (ws, msg, client, wsClients) => {
const { accessToken, tempId: clientTempId, deviceToken: requestedDeviceToken } = msg;
if (!accessToken) return safeSend(ws, { type: 'auth:error', message: 'Missing token' });
const user = await verifySupabaseToken(accessToken);
if (!user) return safeSend(ws, { type: 'auth:error', message: 'Invalid token' });
let nextDeviceToken = null;
let reusedDeviceSession = false;
// Reuse the device session the client presented only when it validates AND
// belongs to the user the access token resolved to.
if (requestedDeviceToken) {
const existingDevice = await deviceSessionStore.validate(requestedDeviceToken);
if (existingDevice?.userId === user.id) {
// NOTE(review): these writes only persist if validate() returns the stored
// record rather than a copy — confirm in deviceSessionStore.
existingDevice.ip = client.ip;
existingDevice.userAgent = client.userAgent;
nextDeviceToken = existingDevice.token;
reusedDeviceSession = true;
}
}
// No reusable session: create a fresh one for this ip / user agent.
if (!nextDeviceToken) {
nextDeviceToken = await deviceSessionStore.create(user.id, client.ip, client.userAgent);
}
// The socket was already bound to a different device token: revoke the old one.
if (client.deviceToken && client.deviceToken !== nextDeviceToken) {
await deviceSessionStore.revoke(client.deviceToken);
}
client.userId = user.id; client.accessToken = accessToken; client.authenticated = true;
client.deviceToken = nextDeviceToken;
sessionStore.markOnline(user.id, ws);
// A temp id sent with the login replaces the one stored on the connection.
if (clientTempId) client.tempId = clientTempId;
const tId = client.tempId;
await sessionStore.transferTempToUser(tId, user.id, accessToken);
const [sessions, settings, profile, subscription] = await Promise.all([
sessionStore.loadUserSessions(user.id, accessToken),
getUserSettings(user.id, accessToken),
getUserProfile(user.id, accessToken),
getSubscriptionInfo(accessToken),
]);
const authOkMsg = { type: 'auth:ok', userId: user.id, email: user.email,
deviceToken: client.deviceToken, sessions: sessions.map(ser), settings, profile, subscription };
safeSend(ws, authOkMsg);
// Only a newly created device session is announced to the user's other sockets
// (`ws` itself is excluded from the broadcast).
if (!reusedDeviceSession) {
bcast(wsClients, user.id, { type: 'auth:newLogin', message: 'New login on your account.',
ip: client.ip, userAgent: client.userAgent, timestamp: new Date().toISOString() }, ws);
}
},
'auth:logout': async (ws, msg, client) => {
const priorUserId = client.userId;
if (client.deviceToken) await deviceSessionStore.revoke(client.deviceToken);
Object.assign(client, { userId: null, authenticated: false, accessToken: null, deviceToken: null });
if (priorUserId) sessionStore.markOffline(priorUserId, ws);
safeSend(ws, { type: 'auth:loggedOut' });
},
'auth:guest': async (ws, msg, client) => {
const t = msg.tempId || client.tempId;
client.tempId = t;
sessionStore.initTemp(t);
const sessions = await sessionStore.getTempSessions(t);
safeSend(ws, { type: 'auth:guestOk', tempId: t, sessions: sessions.map(ser) });
},
'sessions:list': async (ws, msg, client) => {
const list = client.userId
? sessionStore.getUserSessions(client.userId)
: await sessionStore.getTempSessions(client.tempId);
list.sort((a, b) => b.created - a.created);
safeSend(ws, { type: 'sessions:list', sessions: list.map(ser) });
},
'sessions:create': async (ws, msg, client) => {
const s = client.userId
? await sessionStore.createUserSession(client.userId, client.accessToken)
: await sessionStore.createTempSession(client.tempId);
safeSend(ws, { type: 'sessions:created', session: ser(s) });
},
'sessions:delete': async (ws, msg, client) => {
const owner = getClientOwner(client);
const session = client.userId
? sessionStore.getUserSession(client.userId, msg.sessionId)
: await sessionStore.getTempSession(client.tempId, msg.sessionId);
if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
await chatTrashStore.add(owner, JSON.parse(JSON.stringify(session)));
if (client.userId) {
await sessionStore.deleteUserSession(client.userId, client.accessToken, msg.sessionId);
await sessionStore.deleteTempSessionEverywhere(msg.sessionId);
} else {
await sessionStore.deleteTempSession(client.tempId, msg.sessionId);
}
safeSend(ws, { type: 'sessions:deleted', sessionId: msg.sessionId });
safeSend(ws, { type: 'trash:chats:changed' });
},
'sessions:deleteAll': async (ws, msg, client) => {
const owner = getClientOwner(client);
const sessions = client.userId
? sessionStore.getUserSessions(client.userId)
: await sessionStore.getTempSessions(client.tempId);
for (const session of sessions) {
await chatTrashStore.add(owner, JSON.parse(JSON.stringify(session)));
if (client.userId) await sessionStore.deleteTempSessionEverywhere(session.id);
}
if (client.userId) await sessionStore.deleteAllUserSessions(client.userId, client.accessToken);
else await sessionStore.deleteTempAll(client.tempId);
safeSend(ws, { type: 'sessions:deletedAll' });
safeSend(ws, { type: 'trash:chats:changed' });
},
'sessions:rename': async (ws, msg, client) => {
const name = (msg.name || '').trim(); if (!name) return;
if (client.userId)
await sessionStore.updateUserSession(client.userId, client.accessToken, msg.sessionId, { name });
else await sessionStore.updateTempSession(client.tempId, msg.sessionId, { name });
safeSend(ws, { type: 'sessions:renamed', sessionId: msg.sessionId, name });
},
'sessions:get': async (ws, msg, client) => {
const s = client.userId
? sessionStore.getUserSession(client.userId, msg.sessionId)
: await sessionStore.getTempSession(client.tempId, msg.sessionId);
if (!s) return safeSend(ws, { type: 'error', message: 'Session not found' });
safeSend(ws, { type: 'sessions:data', session: ser(s) });
},
'sessions:share': async (ws, msg, client) => {
if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to share' });
const token = await sessionStore.createShareToken(client.userId, client.accessToken, msg.sessionId);
if (!token) return safeSend(ws, { type: 'error', message: 'Share failed' });
safeSend(ws, { type: 'sessions:shareUrl', url: `${PUBLIC_URL}/?share=${token}`, sessionId: msg.sessionId });
},
'sessions:import': async (ws, msg, client) => {
if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to import' });
const s = await sessionStore.importSharedSession(client.userId, client.accessToken, msg.token);
if (!s) return safeSend(ws, { type: 'error', message: 'Invalid share link' });
safeSend(ws, { type: 'sessions:imported', session: ser(s) });
},
'trash:chats:list': async (ws, msg, client) => {
const owner = getClientOwner(client);
const items = await chatTrashStore.list(owner);
safeSend(ws, { type: 'trash:chats:list', items });
},
'trash:chats:restore': async (ws, msg, client) => {
const owner = getClientOwner(client);
const restored = await chatTrashStore.restore(owner, msg.ids || []);
const sessions = [];
for (const snapshot of restored) {
const restoredSession = await restoreDeletedSession(client, snapshot);
if (restoredSession) sessions.push(ser(restoredSession));
}
safeSend(ws, { type: 'trash:chats:restored', sessions });
safeSend(ws, { type: 'trash:chats:changed' });
},
'trash:chats:deleteForever': async (ws, msg, client) => {
const owner = getClientOwner(client);
const removedIds = await chatTrashStore.deleteForever(owner, msg.ids || []);
safeSend(ws, { type: 'trash:chats:deletedForever', ids: removedIds });
safeSend(ws, { type: 'trash:chats:changed' });
},
'chat:send': async (ws, msg, client) => {
const { sessionId, content, tools } = msg;
const owner = getClientOwner(client);
if (!client.userId) {
const allowed = await consumeGuestRequest(client.ip || 'unknown');
if (!allowed) return safeSend(ws, { type: 'guest:rateLimit', message: 'Guest request limit exceeded' });
if (!await sessionStore.tempCanSend(client.tempId)) return safeSend(ws, { type: 'chat:limitReached' });
await sessionStore.tempBump(client.tempId);
}
const session = client.userId
? sessionStore.getUserSession(client.userId, sessionId)
: await sessionStore.getTempSession(client.tempId, sessionId);
if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
abortActiveStream(ws);
const abort = new AbortController();
activeStreams.set(ws, abort);
safeSend(ws, { type: 'chat:start', sessionId });
let fullText = '';
const assetsCollected = [], toolCallsCollected = [];
// Extract flat history from tree structure
const rootMessage = session.history?.[0];
const flatHistory = rootMessage ? extractFlatHistory(rootMessage) : [];
if (Array.isArray(msg.linkedMediaIds) && msg.linkedMediaIds.length) {
await mediaStore.attachToSession(owner, msg.linkedMediaIds, sessionId).catch((err) => {
console.error('Failed to link uploaded media to session:', err);
});
}
const usagePayload = await buildUsagePayload(client, msg.clientId || '');
const webSearchLimit = isFreeSearchPlan(client, usagePayload)
? { key: usageOwnerKey(client, msg.clientId || ''), limit: FREE_WEB_SEARCH_LIMIT }
: null;
await streamChat({
sessionId,
model: session.model,
history: flatHistory,
userMessage: content,
tools: tools || {},
accessToken: client.accessToken,
clientId: msg.clientId,
webSearchLimit,
owner,
sessionName: session.name,
abortSignal: abort.signal,
onToken(t) { fullText += t; safeSend(ws, { type: 'chat:token', token: t, sessionId }); },
onToolCall(call) {
safeSend(ws, { type: 'chat:toolCall', call, sessionId });
if (call.state === 'resolved' || call.state === 'canceled') toolCallsCollected.push(call);
},
onNewAsset(asset) {
safeSend(ws, { type: 'chat:asset', asset, sessionId });
assetsCollected.push(asset);
safeSend(ws, { type: 'media:changed' });
},
onDraftEdit(edit, draftText) {
safeSend(ws, { type: 'chat:draftEdited', edit, text: draftText, sessionId });
},
async onDone(text, toolCalls, aborted, sessionNameFromTag, responseEdits = [], responseSegments = []) {
activeStreams.delete(ws);
const finalText = text || fullText;
// Only create user entry if content was actually provided
const hasContent = content !== undefined && content !== null && content !== '' &&
!(Array.isArray(content) && content.length === 0);
const userEntry = hasContent
? buildEntry('user', content)
: null;
const resolvedMap = new Map(toolCallsCollected.map(c => [c.id, c]));
const mergedCalls = (toolCalls || []).map(c => {
const resolved = resolvedMap.get(c.id) || {};
return { ...c, state: resolved.state || 'resolved', result: resolved.result };
});
const asstEntry = buildEntry('assistant', finalText, mergedCalls, {
responseEdits,
responseSegments,
});
const mediaEntries = assetsCollected.map((asset) =>
buildMediaEntry(asset.role, {
assetId: asset.id,
mimeType: asset.mimeType,
name: asset.name,
})
);
const generatedFiles = extractAssistantGeneratedFiles(finalText);
for (const file of generatedFiles) {
await mediaStore.storeBuffer(owner, {
name: file.name,
mimeType: file.mimeType,
buffer: file.buffer,
sessionId,
source: 'assistant_generated',
kind: file.kind,
}).catch((err) => console.error('Failed to store generated text asset:', err));
}
if (generatedFiles.length) safeSend(ws, { type: 'media:changed' });
// Rebuild tree structure with new messages appended to the active branch leaf.
let newRootMessage = rootMessage ? cloneAndRepairTree(rootMessage) : null;
if (!newRootMessage) {
// First message in session - must have user entry
if (!userEntry) return safeSend(ws, { type: 'error', message: 'No content for first message' });
newRootMessage = userEntry;
newRootMessage.versions[0].tail = [{ ...asstEntry }, ...mediaEntries];
} else if (userEntry) {
appendConversationTurn(newRootMessage, userEntry, asstEntry, mediaEntries);
} else {
const appendedEntries = [
asstEntry,
...mediaEntries,
];
appendEntriesToActiveLeaf(newRootMessage, appendedEntries);
}
const newHistory = [newRootMessage];
let newName = session.name;
if (sessionNameFromTag) {
newName = sessionNameFromTag;
} else if (!session.history?.length || session.name === 'New Chat') {
newName = session.name;
}
if (client.userId)
await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory, name: newName });
else await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory, name: newName });
safeSend(ws, { type: aborted ? 'chat:aborted' : 'chat:done', sessionId, name: newName, history: extractFlatHistory(newRootMessage) });
},
onError(err) {
activeStreams.delete(ws);
console.error('streamChat error:', err);
safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
safeSend(ws, { type: 'chat:aborted', sessionId, reason: 'error' });
},
});
},
'chat:stop': (ws) => { abortActiveStream(ws); },
'chat:editMessage': async (ws, msg, client) => {
const { sessionId, messageIndex, newContent } = msg;
const session = client.userId
? sessionStore.getUserSession(client.userId, sessionId)
: await sessionStore.getTempSession(client.tempId, sessionId);
if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
const rootMessage = session.history?.[0];
if (!rootMessage) return safeSend(ws, { type: 'error', message: 'No history' });
const flatHistory = extractFlatHistory(rootMessage);
const targetMsg = flatHistory[messageIndex];
if (!targetMsg) {
console.error(`chat:editMessage: Message at index ${messageIndex} not found. History length: ${flatHistory.length}`);
return safeSend(ws, { type: 'error', message: 'Message not found' });
}
// Find the target message in the tree and add new version
const newRoot = cloneAndRepairTree(rootMessage);
const context = findMessageContext(newRoot, targetMsg.id);
if (!context?.message) return safeSend(ws, { type: 'error', message: 'Message not found in tree' });
const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => {
if (msgInTree.role === 'user' && Array.isArray(context.parentTail) && context.index >= 0) {
const trailing = context.parentTail.splice(context.index + 1);
if (trailing.length) {
const currentVersion = getActiveVersion(msgInTree);
currentVersion.tail = [...(currentVersion.tail || []), ...trailing];
}
}
// Add new version with EMPTY tail (no responses yet for this edited version)
msgInTree.versions.push({
content: newContent,
tail: [], // New version starts fresh, no tail
timestamp: Date.now()
});
msgInTree.currentVersionIdx = msgInTree.versions.length - 1;
msgInTree.content = newContent;
});
if (!found) return;
const newHistory = [newRoot];
if (client.userId) {
await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory });
} else {
await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory });
}
// Send back the updated message with its ID and the full flat history
const updatedFlatHistory = extractFlatHistory(newRoot);
const updatedTargetMsg = updatedFlatHistory[messageIndex];
if (!updatedTargetMsg) {
console.error(`chat:editMessage: Updated message not found at index ${messageIndex}. Updated history length: ${updatedFlatHistory.length}`);
return safeSend(ws, { type: 'error', message: 'Failed to apply edit - message lost' });
}
safeSend(ws, { type: 'chat:messageEdited', sessionId, messageId: targetMsg.id, messageIndex, message: updatedTargetMsg, history: updatedFlatHistory });
},
'chat:selectVersion': async (ws, msg, client) => {
const { sessionId, messageIndex, versionIdx } = msg;
const session = client.userId
? sessionStore.getUserSession(client.userId, sessionId)
: await sessionStore.getTempSession(client.tempId, sessionId);
if (!session) return;
const rootMessage = session.history?.[0];
if (!rootMessage) return;
const flatHistory = extractFlatHistory(rootMessage);
const targetMsg = flatHistory[messageIndex];
if (!targetMsg || !targetMsg.versions || versionIdx >= targetMsg.versions.length) return;
// Find and update the message in tree, switching to specified version
const newRoot = cloneAndRepairTree(rootMessage);
const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => {
msgInTree.currentVersionIdx = versionIdx;
msgInTree.content = msgInTree.versions[versionIdx].content;
// Tail is automatically correct since each version has its own tail
});
if (!found) return;
const newHistory = [newRoot];
if (client.userId) {
await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory });
} else {
await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory });
}
// Send back with messageId for clarity
safeSend(ws, { type: 'chat:versionSelected', sessionId, messageId: targetMsg.id, messageIndex, history: extractFlatHistory(newRoot) });
},
'chat:assistantAction': async (ws, msg, client) => {
const { sessionId, messageIndex } = msg;
const action = msg.action === 'continue' ? 'continue' : 'regenerate';
const session = client.userId
? sessionStore.getUserSession(client.userId, sessionId)
: await sessionStore.getTempSession(client.tempId, sessionId);
if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
const rootMessage = session.history?.[0];
if (!rootMessage) return safeSend(ws, { type: 'error', message: 'No history' });
const flatHistory = extractFlatHistory(rootMessage);
const targetMsg = flatHistory[messageIndex];
if (!targetMsg || targetMsg.role !== 'assistant') {
return safeSend(ws, { type: 'error', message: 'Assistant message not found' });
}
abortActiveStream(ws);
const abort = new AbortController();
activeStreams.set(ws, abort);
const owner = getClientOwner(client);
const historyBeforeTarget = flatHistory.slice(0, messageIndex);
const baseAssistantText = stripSessionTagText(targetMsg.content || '');
const actionHistory = action === 'continue'
? [...historyBeforeTarget, { role: 'assistant', content: baseAssistantText }]
: historyBeforeTarget;
const actionUserMessage = action === 'continue' ? CONTINUE_ASSISTANT_PROMPT : null;
safeSend(ws, {
type: 'chat:start',
sessionId,
streamKind: 'assistantAction',
action,
messageIndex,
prefillText: action === 'continue' ? baseAssistantText : '',
});
let fullText = '';
const assetsCollected = [];
const toolCallsCollected = [];
const usagePayload = await buildUsagePayload(client, msg.clientId || '');
const webSearchLimit = isFreeSearchPlan(client, usagePayload)
? { key: usageOwnerKey(client, msg.clientId || ''), limit: FREE_WEB_SEARCH_LIMIT }
: null;
await streamChat({
sessionId,
model: session.model,
history: actionHistory,
userMessage: actionUserMessage,
tools: msg.tools || {},
accessToken: client.accessToken,
clientId: msg.clientId,
webSearchLimit,
owner,
sessionName: session.name,
abortSignal: abort.signal,
onToken(t) {
fullText += t;
safeSend(ws, { type: 'chat:token', token: t, sessionId });
},
onToolCall(call) {
safeSend(ws, { type: 'chat:toolCall', call, sessionId });
if (call.state === 'resolved' || call.state === 'canceled') toolCallsCollected.push(call);
},
onNewAsset(asset) {
safeSend(ws, { type: 'chat:asset', asset, sessionId });
assetsCollected.push(asset);
safeSend(ws, { type: 'media:changed' });
},
onDraftEdit(edit, draftText) {
safeSend(ws, { type: 'chat:draftEdited', edit, text: draftText, sessionId });
},
async onDone(text, toolCalls, aborted, sessionNameFromTag, responseEdits = [], responseSegments = []) {
activeStreams.delete(ws);
if (aborted) {
return safeSend(ws, { type: 'chat:aborted', sessionId });
}
const rawAssistantText = text || fullText;
const resolvedMap = new Map(toolCallsCollected.map((call) => [call.id, call]));
const mergedCalls = (toolCalls || []).map((call) => {
const resolved = resolvedMap.get(call.id) || {};
return { ...call, state: resolved.state || 'resolved', result: resolved.result };
});
let finalText = rawAssistantText;
let finalSegments = responseSegments;
if (action === 'continue') {
const { continuationText, overlapLength } = stripContinuationOverlap(baseAssistantText, rawAssistantText);
finalText = baseAssistantText + continuationText;
finalSegments = [
...(baseAssistantText ? [{ type: 'text', text: baseAssistantText }] : []),
...trimLeadingTextFromSegments(responseSegments, overlapLength),
];
}
const mediaEntries = assetsCollected.map((asset) =>
buildMediaEntry(asset.role, {
assetId: asset.id,
mimeType: asset.mimeType,
name: asset.name,
})
);
const generatedFiles = extractAssistantGeneratedFiles(finalText);
for (const file of generatedFiles) {
await mediaStore.storeBuffer(owner, {
name: file.name,
mimeType: file.mimeType,
buffer: file.buffer,
sessionId,
source: 'assistant_generated',
kind: file.kind,
}).catch((err) => console.error('Failed to store generated text asset:', err));
}
if (generatedFiles.length) safeSend(ws, { type: 'media:changed' });
const newRoot = cloneAndRepairTree(rootMessage);
const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => {
const nextVersion = buildVersionRecord(finalText, {
tail: mediaEntries,
toolCalls: mergedCalls,
responseEdits,
responseSegments: finalSegments,
});
applyVersionToMessage(msgInTree, nextVersion);
});
if (!found) {
return safeSend(ws, { type: 'error', message: 'Assistant branch could not be updated' });
}
let newName = session.name;
if (sessionNameFromTag) {
newName = sessionNameFromTag;
}
const newHistory = [newRoot];
if (client.userId) {
await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory, name: newName });
} else {
await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory, name: newName });
}
safeSend(ws, { type: 'chat:done', sessionId, name: newName, history: extractFlatHistory(newRoot) });
},
onError(err) {
activeStreams.delete(ws);
console.error('assistant action streamChat error:', err);
safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
safeSend(ws, { type: 'chat:aborted', sessionId, reason: 'error' });
},
});
},
'settings:get': async (ws, msg, client) => {
const s = client.userId
? await getUserSettings(client.userId, client.accessToken)
: { theme: 'dark', webSearch: true, imageGen: true, videoGen: true, audioGen: true };
safeSend(ws, { type: 'settings:data', settings: s });
},
'settings:save': async (ws, msg, client, wsClients) => {
if (!client.userId) return;
await saveUserSettings(client.userId, client.accessToken, msg.settings);
safeSend(ws, { type: 'settings:saved' });
bcast(wsClients, client.userId, { type: 'settings:updated', settings: msg.settings }, ws);
},
'personalization:get': async (ws, msg, client) => {
const defaultPrompt = await systemPromptStore.getDefaultPrompt();
const personalization = client.userId
? await systemPromptStore.getPersonalization(client.userId)
: {
defaultPrompt,
customPrompt: null,
resolvedPrompt: defaultPrompt,
isCustom: false,
updatedAt: null,
canEdit: false,
};
safeSend(ws, { type: 'personalization:data', personalization });
},
'personalization:saveSystemPrompt': async (ws, msg, client, wsClients) => {
if (!client.userId) return safeSend(ws, { type: 'personalization:error', message: 'Sign in to customize your system prompt' });
try {
const personalization = await systemPromptStore.setUserPrompt(client.userId, msg.markdown);
safeSend(ws, { type: 'personalization:updated', personalization });
bcast(wsClients, client.userId, { type: 'personalization:updated', personalization }, ws);
} catch (err) {
safeSend(ws, { type: 'personalization:error', message: err.message || 'Unable to save system prompt' });
}
},
'personalization:resetSystemPrompt': async (ws, msg, client, wsClients) => {
if (!client.userId) return safeSend(ws, { type: 'personalization:error', message: 'Sign in to customize your system prompt' });
try {
const personalization = await systemPromptStore.resetUserPrompt(client.userId);
safeSend(ws, { type: 'personalization:updated', personalization });
bcast(wsClients, client.userId, { type: 'personalization:updated', personalization }, ws);
} catch (err) {
safeSend(ws, { type: 'personalization:error', message: err.message || 'Unable to reset system prompt' });
}
},
'memories:list': async (ws, msg, client) => {
const owner = getClientOwner(client);
const items = await memoryStore.list(owner);
safeSend(ws, { type: 'memories:list', items });
},
'memories:create': async (ws, msg, client, wsClients) => {
const owner = getClientOwner(client);
const memory = await memoryStore.create(owner, {
content: msg.content,
sessionId: msg.sessionId || null,
source: msg.source || 'manual',
});
safeSend(ws, { type: 'memories:created', memory });
if (client.userId) bcast(wsClients, client.userId, { type: 'memories:changed' }, ws);
},
'memories:update': async (ws, msg, client, wsClients) => {
const owner = getClientOwner(client);
const memory = await memoryStore.update(owner, msg.id, msg.content);
safeSend(ws, { type: 'memories:updated', memory });
if (client.userId) bcast(wsClients, client.userId, { type: 'memories:changed' }, ws);
},
'memories:delete': async (ws, msg, client, wsClients) => {
const owner = getClientOwner(client);
const ok = await memoryStore.delete(owner, msg.id);
safeSend(ws, { type: 'memories:deleted', id: ok ? msg.id : null });
if (client.userId) bcast(wsClients, client.userId, { type: 'memories:changed' }, ws);
},
'account:getProfile': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:profile', profile: await getUserProfile(c.userId, c.accessToken) }); },
'account:setUsername': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:usernameResult', ...await setUsername(c.userId, c.accessToken, msg.username) }); },
'account:getSubscription': async (ws, msg, c) => {
if (!c.userId) return console.warn('[Account] getSubscription called without userId');
const subInfo = await getSubscriptionInfo(c.accessToken);
safeSend(ws, { type: 'account:subscription', info: subInfo });
},
'account:getUsage': async (ws, msg, c) => { safeSend(ws, { type: 'account:usage', usage: await buildUsagePayload(c, msg.clientId || '') }); },
'account:getTierConfig': async (ws) => { safeSend(ws, { type: 'account:tierConfig', config: await getTierConfig() }); },
'account:getSessions': async (ws, msg, c) => {
if (!c.userId) return;
safeSend(ws, {
type: 'account:deviceSessions',
sessions: await deviceSessionStore.getForUser(c.userId),
currentToken: c.deviceToken,
});
},
'account:revokeSession': async (ws, msg, c, wsClients) => {
if (!c.userId || !msg.token) return;
const revoked = await deviceSessionStore.revoke(msg.token);
if (revoked) {
const activeSessions = await deviceSessionStore.getForUser(c.userId);
for (const [ows, oc] of wsClients) {
if (oc.userId !== c.userId) continue;
if (oc.deviceToken === msg.token) {
safeSend(ows, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' });
sessionStore.markOffline(oc.userId, ows);
Object.assign(oc, { userId: null, authenticated: false, accessToken: null, deviceToken: null });
continue;
}
safeSend(ows, {
type: 'account:deviceSessions',
sessions: activeSessions,
currentToken: oc.deviceToken,
});
}
}
safeSend(ws, { type: 'account:sessionRevoked', token: msg.token });
},
'account:revokeAllOthers': async (ws, msg, c, wsClients) => {
if (!c.userId) return;
await deviceSessionStore.revokeAllExcept(c.userId, c.deviceToken);
const activeSessions = await deviceSessionStore.getForUser(c.userId);
for (const [ows, oc] of wsClients) {
if (oc.userId !== c.userId) continue;
if (oc.deviceToken && oc.deviceToken !== c.deviceToken) {
safeSend(ows, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' });
sessionStore.markOffline(oc.userId, ows);
Object.assign(oc, { userId: null, authenticated: false, accessToken: null, deviceToken: null });
continue;
}
safeSend(ows, {
type: 'account:deviceSessions',
sessions: activeSessions,
currentToken: oc.deviceToken,
});
}
safeSend(ws, { type: 'account:allOthersRevoked' });
},
};
/**
 * Builds a plain object holding a session's id, name, created, history and
 * model. A missing/falsy history becomes an empty array.
 */
function ser(s) {
  const { id, name, created, history, model } = s;
  return { id, name, created, history: history || [], model };
}
/**
 * Describes who owns a connection: `{ type: 'user', id: userId }` when the
 * client has a truthy userId, otherwise `{ type: 'guest', id: tempId }`.
 */
function getClientOwner(client) {
  if (client.userId) {
    return { type: 'user', id: client.userId };
  }
  return { type: 'guest', id: client.tempId };
}
/**
 * Restores a previously deleted session from a snapshot.
 *
 * The snapshot is deep-copied via a JSON round trip so the caller's object is
 * never mutated. If a session with the same id already exists for this owner,
 * the restored copy gets a fresh UUID. A missing `created` is set to now.
 *
 * @param client   client state; `userId` selects the user store path,
 *                 otherwise `tempId` selects the guest path
 * @param snapshot serialisable session snapshot, or a falsy value
 * @returns null when there is no snapshot, otherwise whatever the store's
 *          restoreUserSession / restoreTempSession call yields
 */
async function restoreDeletedSession(client, snapshot) {
  if (!snapshot) return null;
  const restored = JSON.parse(JSON.stringify(snapshot));
  // Await the lookup on BOTH branches. Previously only the guest branch was
  // awaited; if getUserSession returns a Promise, `existing` was that (always
  // truthy) Promise and the id was regenerated on every restore. Awaiting a
  // plain value is a no-op, so this is safe for a synchronous store too.
  const existing = await (client.userId
    ? sessionStore.getUserSession(client.userId, restored.id)
    : sessionStore.getTempSession(client.tempId, restored.id));
  if (existing) restored.id = crypto.randomUUID();
  restored.created = restored.created || Date.now();
  if (client.userId) {
    return sessionStore.restoreUserSession(client.userId, client.accessToken, restored);
  }
  return sessionStore.restoreTempSession(client.tempId, restored);
}
/**
 * Produces a message id of the form `msg-<epoch ms>-<up to 9 base-36 chars>`.
 * The suffix comes from Math.random(), so ids are not cryptographically random.
 */
function generateMessageId() {
  const stamp = Date.now();
  // Drop the leading "0." of the base-36 fraction and keep at most 9 characters.
  const suffix = Math.random().toString(36).slice(2, 11);
  return `msg-${stamp}-${suffix}`;
}
/**
 * Creates a new tree message with a single initial version.
 *
 * @param role        message role stored on the entry
 * @param content     message content; null/undefined becomes ''
 * @param toolCalls   raw tool calls; each is normalised to
 *                    `{ id, name, args, state, result }`. Non-array values and
 *                    null/undefined entries are ignored.
 * @param extraFields extra properties; keys listed in VERSION_META_FIELDS are
 *                    written to both the message and its first version, all
 *                    other keys only to the message
 * @returns the new message object (id, role, content, timestamp, versions,
 *          currentVersionIdx, plus tool calls / extra fields when present)
 */
function buildEntry(role, content, toolCalls = [], extraFields = {}) {
  // Explicit args win; otherwise try to parse `function.arguments`, falling
  // back to the raw string when it is not valid JSON, or {} when absent.
  const resolveArgs = (call) => {
    if (call.args !== undefined && call.args !== null) return call.args;
    const rawArguments = call.function?.arguments;
    if (!rawArguments) return {};
    try {
      return JSON.parse(rawArguments);
    } catch {
      return rawArguments;
    }
  };

  // The default parameter only covers `undefined`; guard against null or other
  // non-array values (and null entries) instead of throwing.
  const callList = Array.isArray(toolCalls) ? toolCalls : [];
  const normalizedCalls = callList
    .filter((call) => call !== undefined && call !== null)
    .map((call) => ({
      id: call.id,
      name: call.name || call.function?.name,
      args: resolveArgs(call),
      state: call.state || 'resolved',
      result: call.result,
    }));

  const validContent = content ?? '';

  // Split the extra fields: version-level metadata vs. message-only fields.
  const versionMeta = {};
  const topLevelExtraFields = { ...extraFields };
  for (const key of VERSION_META_FIELDS) {
    if (key in topLevelExtraFields) {
      versionMeta[key] = topLevelExtraFields[key];
      delete topLevelExtraFields[key];
    }
  }

  const id = generateMessageId();
  // One clock read so the message and its first version share a timestamp.
  const createdAt = Date.now();
  const toolCallFields = normalizedCalls.length ? { toolCalls: normalizedCalls } : {};

  return {
    id,
    role,
    content: validContent,
    timestamp: createdAt,
    versions: [{
      content: validContent,
      tail: [],
      timestamp: createdAt,
      ...toolCallFields,
      ...versionMeta,
    }],
    currentVersionIdx: 0,
    ...toolCallFields,
    ...versionMeta,
    ...topLevelExtraFields,
  };
}
/**
 * Creates a tree message with a single version and an empty tail, without any
 * tool-call or metadata handling. `content` is stored as given.
 */
function buildMediaEntry(role, content) {
  const entry = {
    id: generateMessageId(),
    role,
    content,
    timestamp: Date.now(),
  };
  const initialVersion = { content, tail: [], timestamp: Date.now() };
  entry.versions = [initialVersion];
  entry.currentVersionIdx = 0;
  return entry;
}
/**
 * Builds a version record `{ content, tail, timestamp, ...meta }`.
 *
 * - null/undefined content becomes ''.
 * - `extraFields.tail` is used as-is when it is an array, otherwise [].
 * - Only keys listed in VERSION_META_FIELDS are copied from extraFields, and
 *   only when their value is neither undefined nor null.
 */
function buildVersionRecord(content, extraFields = {}) {
  const record = {
    content: content ?? '',
    tail: Array.isArray(extraFields.tail) ? extraFields.tail : [],
    timestamp: Date.now(),
  };
  for (const field of VERSION_META_FIELDS) {
    const value = extraFields[field];
    if (value === undefined || value === null) continue;
    record[field] = value;
  }
  return record;
}
/**
 * Appends `versionRecord` to a message's versions, makes it the current
 * version and re-syncs the message's top-level fields from it.
 *
 * @param message       message to mutate; null/undefined is returned untouched
 * @param versionRecord version object to append
 * @returns the same `message` reference that was passed in
 */
function applyVersionToMessage(message, versionRecord) {
  // The old guard used `message?.versions` but then assigned to
  // `message.versions`, which throws for a null message. Bail out instead,
  // matching how the sibling tree helpers treat a missing message.
  if (!message) return message;
  if (!Array.isArray(message.versions)) {
    message.versions = [];
  }
  message.versions.push(versionRecord);
  message.currentVersionIdx = message.versions.length - 1;
  syncMessageFromActiveVersion(message);
  return message;
}
/**
 * Removes every `<session_name>…</session_name>` span (case-insensitive, may
 * span lines) from a string and trims the result. Non-string content is
 * returned unchanged.
 */
function stripSessionTagText(content) {
  if (typeof content !== 'string') return content;
  const sessionTag = /<session_name>[\s\S]*?<\/session_name>/gi;
  return content.replace(sessionTag, '').trim();
}
/**
 * Removes from the start of `generatedText` whatever repeats `baseText`.
 *
 * Two cases are detected: the generated text starts with the whole base text,
 * or a suffix of the base (at most 400 characters, longest first) equals a
 * prefix of the generated text.
 *
 * @returns `{ continuationText, overlapLength }` — the generated text with the
 *          overlap removed and the number of characters removed (0 if none)
 */
function stripContinuationOverlap(baseText = '', generatedText = '') {
  const previous = String(baseText || '');
  const incoming = String(generatedText || '');
  const cutAt = (overlapLength) => ({
    continuationText: incoming.slice(overlapLength),
    overlapLength,
  });

  if (!previous || !incoming) return cutAt(0);
  if (incoming.startsWith(previous)) return cutAt(previous.length);

  // Probe the longest candidate overlap first so the largest match wins.
  let size = Math.min(previous.length, incoming.length, 400);
  while (size > 0) {
    if (previous.endsWith(incoming.slice(0, size))) return cutAt(size);
    size -= 1;
  }
  return cutAt(0);
}
/**
 * Drops the first `overlapLength` characters of text from a segment list.
 *
 * Only segments with `type === 'text'` consume the budget; other segments are
 * passed through untouched. Text segments that are fully consumed, falsy
 * segments, and text segments with empty text are omitted from the result.
 * A partially consumed text segment is copied with its remaining text.
 */
function trimLeadingTextFromSegments(segments = [], overlapLength = 0) {
  let budget = Math.max(0, overlapLength);
  const kept = [];
  for (const segment of segments || []) {
    if (!segment) continue;
    if (segment.type !== 'text') {
      kept.push(segment);
      continue;
    }
    if (budget <= 0) {
      // Nothing left to trim: keep the segment as-is unless its text is empty.
      if (segment.text) kept.push(segment);
      continue;
    }
    const text = String(segment.text || '');
    if (budget >= text.length) {
      budget -= text.length;
      continue;
    }
    const remainder = text.slice(budget);
    budget = 0;
    if (remainder) kept.push({ ...segment, text: remainder });
  }
  return kept;
}
/**
 * Scans assistant text for embedded files and returns them as
 * `{ name, mimeType, kind, buffer }` records.
 *
 * Two patterns are recognised, and all matches of the first are listed before
 * those of the second:
 *  1. `<details><summary>NAME</summary>` followed by a fenced code block and
 *     `</details>` — NAME (trimmed) is the file name; `.html`/`.htm` names are
 *     typed text/html / rich_text, everything else text/plain / text.
 *  2. A fenced block tagged `svg` — named generated-image-N.svg, numbered
 *     from 1 in order of appearance.
 *
 * Returns [] for empty or non-string input.
 */
function extractAssistantGeneratedFiles(text) {
  if (!text || typeof text !== 'string') return [];

  const detailsRe = /<details><summary>([^<]+?)<\/summary>\s*```(?:\w*)\n([\s\S]*?)\n```\s*<\/details>/g;
  const svgRe = /```svg\n([\s\S]*?)\n```/g;
  const htmlExtensions = ['.html', '.htm'];

  const detailFiles = Array.from(text.matchAll(detailsRe), ([, summary, body]) => {
    const name = String(summary || '').trim();
    const isHtml = htmlExtensions.includes(path.extname(name).toLowerCase());
    return {
      name: name || 'generated-file.txt',
      mimeType: isHtml ? 'text/html' : 'text/plain',
      kind: isHtml ? 'rich_text' : 'text',
      buffer: Buffer.from(body, 'utf8'),
    };
  });

  const svgFiles = Array.from(text.matchAll(svgRe), ([, markup], position) => ({
    name: `generated-image-${position + 1}.svg`,
    mimeType: 'image/svg+xml',
    kind: 'image',
    buffer: Buffer.from(markup, 'utf8'),
  }));

  return [...detailFiles, ...svgFiles];
}
/**
 * Normalises a message tree in place so that no message or version is left
 * with null/undefined content (it becomes ''). Every version of every message
 * is visited, not just the current one, and each version's tail is walked
 * recursively. Falsy messages are skipped.
 *
 * @returns the same `rootMessage` reference that was passed in
 */
function validateAndRepairTree(rootMessage) {
  const isMissing = (value) => value === undefined || value === null;

  const repairNode = (node) => {
    if (!node) return;
    if (isMissing(node.content)) node.content = '';
    if (!Array.isArray(node.versions)) return;

    for (const version of node.versions) {
      if (isMissing(version.content)) version.content = '';
      if (!Array.isArray(version.tail)) continue;
      for (const child of version.tail) repairNode(child);
    }
  };

  repairNode(rootMessage);
  return rootMessage;
}
/**
 * Deep-copies a message tree through a JSON round trip and runs
 * validateAndRepairTree on the copy. The input tree is not modified.
 */
function cloneAndRepairTree(rootMessage) {
  const serialized = JSON.stringify(rootMessage);
  const copy = JSON.parse(serialized);
  return validateAndRepairTree(copy);
}
/**
 * Returns the version object a message currently points at, normalising the
 * message in place on the way:
 *
 * - no (or empty / non-array) `versions`: a single version is created from the
 *   message's own content and becomes version 0;
 * - `currentVersionIdx` is clamped into range, or reset to 0 when it is not an
 *   integer;
 * - the selected version gets an empty `tail` if it has no array tail, and
 *   inherits the message content if its own content is null/undefined.
 *
 * @returns the active version, or null for a falsy message
 */
function getActiveVersion(message) {
  if (!message) return null;

  const hasVersions = Array.isArray(message.versions) && message.versions.length > 0;
  if (!hasVersions) {
    const seeded = { content: message.content ?? '', tail: [], timestamp: Date.now() };
    message.versions = [seeded];
    message.currentVersionIdx = 0;
    return seeded;
  }

  const lastIdx = message.versions.length - 1;
  let activeIdx = 0;
  if (Number.isInteger(message.currentVersionIdx)) {
    activeIdx = Math.max(0, Math.min(message.currentVersionIdx, lastIdx));
  }
  message.currentVersionIdx = activeIdx;

  const active = message.versions[activeIdx];
  if (!Array.isArray(active.tail)) {
    active.tail = [];
  }
  if (active.content === undefined || active.content === null) {
    active.content = message.content ?? '';
  }
  return active;
}
/**
 * Deep-copies a metadata value through a JSON round trip. `undefined` is
 * passed through as-is because it cannot be serialised.
 */
function cloneVersionMetaValue(value) {
  return value === undefined ? undefined : JSON.parse(JSON.stringify(value));
}
/**
 * Copies the active version's state onto the message itself, in place.
 *
 * `content` is taken from the active version (falling back to the message's
 * existing content, then ''). For every key in VERSION_META_FIELDS the message
 * gets a deep copy of the version's value, or loses the key entirely when the
 * version does not define it.
 *
 * @returns the same `message` reference (also for falsy input)
 */
function syncMessageFromActiveVersion(message) {
  if (!message) return message;
  const active = getActiveVersion(message);
  if (!active) return message;

  message.content = active.content ?? message.content ?? '';
  for (const field of VERSION_META_FIELDS) {
    if (field in active) {
      message[field] = cloneVersionMetaValue(active[field]);
    } else {
      delete message[field];
    }
  }
  return message;
}
/**
 * Follows the active path downwards — always taking the LAST entry of the
 * current version's tail — and returns the first message whose active tail is
 * empty. If the walk reaches a falsy entry, the root message is returned.
 */
function getActiveLeafMessage(rootMessage) {
  for (let node = rootMessage; node;) {
    const version = getActiveVersion(node);
    const children = Array.isArray(version?.tail) ? version.tail : [];
    if (children.length === 0) return node;
    node = children[children.length - 1];
  }
  return rootMessage;
}
/**
 * Appends `entries` to the tail of the active leaf's current version. The
 * tail is replaced by a new array rather than extended in place. Does nothing
 * when there is no root message or no entries.
 *
 * @returns the same `rootMessage` reference that was passed in
 */
function appendEntriesToActiveLeaf(rootMessage, entries = []) {
  const nothingToDo = !rootMessage || !entries.length;
  if (nothingToDo) return rootMessage;

  const leafVersion = getActiveVersion(getActiveLeafMessage(rootMessage));
  const existing = leafVersion.tail || [];
  leafVersion.tail = [...existing, ...entries];
  return rootMessage;
}
/**
 * Attaches one conversation turn to the active leaf of the tree.
 *
 * The user entry's active version tail is overwritten with the assistant
 * entry (when given) followed by the media entries (when an array), the user
 * entry is re-synced from that version, and the user entry is then appended
 * to the active leaf's tail (as a new array). Does nothing without a root
 * message or a user entry.
 *
 * @returns the same `rootMessage` reference that was passed in
 */
function appendConversationTurn(rootMessage, userEntry, assistantEntry, mediaEntries = []) {
  if (!rootMessage || !userEntry) return rootMessage;

  const leafVersion = getActiveVersion(getActiveLeafMessage(rootMessage));
  const userVersion = getActiveVersion(userEntry);

  const replies = assistantEntry ? [assistantEntry] : [];
  const media = Array.isArray(mediaEntries) ? mediaEntries : [];
  userVersion.tail = [...replies, ...media];
  syncMessageFromActiveVersion(userEntry);

  const existing = leafVersion.tail || [];
  leafVersion.tail = [...existing, userEntry];
  return rootMessage;
}
/**
 * Flattens the currently active path of a message tree into an array.
 *
 * The result starts with the root message, followed by the messages found in
 * the root's current version tail; after each message its own current
 * version tail is walked recursively (depth-first) before moving on.
 *
 * Side effects: every visited message is mutated — null/undefined content is
 * replaced with '' and the message is passed through
 * syncMessageFromActiveVersion. The returned array holds the original message
 * objects, not copies.
 *
 * @param rootMessage root of the tree, or a falsy value
 * @returns array of messages along the active path ([] for a falsy root)
 */
function extractFlatHistory(rootMessage) {
if (!rootMessage) return [];
// Helper to ensure message has valid content
const ensureValidContent = (msg) => {
if (msg.content === undefined || msg.content === null) {
msg.content = '';
}
return syncMessageFromActiveVersion(msg);
};
// The root is synced BEFORE currentVersionIdx / versions are read below, so
// those reads see whatever the sync left on the message.
const history = [ensureValidContent(rootMessage)];
const currentVerIdx = rootMessage.currentVersionIdx ?? 0;
// Defensive checks: with a malformed root only the root itself is returned.
if (!Array.isArray(rootMessage.versions)) {
console.warn(`extractFlatHistory: Root message ${rootMessage.id} missing versions array`);
return history;
}
if (currentVerIdx >= rootMessage.versions.length) {
console.warn(`extractFlatHistory: Root message currentVersionIdx ${currentVerIdx} out of bounds (${rootMessage.versions.length} versions)`);
return history;
}
const currentTail = rootMessage.versions[currentVerIdx]?.tail;
if (currentTail && Array.isArray(currentTail)) {
const walkTail = (tail) => {
for (let i = 0; i < tail.length; i++) {
const msg = tail[i];
history.push(ensureValidContent(msg));
const ver = msg.versions?.[msg.currentVersionIdx ?? 0];
if (ver?.tail && Array.isArray(ver.tail)) {
walkTail(ver.tail);
}
// A user message with more than one version ends the walk of this tail:
// the siblings after it are not visited.
// NOTE(review): presumably because each version of such a message carries
// the rest of the conversation in its own tail — confirm.
if (msg.role === 'user' && Array.isArray(msg.versions) && msg.versions.length > 1) {
break;
}
}
};
walkTail(currentTail);
}
return history;
}
/**
 * Locates a message by id along the active path (only each message's current
 * version tail is searched, depth-first).
 *
 * @returns `{ message, parent, parentTail, index }` where `parentTail` is the
 *          tail array containing the message and `index` its position in it;
 *          for the root itself parent/parentTail are null and index is -1.
 *          Returns null when the root is falsy or the id is not found.
 */
function findMessageContext(rootMessage, targetId) {
  if (!rootMessage) return null;
  if (rootMessage.id === targetId) {
    return { message: rootMessage, parent: null, parentTail: null, index: -1 };
  }

  const descend = (parent) => {
    const children = parent.versions?.[parent.currentVersionIdx ?? 0]?.tail;
    if (!Array.isArray(children)) return null;
    for (const [index, child] of children.entries()) {
      if (child.id === targetId) {
        return { message: child, parent, parentTail: children, index };
      }
      const hit = descend(child);
      if (hit) return hit;
    }
    return null;
  };

  return descend(rootMessage);
}
/**
 * Finds a message by id along the active path (only each message's current
 * version tail is searched, depth-first) and calls `updateFn` on it.
 *
 * @param rootMessage root of the tree; a falsy root yields false
 * @param targetId    id of the message to update
 * @param updateFn    called once with the matching message object
 * @returns true when a message was found and updated, otherwise false
 */
function findAndUpdateMessage(rootMessage, targetId, updateFn) {
  // Treat a missing tree as "not found" instead of throwing on `.id`, the
  // same way findMessageContext handles a falsy root.
  if (!rootMessage) return false;
  if (rootMessage.id === targetId) {
    updateFn(rootMessage);
    return true;
  }

  const search = (msg) => {
    const tail = msg.versions?.[msg.currentVersionIdx ?? 0]?.tail;
    if (!Array.isArray(tail)) return false;
    for (const child of tail) {
      if (child.id === targetId) {
        updateFn(child);
        return true;
      }
      if (search(child)) return true;
    }
    return false;
  };

  return search(rootMessage);
}