import OpenAI from 'openai'; import { safeSend, broadcastToUser } from './helpers.js'; import { LIGHTNING_BASE, PUBLIC_URL } from './config.js'; import { sessionStore, deviceSessionStore } from './sessionStore.js'; import { rateLimiter } from './rateLimiter.js'; import { initGuestRequestLimiter, consumeGuestRequest } from './guestRequestLimiter.js'; import { verifySupabaseToken, getUserSettings, saveUserSettings, getUserProfile, setUsername, getSubscriptionInfo, getTierConfig, getUsageInfo, } from './auth.js'; import { streamChat } from './chatStream.js'; import { mediaStore } from './mediaStore.js'; import { memoryStore } from './memoryStore.js'; import { chatTrashStore } from './chatTrashStore.js'; import { systemPromptStore } from './systemPromptStore.js'; import { getWebSearchUsage } from './webSearchUsageStore.js'; import crypto from 'crypto'; import path from 'path'; /** * Message Structure: Tree-based with versioned tails * * Each message has versions, and each version has a complete tail of subsequent messages. * Messages only exist within parent tails (no separate flat array). * * { * id: "msg-123", * role: "user" | "assistant", * content: string | array, * timestamp: number, * versions: [ * { * content: string | array, * tail: [ // Full message objects * { id, role, content, timestamp, versions: [...], currentVersionIdx, ... }, * ... * ], * timestamp: number * }, * ... * ], * currentVersionIdx: 0, * toolCalls?: [...] * } */ const activeStreams = new Map(); const VERSION_META_FIELDS = ['toolCalls', 'responseEdits', 'responseSegments', 'error']; const CONTINUE_ASSISTANT_PROMPT = 'Continue your previous response exactly where it left off. Do not restart, summarize, or repeat the opening. Preserve the same formatting and only add the missing continuation.'; const FREE_WEB_SEARCH_LIMIT = 15; export function abortActiveStream(ws) { if (!activeStreams.has(ws)) return; activeStreams.get(ws).abort(); activeStreams.delete(ws); } initGuestRequestLimiter().catch(err => console.error('Failed to initialize guest request limiter:', err)); function usageOwnerKey(client, clientId = '') { if (client?.userId) return `user:${client.userId}`; return `guest:${clientId || client?.tempId || 'anonymous'}`; } function isFreeSearchPlan(client, usageInfo) { if (!client?.userId) return true; const planKey = usageInfo?.plan_key || usageInfo?.planKey || null; return planKey === 'free'; } async function buildUsagePayload(client, clientId = '') { const usageInfo = await getUsageInfo(client?.accessToken, clientId); const webSearchDaily = await getWebSearchUsage(usageOwnerKey(client, clientId), FREE_WEB_SEARCH_LIMIT); return { ...(usageInfo || {}), usage: { ...(usageInfo?.usage || {}), webSearchDaily, }, }; } export async function handleWsMessage(ws, msg, wsClients) { const client = wsClients.get(ws); if (!client) return; // Require turnstile verification for most message types if (!client.verified && msg.type !== 'ping' && msg.type !== 'turnstile:verify') { return safeSend(ws, { type: 'error', message: 'turnstile:required' }); } const bypassDeviceValidation = new Set([ 'ping', 'turnstile:verify', 'auth:login', 'auth:guest', 'auth:logout', ]); if (client.userId && client.deviceToken && !bypassDeviceValidation.has(msg.type)) { const activeDeviceSession = await deviceSessionStore.validate(client.deviceToken); if (!activeDeviceSession) { const priorUserId = client.userId; Object.assign(client, { userId: null, authenticated: false, accessToken: null, deviceToken: null }); sessionStore.markOffline(priorUserId, ws); return safeSend(ws, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' }); } } const h = handlers[msg.type]; if (h) return h(ws, msg, client, wsClients); safeSend(ws, { type: 'error', message: `Unknown: ${msg.type}` }); } function bcast(wsClients, userId, data, excludeWs) { broadcastToUser(wsClients, userId, data, excludeWs); } const handlers = { 'ping': (ws) => { safeSend(ws, { type: 'pong' }); }, 'turnstile:verify': async (ws, msg, client) => { try { const token = msg?.token; const secret = process.env.TURNSTILE_SECRET_KEY; if (!token || !secret) return safeSend(ws, { type: 'turnstile:error', message: 'Missing token or server not configured' }); const params = new URLSearchParams(); params.append('secret', secret); params.append('response', token); if (client.ip) params.append('remoteip', client.ip); const r = await fetch('https://challenges.cloudflare.com/turnstile/v0/siteverify', { method: 'POST', body: params }); const j = await r.json(); if (j?.success) { client.verified = true; return safeSend(ws, { type: 'turnstile:ok' }); } return safeSend(ws, { type: 'turnstile:error', message: 'Verification failed' }); } catch (e) { console.error('ws turnstile verify', e); return safeSend(ws, { type: 'turnstile:error', message: 'Server error' }); } }, 'auth:login': async (ws, msg, client, wsClients) => { const { accessToken, tempId: clientTempId, deviceToken: requestedDeviceToken } = msg; if (!accessToken) return safeSend(ws, { type: 'auth:error', message: 'Missing token' }); const user = await verifySupabaseToken(accessToken); if (!user) return safeSend(ws, { type: 'auth:error', message: 'Invalid token' }); let nextDeviceToken = null; let reusedDeviceSession = false; if (requestedDeviceToken) { const existingDevice = await deviceSessionStore.validate(requestedDeviceToken); if (existingDevice?.userId === user.id) { existingDevice.ip = client.ip; existingDevice.userAgent = client.userAgent; nextDeviceToken = existingDevice.token; reusedDeviceSession = true; } } if (!nextDeviceToken) { nextDeviceToken = await deviceSessionStore.create(user.id, client.ip, client.userAgent); } if (client.deviceToken && client.deviceToken !== nextDeviceToken) { await deviceSessionStore.revoke(client.deviceToken); } client.userId = user.id; client.accessToken = accessToken; client.authenticated = true; client.deviceToken = nextDeviceToken; sessionStore.markOnline(user.id, ws); if (clientTempId) client.tempId = clientTempId; const tId = client.tempId; await sessionStore.transferTempToUser(tId, user.id, accessToken); const [sessions, settings, profile, subscription] = await Promise.all([ sessionStore.loadUserSessions(user.id, accessToken), getUserSettings(user.id, accessToken), getUserProfile(user.id, accessToken), getSubscriptionInfo(accessToken), ]); const authOkMsg = { type: 'auth:ok', userId: user.id, email: user.email, deviceToken: client.deviceToken, sessions: sessions.map(ser), settings, profile, subscription }; safeSend(ws, authOkMsg); if (!reusedDeviceSession) { bcast(wsClients, user.id, { type: 'auth:newLogin', message: 'New login on your account.', ip: client.ip, userAgent: client.userAgent, timestamp: new Date().toISOString() }, ws); } }, 'auth:logout': async (ws, msg, client) => { const priorUserId = client.userId; if (client.deviceToken) await deviceSessionStore.revoke(client.deviceToken); Object.assign(client, { userId: null, authenticated: false, accessToken: null, deviceToken: null }); if (priorUserId) sessionStore.markOffline(priorUserId, ws); safeSend(ws, { type: 'auth:loggedOut' }); }, 'auth:guest': async (ws, msg, client) => { const t = msg.tempId || client.tempId; client.tempId = t; sessionStore.initTemp(t); const sessions = await sessionStore.getTempSessions(t); safeSend(ws, { type: 'auth:guestOk', tempId: t, sessions: sessions.map(ser) }); }, 'sessions:list': async (ws, msg, client) => { const list = client.userId ? sessionStore.getUserSessions(client.userId) : await sessionStore.getTempSessions(client.tempId); list.sort((a, b) => b.created - a.created); safeSend(ws, { type: 'sessions:list', sessions: list.map(ser) }); }, 'sessions:create': async (ws, msg, client) => { const s = client.userId ? await sessionStore.createUserSession(client.userId, client.accessToken) : await sessionStore.createTempSession(client.tempId); safeSend(ws, { type: 'sessions:created', session: ser(s) }); }, 'sessions:delete': async (ws, msg, client) => { const owner = getClientOwner(client); const session = client.userId ? sessionStore.getUserSession(client.userId, msg.sessionId) : await sessionStore.getTempSession(client.tempId, msg.sessionId); if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' }); await chatTrashStore.add(owner, JSON.parse(JSON.stringify(session))); if (client.userId) { await sessionStore.deleteUserSession(client.userId, client.accessToken, msg.sessionId); await sessionStore.deleteTempSessionEverywhere(msg.sessionId); } else { await sessionStore.deleteTempSession(client.tempId, msg.sessionId); } safeSend(ws, { type: 'sessions:deleted', sessionId: msg.sessionId }); safeSend(ws, { type: 'trash:chats:changed' }); }, 'sessions:deleteAll': async (ws, msg, client) => { const owner = getClientOwner(client); const sessions = client.userId ? sessionStore.getUserSessions(client.userId) : await sessionStore.getTempSessions(client.tempId); for (const session of sessions) { await chatTrashStore.add(owner, JSON.parse(JSON.stringify(session))); if (client.userId) await sessionStore.deleteTempSessionEverywhere(session.id); } if (client.userId) await sessionStore.deleteAllUserSessions(client.userId, client.accessToken); else await sessionStore.deleteTempAll(client.tempId); safeSend(ws, { type: 'sessions:deletedAll' }); safeSend(ws, { type: 'trash:chats:changed' }); }, 'sessions:rename': async (ws, msg, client) => { const name = (msg.name || '').trim(); if (!name) return; if (client.userId) await sessionStore.updateUserSession(client.userId, client.accessToken, msg.sessionId, { name }); else await sessionStore.updateTempSession(client.tempId, msg.sessionId, { name }); safeSend(ws, { type: 'sessions:renamed', sessionId: msg.sessionId, name }); }, 'sessions:get': async (ws, msg, client) => { const s = client.userId ? sessionStore.getUserSession(client.userId, msg.sessionId) : await sessionStore.getTempSession(client.tempId, msg.sessionId); if (!s) return safeSend(ws, { type: 'error', message: 'Session not found' }); safeSend(ws, { type: 'sessions:data', session: ser(s) }); }, 'sessions:share': async (ws, msg, client) => { if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to share' }); const token = await sessionStore.createShareToken(client.userId, client.accessToken, msg.sessionId); if (!token) return safeSend(ws, { type: 'error', message: 'Share failed' }); safeSend(ws, { type: 'sessions:shareUrl', url: `${PUBLIC_URL}/?share=${token}`, sessionId: msg.sessionId }); }, 'sessions:import': async (ws, msg, client) => { if (!client.userId) return safeSend(ws, { type: 'error', message: 'Sign in to import' }); const s = await sessionStore.importSharedSession(client.userId, client.accessToken, msg.token); if (!s) return safeSend(ws, { type: 'error', message: 'Invalid share link' }); safeSend(ws, { type: 'sessions:imported', session: ser(s) }); }, 'trash:chats:list': async (ws, msg, client) => { const owner = getClientOwner(client); const items = await chatTrashStore.list(owner); safeSend(ws, { type: 'trash:chats:list', items }); }, 'trash:chats:restore': async (ws, msg, client) => { const owner = getClientOwner(client); const restored = await chatTrashStore.restore(owner, msg.ids || []); const sessions = []; for (const snapshot of restored) { const restoredSession = await restoreDeletedSession(client, snapshot); if (restoredSession) sessions.push(ser(restoredSession)); } safeSend(ws, { type: 'trash:chats:restored', sessions }); safeSend(ws, { type: 'trash:chats:changed' }); }, 'trash:chats:deleteForever': async (ws, msg, client) => { const owner = getClientOwner(client); const removedIds = await chatTrashStore.deleteForever(owner, msg.ids || []); safeSend(ws, { type: 'trash:chats:deletedForever', ids: removedIds }); safeSend(ws, { type: 'trash:chats:changed' }); }, 'chat:send': async (ws, msg, client) => { const { sessionId, content, tools } = msg; const owner = getClientOwner(client); if (!client.userId) { const allowed = await consumeGuestRequest(client.ip || 'unknown'); if (!allowed) return safeSend(ws, { type: 'guest:rateLimit', message: 'Guest request limit exceeded' }); if (!await sessionStore.tempCanSend(client.tempId)) return safeSend(ws, { type: 'chat:limitReached' }); await sessionStore.tempBump(client.tempId); } const session = client.userId ? sessionStore.getUserSession(client.userId, sessionId) : await sessionStore.getTempSession(client.tempId, sessionId); if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' }); abortActiveStream(ws); const abort = new AbortController(); activeStreams.set(ws, abort); safeSend(ws, { type: 'chat:start', sessionId }); let fullText = ''; const assetsCollected = [], toolCallsCollected = []; // Extract flat history from tree structure const rootMessage = session.history?.[0]; const flatHistory = rootMessage ? extractFlatHistory(rootMessage) : []; if (Array.isArray(msg.linkedMediaIds) && msg.linkedMediaIds.length) { await mediaStore.attachToSession(owner, msg.linkedMediaIds, sessionId).catch((err) => { console.error('Failed to link uploaded media to session:', err); }); } const usagePayload = await buildUsagePayload(client, msg.clientId || ''); const webSearchLimit = isFreeSearchPlan(client, usagePayload) ? { key: usageOwnerKey(client, msg.clientId || ''), limit: FREE_WEB_SEARCH_LIMIT } : null; await streamChat({ sessionId, model: session.model, history: flatHistory, userMessage: content, tools: tools || {}, accessToken: client.accessToken, clientId: msg.clientId, webSearchLimit, owner, sessionName: session.name, abortSignal: abort.signal, onToken(t) { fullText += t; safeSend(ws, { type: 'chat:token', token: t, sessionId }); }, onToolCall(call) { safeSend(ws, { type: 'chat:toolCall', call, sessionId }); if (call.state === 'resolved' || call.state === 'canceled') toolCallsCollected.push(call); }, onNewAsset(asset) { safeSend(ws, { type: 'chat:asset', asset, sessionId }); assetsCollected.push(asset); safeSend(ws, { type: 'media:changed' }); }, onDraftEdit(edit, draftText) { safeSend(ws, { type: 'chat:draftEdited', edit, text: draftText, sessionId }); }, async onDone(text, toolCalls, aborted, sessionNameFromTag, responseEdits = [], responseSegments = []) { activeStreams.delete(ws); const finalText = text || fullText; // Only create user entry if content was actually provided const hasContent = content !== undefined && content !== null && content !== '' && !(Array.isArray(content) && content.length === 0); const userEntry = hasContent ? buildEntry('user', content) : null; const resolvedMap = new Map(toolCallsCollected.map(c => [c.id, c])); const mergedCalls = (toolCalls || []).map(c => { const resolved = resolvedMap.get(c.id) || {}; return { ...c, state: resolved.state || 'resolved', result: resolved.result }; }); const asstEntry = buildEntry('assistant', finalText, mergedCalls, { responseEdits, responseSegments, }); const mediaEntries = assetsCollected.map((asset) => buildMediaEntry(asset.role, { assetId: asset.id, mimeType: asset.mimeType, name: asset.name, }) ); const generatedFiles = extractAssistantGeneratedFiles(finalText); for (const file of generatedFiles) { await mediaStore.storeBuffer(owner, { name: file.name, mimeType: file.mimeType, buffer: file.buffer, sessionId, source: 'assistant_generated', kind: file.kind, }).catch((err) => console.error('Failed to store generated text asset:', err)); } if (generatedFiles.length) safeSend(ws, { type: 'media:changed' }); // Rebuild tree structure with new messages appended to the active branch leaf. let newRootMessage = rootMessage ? cloneAndRepairTree(rootMessage) : null; if (!newRootMessage) { // First message in session - must have user entry if (!userEntry) return safeSend(ws, { type: 'error', message: 'No content for first message' }); newRootMessage = userEntry; newRootMessage.versions[0].tail = [{ ...asstEntry }, ...mediaEntries]; } else if (userEntry) { appendConversationTurn(newRootMessage, userEntry, asstEntry, mediaEntries); } else { const appendedEntries = [ asstEntry, ...mediaEntries, ]; appendEntriesToActiveLeaf(newRootMessage, appendedEntries); } const newHistory = [newRootMessage]; let newName = session.name; if (sessionNameFromTag) { newName = sessionNameFromTag; } else if (!session.history?.length || session.name === 'New Chat') { newName = session.name; } if (client.userId) await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory, name: newName }); else await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory, name: newName }); safeSend(ws, { type: aborted ? 'chat:aborted' : 'chat:done', sessionId, name: newName, history: extractFlatHistory(newRootMessage) }); }, onError(err) { activeStreams.delete(ws); console.error('streamChat error:', err); safeSend(ws, { type: 'chat:error', error: String(err), sessionId }); safeSend(ws, { type: 'chat:aborted', sessionId, reason: 'error' }); }, }); }, 'chat:stop': (ws) => { abortActiveStream(ws); }, 'chat:editMessage': async (ws, msg, client) => { const { sessionId, messageIndex, newContent } = msg; const session = client.userId ? sessionStore.getUserSession(client.userId, sessionId) : await sessionStore.getTempSession(client.tempId, sessionId); if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' }); const rootMessage = session.history?.[0]; if (!rootMessage) return safeSend(ws, { type: 'error', message: 'No history' }); const flatHistory = extractFlatHistory(rootMessage); const targetMsg = flatHistory[messageIndex]; if (!targetMsg) { console.error(`chat:editMessage: Message at index ${messageIndex} not found. History length: ${flatHistory.length}`); return safeSend(ws, { type: 'error', message: 'Message not found' }); } // Find the target message in the tree and add new version const newRoot = cloneAndRepairTree(rootMessage); const context = findMessageContext(newRoot, targetMsg.id); if (!context?.message) return safeSend(ws, { type: 'error', message: 'Message not found in tree' }); const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => { if (msgInTree.role === 'user' && Array.isArray(context.parentTail) && context.index >= 0) { const trailing = context.parentTail.splice(context.index + 1); if (trailing.length) { const currentVersion = getActiveVersion(msgInTree); currentVersion.tail = [...(currentVersion.tail || []), ...trailing]; } } // Add new version with EMPTY tail (no responses yet for this edited version) msgInTree.versions.push({ content: newContent, tail: [], // New version starts fresh, no tail timestamp: Date.now() }); msgInTree.currentVersionIdx = msgInTree.versions.length - 1; msgInTree.content = newContent; }); if (!found) return; const newHistory = [newRoot]; if (client.userId) { await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory }); } else { await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory }); } // Send back the updated message with its ID and the full flat history const updatedFlatHistory = extractFlatHistory(newRoot); const updatedTargetMsg = updatedFlatHistory[messageIndex]; if (!updatedTargetMsg) { console.error(`chat:editMessage: Updated message not found at index ${messageIndex}. Updated history length: ${updatedFlatHistory.length}`); return safeSend(ws, { type: 'error', message: 'Failed to apply edit - message lost' }); } safeSend(ws, { type: 'chat:messageEdited', sessionId, messageId: targetMsg.id, messageIndex, message: updatedTargetMsg, history: updatedFlatHistory }); }, 'chat:selectVersion': async (ws, msg, client) => { const { sessionId, messageIndex, versionIdx } = msg; const session = client.userId ? sessionStore.getUserSession(client.userId, sessionId) : await sessionStore.getTempSession(client.tempId, sessionId); if (!session) return; const rootMessage = session.history?.[0]; if (!rootMessage) return; const flatHistory = extractFlatHistory(rootMessage); const targetMsg = flatHistory[messageIndex]; if (!targetMsg || !targetMsg.versions || versionIdx >= targetMsg.versions.length) return; // Find and update the message in tree, switching to specified version const newRoot = cloneAndRepairTree(rootMessage); const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => { msgInTree.currentVersionIdx = versionIdx; msgInTree.content = msgInTree.versions[versionIdx].content; // Tail is automatically correct since each version has its own tail }); if (!found) return; const newHistory = [newRoot]; if (client.userId) { await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory }); } else { await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory }); } // Send back with messageId for clarity safeSend(ws, { type: 'chat:versionSelected', sessionId, messageId: targetMsg.id, messageIndex, history: extractFlatHistory(newRoot) }); }, 'chat:assistantAction': async (ws, msg, client) => { const { sessionId, messageIndex } = msg; const action = msg.action === 'continue' ? 'continue' : 'regenerate'; const session = client.userId ? sessionStore.getUserSession(client.userId, sessionId) : await sessionStore.getTempSession(client.tempId, sessionId); if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' }); const rootMessage = session.history?.[0]; if (!rootMessage) return safeSend(ws, { type: 'error', message: 'No history' }); const flatHistory = extractFlatHistory(rootMessage); const targetMsg = flatHistory[messageIndex]; if (!targetMsg || targetMsg.role !== 'assistant') { return safeSend(ws, { type: 'error', message: 'Assistant message not found' }); } abortActiveStream(ws); const abort = new AbortController(); activeStreams.set(ws, abort); const owner = getClientOwner(client); const historyBeforeTarget = flatHistory.slice(0, messageIndex); const baseAssistantText = stripSessionTagText(targetMsg.content || ''); const actionHistory = action === 'continue' ? [...historyBeforeTarget, { role: 'assistant', content: baseAssistantText }] : historyBeforeTarget; const actionUserMessage = action === 'continue' ? CONTINUE_ASSISTANT_PROMPT : null; safeSend(ws, { type: 'chat:start', sessionId, streamKind: 'assistantAction', action, messageIndex, prefillText: action === 'continue' ? baseAssistantText : '', }); let fullText = ''; const assetsCollected = []; const toolCallsCollected = []; const usagePayload = await buildUsagePayload(client, msg.clientId || ''); const webSearchLimit = isFreeSearchPlan(client, usagePayload) ? { key: usageOwnerKey(client, msg.clientId || ''), limit: FREE_WEB_SEARCH_LIMIT } : null; await streamChat({ sessionId, model: session.model, history: actionHistory, userMessage: actionUserMessage, tools: msg.tools || {}, accessToken: client.accessToken, clientId: msg.clientId, webSearchLimit, owner, sessionName: session.name, abortSignal: abort.signal, onToken(t) { fullText += t; safeSend(ws, { type: 'chat:token', token: t, sessionId }); }, onToolCall(call) { safeSend(ws, { type: 'chat:toolCall', call, sessionId }); if (call.state === 'resolved' || call.state === 'canceled') toolCallsCollected.push(call); }, onNewAsset(asset) { safeSend(ws, { type: 'chat:asset', asset, sessionId }); assetsCollected.push(asset); safeSend(ws, { type: 'media:changed' }); }, onDraftEdit(edit, draftText) { safeSend(ws, { type: 'chat:draftEdited', edit, text: draftText, sessionId }); }, async onDone(text, toolCalls, aborted, sessionNameFromTag, responseEdits = [], responseSegments = []) { activeStreams.delete(ws); if (aborted) { return safeSend(ws, { type: 'chat:aborted', sessionId }); } const rawAssistantText = text || fullText; const resolvedMap = new Map(toolCallsCollected.map((call) => [call.id, call])); const mergedCalls = (toolCalls || []).map((call) => { const resolved = resolvedMap.get(call.id) || {}; return { ...call, state: resolved.state || 'resolved', result: resolved.result }; }); let finalText = rawAssistantText; let finalSegments = responseSegments; if (action === 'continue') { const { continuationText, overlapLength } = stripContinuationOverlap(baseAssistantText, rawAssistantText); finalText = baseAssistantText + continuationText; finalSegments = [ ...(baseAssistantText ? [{ type: 'text', text: baseAssistantText }] : []), ...trimLeadingTextFromSegments(responseSegments, overlapLength), ]; } const mediaEntries = assetsCollected.map((asset) => buildMediaEntry(asset.role, { assetId: asset.id, mimeType: asset.mimeType, name: asset.name, }) ); const generatedFiles = extractAssistantGeneratedFiles(finalText); for (const file of generatedFiles) { await mediaStore.storeBuffer(owner, { name: file.name, mimeType: file.mimeType, buffer: file.buffer, sessionId, source: 'assistant_generated', kind: file.kind, }).catch((err) => console.error('Failed to store generated text asset:', err)); } if (generatedFiles.length) safeSend(ws, { type: 'media:changed' }); const newRoot = cloneAndRepairTree(rootMessage); const found = findAndUpdateMessage(newRoot, targetMsg.id, (msgInTree) => { const nextVersion = buildVersionRecord(finalText, { tail: mediaEntries, toolCalls: mergedCalls, responseEdits, responseSegments: finalSegments, }); applyVersionToMessage(msgInTree, nextVersion); }); if (!found) { return safeSend(ws, { type: 'error', message: 'Assistant branch could not be updated' }); } let newName = session.name; if (sessionNameFromTag) { newName = sessionNameFromTag; } const newHistory = [newRoot]; if (client.userId) { await sessionStore.updateUserSession(client.userId, client.accessToken, sessionId, { history: newHistory, name: newName }); } else { await sessionStore.updateTempSession(client.tempId, sessionId, { history: newHistory, name: newName }); } safeSend(ws, { type: 'chat:done', sessionId, name: newName, history: extractFlatHistory(newRoot) }); }, onError(err) { activeStreams.delete(ws); console.error('assistant action streamChat error:', err); safeSend(ws, { type: 'chat:error', error: String(err), sessionId }); safeSend(ws, { type: 'chat:aborted', sessionId, reason: 'error' }); }, }); }, 'settings:get': async (ws, msg, client) => { const s = client.userId ? await getUserSettings(client.userId, client.accessToken) : { theme: 'dark', webSearch: true, imageGen: true, videoGen: true, audioGen: true }; safeSend(ws, { type: 'settings:data', settings: s }); }, 'settings:save': async (ws, msg, client, wsClients) => { if (!client.userId) return; await saveUserSettings(client.userId, client.accessToken, msg.settings); safeSend(ws, { type: 'settings:saved' }); bcast(wsClients, client.userId, { type: 'settings:updated', settings: msg.settings }, ws); }, 'personalization:get': async (ws, msg, client) => { const defaultPrompt = await systemPromptStore.getDefaultPrompt(); const personalization = client.userId ? await systemPromptStore.getPersonalization(client.userId) : { defaultPrompt, customPrompt: null, resolvedPrompt: defaultPrompt, isCustom: false, updatedAt: null, canEdit: false, }; safeSend(ws, { type: 'personalization:data', personalization }); }, 'personalization:saveSystemPrompt': async (ws, msg, client, wsClients) => { if (!client.userId) return safeSend(ws, { type: 'personalization:error', message: 'Sign in to customize your system prompt' }); try { const personalization = await systemPromptStore.setUserPrompt(client.userId, msg.markdown); safeSend(ws, { type: 'personalization:updated', personalization }); bcast(wsClients, client.userId, { type: 'personalization:updated', personalization }, ws); } catch (err) { safeSend(ws, { type: 'personalization:error', message: err.message || 'Unable to save system prompt' }); } }, 'personalization:resetSystemPrompt': async (ws, msg, client, wsClients) => { if (!client.userId) return safeSend(ws, { type: 'personalization:error', message: 'Sign in to customize your system prompt' }); try { const personalization = await systemPromptStore.resetUserPrompt(client.userId); safeSend(ws, { type: 'personalization:updated', personalization }); bcast(wsClients, client.userId, { type: 'personalization:updated', personalization }, ws); } catch (err) { safeSend(ws, { type: 'personalization:error', message: err.message || 'Unable to reset system prompt' }); } }, 'memories:list': async (ws, msg, client) => { const owner = getClientOwner(client); const items = await memoryStore.list(owner); safeSend(ws, { type: 'memories:list', items }); }, 'memories:create': async (ws, msg, client, wsClients) => { const owner = getClientOwner(client); const memory = await memoryStore.create(owner, { content: msg.content, sessionId: msg.sessionId || null, source: msg.source || 'manual', }); safeSend(ws, { type: 'memories:created', memory }); if (client.userId) bcast(wsClients, client.userId, { type: 'memories:changed' }, ws); }, 'memories:update': async (ws, msg, client, wsClients) => { const owner = getClientOwner(client); const memory = await memoryStore.update(owner, msg.id, msg.content); safeSend(ws, { type: 'memories:updated', memory }); if (client.userId) bcast(wsClients, client.userId, { type: 'memories:changed' }, ws); }, 'memories:delete': async (ws, msg, client, wsClients) => { const owner = getClientOwner(client); const ok = await memoryStore.delete(owner, msg.id); safeSend(ws, { type: 'memories:deleted', id: ok ? msg.id : null }); if (client.userId) bcast(wsClients, client.userId, { type: 'memories:changed' }, ws); }, 'account:getProfile': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:profile', profile: await getUserProfile(c.userId, c.accessToken) }); }, 'account:setUsername': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:usernameResult', ...await setUsername(c.userId, c.accessToken, msg.username) }); }, 'account:getSubscription': async (ws, msg, c) => { if (!c.userId) return console.warn('[Account] getSubscription called without userId'); const subInfo = await getSubscriptionInfo(c.accessToken); safeSend(ws, { type: 'account:subscription', info: subInfo }); }, 'account:getUsage': async (ws, msg, c) => { safeSend(ws, { type: 'account:usage', usage: await buildUsagePayload(c, msg.clientId || '') }); }, 'account:getTierConfig': async (ws) => { safeSend(ws, { type: 'account:tierConfig', config: await getTierConfig() }); }, 'account:getSessions': async (ws, msg, c) => { if (!c.userId) return; safeSend(ws, { type: 'account:deviceSessions', sessions: await deviceSessionStore.getForUser(c.userId), currentToken: c.deviceToken, }); }, 'account:revokeSession': async (ws, msg, c, wsClients) => { if (!c.userId || !msg.token) return; const revoked = await deviceSessionStore.revoke(msg.token); if (revoked) { const activeSessions = await deviceSessionStore.getForUser(c.userId); for (const [ows, oc] of wsClients) { if (oc.userId !== c.userId) continue; if (oc.deviceToken === msg.token) { safeSend(ows, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' }); sessionStore.markOffline(oc.userId, ows); Object.assign(oc, { userId: null, authenticated: false, accessToken: null, deviceToken: null }); continue; } safeSend(ows, { type: 'account:deviceSessions', sessions: activeSessions, currentToken: oc.deviceToken, }); } } safeSend(ws, { type: 'account:sessionRevoked', token: msg.token }); }, 'account:revokeAllOthers': async (ws, msg, c, wsClients) => { if (!c.userId) return; await deviceSessionStore.revokeAllExcept(c.userId, c.deviceToken); const activeSessions = await deviceSessionStore.getForUser(c.userId); for (const [ows, oc] of wsClients) { if (oc.userId !== c.userId) continue; if (oc.deviceToken && oc.deviceToken !== c.deviceToken) { safeSend(ows, { type: 'auth:forcedLogout', reason: 'Session revoked by another device' }); sessionStore.markOffline(oc.userId, ows); Object.assign(oc, { userId: null, authenticated: false, accessToken: null, deviceToken: null }); continue; } safeSend(ows, { type: 'account:deviceSessions', sessions: activeSessions, currentToken: oc.deviceToken, }); } safeSend(ws, { type: 'account:allOthersRevoked' }); }, }; function ser(s) { return { id: s.id, name: s.name, created: s.created, history: s.history || [], model: s.model }; } function getClientOwner(client) { return client.userId ? { type: 'user', id: client.userId } : { type: 'guest', id: client.tempId }; } async function restoreDeletedSession(client, snapshot) { if (!snapshot) return null; const restored = JSON.parse(JSON.stringify(snapshot)); const existing = client.userId ? sessionStore.getUserSession(client.userId, restored.id) : await sessionStore.getTempSession(client.tempId, restored.id); if (existing) restored.id = crypto.randomUUID(); restored.created = restored.created || Date.now(); if (client.userId) { return sessionStore.restoreUserSession(client.userId, client.accessToken, restored); } return sessionStore.restoreTempSession(client.tempId, restored); } function generateMessageId() { return `msg-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; } function buildEntry(role, content, toolCalls = [], extraFields = {}) { const normalizedCalls = toolCalls.map(c => ({ id: c.id, name: c.name || c.function?.name, args: c.args ?? (c.function?.arguments ? (() => { try { return JSON.parse(c.function.arguments); } catch { return c.function.arguments; } })() : {}), state: c.state || 'resolved', result: c.result, })); const validContent = (content === undefined || content === null) ? '' : content; const versionMeta = {}; const topLevelExtraFields = { ...extraFields }; VERSION_META_FIELDS.forEach((key) => { if (key in topLevelExtraFields) { versionMeta[key] = topLevelExtraFields[key]; delete topLevelExtraFields[key]; } }); return { id: generateMessageId(), role, content: validContent, timestamp: Date.now(), versions: [{ content: validContent, tail: [], timestamp: Date.now(), ...(normalizedCalls.length ? { toolCalls: normalizedCalls } : {}), ...versionMeta, }], currentVersionIdx: 0, ...(normalizedCalls.length ? { toolCalls: normalizedCalls } : {}), ...versionMeta, ...topLevelExtraFields, }; } function buildMediaEntry(role, content) { return { id: generateMessageId(), role, content, timestamp: Date.now(), versions: [{ content, tail: [], timestamp: Date.now() }], currentVersionIdx: 0, }; } function buildVersionRecord(content, extraFields = {}) { const validContent = content === undefined || content === null ? '' : content; const version = { content: validContent, tail: Array.isArray(extraFields.tail) ? extraFields.tail : [], timestamp: Date.now(), }; VERSION_META_FIELDS.forEach((key) => { if (extraFields[key] !== undefined && extraFields[key] !== null) { version[key] = extraFields[key]; } }); return version; } function applyVersionToMessage(message, versionRecord) { if (!message?.versions || !Array.isArray(message.versions)) { message.versions = []; } message.versions.push(versionRecord); message.currentVersionIdx = message.versions.length - 1; syncMessageFromActiveVersion(message); return message; } function stripSessionTagText(content) { return typeof content === 'string' ? content.replace(/[\s\S]*?<\/session_name>/gi, '').trim() : content; } function stripContinuationOverlap(baseText = '', generatedText = '') { const base = String(baseText || ''); const generated = String(generatedText || ''); if (!base) return { continuationText: generated, overlapLength: 0 }; if (!generated) return { continuationText: '', overlapLength: 0 }; if (generated.startsWith(base)) { return { continuationText: generated.slice(base.length), overlapLength: base.length }; } const maxWindow = Math.min(base.length, generated.length, 400); for (let size = maxWindow; size > 0; size--) { if (base.slice(-size) === generated.slice(0, size)) { return { continuationText: generated.slice(size), overlapLength: size }; } } return { continuationText: generated, overlapLength: 0 }; } function trimLeadingTextFromSegments(segments = [], overlapLength = 0) { let remaining = Math.max(0, overlapLength); const trimmedSegments = []; for (const segment of segments || []) { if (!segment || segment.type !== 'text' || remaining <= 0) { trimmedSegments.push(segment); continue; } const text = String(segment.text || ''); if (remaining >= text.length) { remaining -= text.length; continue; } trimmedSegments.push({ ...segment, text: text.slice(remaining), }); remaining = 0; } return trimmedSegments.filter((segment) => segment && (segment.type !== 'text' || segment.text)); } function extractAssistantGeneratedFiles(text) { if (!text || typeof text !== 'string') return []; const files = []; const detailsRe = /
([^<]+?)<\/summary>\s*```(?:\w*)\n([\s\S]*?)\n```\s*<\/details>/g; const svgRe = /```svg\n([\s\S]*?)\n```/g; let match; while ((match = detailsRe.exec(text)) !== null) { const name = String(match[1] || '').trim(); const ext = path.extname(name).toLowerCase(); files.push({ name: name || 'generated-file.txt', mimeType: ext === '.html' || ext === '.htm' ? 'text/html' : 'text/plain', kind: ext === '.html' || ext === '.htm' ? 'rich_text' : 'text', buffer: Buffer.from(match[2], 'utf8'), }); } let svgIndex = 1; while ((match = svgRe.exec(text)) !== null) { files.push({ name: `generated-image-${svgIndex++}.svg`, mimeType: 'image/svg+xml', kind: 'image', buffer: Buffer.from(match[1], 'utf8'), }); } return files; } /** * Validate and repair tree structure after cloning/modification * Ensures all messages and versions have valid content property */ function validateAndRepairTree(rootMessage) { const repair = (msg) => { if (!msg) return; // Ensure message has content if (msg.content === undefined || msg.content === null) { msg.content = ''; } // Ensure versions array and each version's content if (msg.versions && Array.isArray(msg.versions)) { for (const version of msg.versions) { if (version.content === undefined || version.content === null) { version.content = ''; } // Recursively repair tail messages if (version.tail && Array.isArray(version.tail)) { for (const tailMsg of version.tail) { repair(tailMsg); } } } } }; repair(rootMessage); return rootMessage; } function cloneAndRepairTree(rootMessage) { return validateAndRepairTree(JSON.parse(JSON.stringify(rootMessage))); } function getActiveVersion(message) { if (!message) return null; const versions = Array.isArray(message.versions) ? message.versions : []; if (!versions.length) { message.versions = [{ content: message.content ?? '', tail: [], timestamp: Date.now() }]; message.currentVersionIdx = 0; return message.versions[0]; } const currentVersionIdx = Number.isInteger(message.currentVersionIdx) ? Math.max(0, Math.min(message.currentVersionIdx, versions.length - 1)) : 0; message.currentVersionIdx = currentVersionIdx; if (!Array.isArray(message.versions[currentVersionIdx].tail)) { message.versions[currentVersionIdx].tail = []; } if (message.versions[currentVersionIdx].content === undefined || message.versions[currentVersionIdx].content === null) { message.versions[currentVersionIdx].content = message.content ?? ''; } return message.versions[currentVersionIdx]; } function cloneVersionMetaValue(value) { if (value === undefined) return undefined; return JSON.parse(JSON.stringify(value)); } function syncMessageFromActiveVersion(message) { if (!message) return message; const currentVersion = getActiveVersion(message); if (!currentVersion) return message; message.content = currentVersion.content ?? message.content ?? ''; VERSION_META_FIELDS.forEach((key) => { if (key in currentVersion) { message[key] = cloneVersionMetaValue(currentVersion[key]); } else { delete message[key]; } }); return message; } function getActiveLeafMessage(rootMessage) { let current = rootMessage; while (current) { const currentVersion = getActiveVersion(current); const tail = Array.isArray(currentVersion?.tail) ? currentVersion.tail : []; if (!tail.length) return current; current = tail[tail.length - 1]; } return rootMessage; } function appendEntriesToActiveLeaf(rootMessage, entries = []) { if (!rootMessage || !entries.length) return rootMessage; const leaf = getActiveLeafMessage(rootMessage); const currentVersion = getActiveVersion(leaf); currentVersion.tail = [...(currentVersion.tail || []), ...entries]; return rootMessage; } function appendConversationTurn(rootMessage, userEntry, assistantEntry, mediaEntries = []) { if (!rootMessage || !userEntry) return rootMessage; const leaf = getActiveLeafMessage(rootMessage); const currentVersion = getActiveVersion(leaf); const userVersion = getActiveVersion(userEntry); userVersion.tail = [ ...(assistantEntry ? [assistantEntry] : []), ...(Array.isArray(mediaEntries) ? mediaEntries : []), ]; syncMessageFromActiveVersion(userEntry); currentVersion.tail = [...(currentVersion.tail || []), userEntry]; return rootMessage; } function extractFlatHistory(rootMessage) { if (!rootMessage) return []; // Helper to ensure message has valid content const ensureValidContent = (msg) => { if (msg.content === undefined || msg.content === null) { msg.content = ''; } return syncMessageFromActiveVersion(msg); }; const history = [ensureValidContent(rootMessage)]; const currentVerIdx = rootMessage.currentVersionIdx ?? 0; if (!Array.isArray(rootMessage.versions)) { console.warn(`extractFlatHistory: Root message ${rootMessage.id} missing versions array`); return history; } if (currentVerIdx >= rootMessage.versions.length) { console.warn(`extractFlatHistory: Root message currentVersionIdx ${currentVerIdx} out of bounds (${rootMessage.versions.length} versions)`); return history; } const currentTail = rootMessage.versions[currentVerIdx]?.tail; if (currentTail && Array.isArray(currentTail)) { const walkTail = (tail) => { for (let i = 0; i < tail.length; i++) { const msg = tail[i]; history.push(ensureValidContent(msg)); const ver = msg.versions?.[msg.currentVersionIdx ?? 0]; if (ver?.tail && Array.isArray(ver.tail)) { walkTail(ver.tail); } if (msg.role === 'user' && Array.isArray(msg.versions) && msg.versions.length > 1) { break; } } }; walkTail(currentTail); } return history; } function findMessageContext(rootMessage, targetId) { if (!rootMessage) return null; if (rootMessage.id === targetId) { return { message: rootMessage, parent: null, parentTail: null, index: -1 }; } const search = (msg) => { const verIdx = msg.currentVersionIdx ?? 0; const tail = msg.versions?.[verIdx]?.tail; if (!tail || !Array.isArray(tail)) return null; for (let i = 0; i < tail.length; i++) { const child = tail[i]; if (child.id === targetId) { return { message: child, parent: msg, parentTail: tail, index: i }; } const nested = search(child); if (nested) return nested; } return null; }; return search(rootMessage); } function findAndUpdateMessage(rootMessage, targetId, updateFn) { if (rootMessage.id === targetId) { updateFn(rootMessage); return true; } const search = (msg) => { const verIdx = msg.currentVersionIdx ?? 0; const tail = msg.versions?.[verIdx]?.tail; if (!tail || !Array.isArray(tail)) return false; for (const child of tail) { if (child.id === targetId) { updateFn(child); return true; } if (search(child)) return true; } return false; }; return search(rootMessage); }