import { createClient } from '@supabase/supabase-js';
import crypto from 'crypto';
import { saveEncryptedJson, loadEncryptedJson } from './cryptoUtils.js';
import path from 'path';
import { isPostgresStorageMode } from './dataPaths.js';
import {
  decryptJsonPayload,
  encryptJsonPayload,
  makeLookupToken,
  makeOwnerLookup,
  pgQuery,
} from './postgres.js';

// Supabase connection settings; populated by initStoreConfig() before any
// Supabase-backed method is used.
let _SUPABASE_URL, _SUPABASE_ANON_KEY;

/**
 * Stores the Supabase URL and anon key used by every client this module creates.
 * @param {string} url - Supabase project URL.
 * @param {string} key - Supabase anon key.
 */
export function initStoreConfig(url, key) {
  _SUPABASE_URL = url;
  _SUPABASE_ANON_KEY = key;
}

// A guest record expires 24h after creation or 12h after its last activity,
// whichever comes first (see guestExpiryRecord and the cleanup timer).
const TEMP_TTL_MS = 24 * 60 * 60 * 1000;
const TEMP_INACTIVITY = 12 * 60 * 60 * 1000;
// Guests may send while msgCount is below this limit (see tempCanSend).
const TEMP_MSG_LIMIT = 10;
const CLEANUP_INTERVAL_MS = 30 * 60 * 1000;

// In-memory state. In Postgres mode these act as caches in front of the database.
const userCache = new Map(); // userId -> { sessions: Map, online: Set }
const tempStore = new Map(); // tempId -> { sessions: Map, msgCount, created, lastActive }
const devSessions = new Map(); // device token -> device session record
const loadedTempIds = new Set(); // guest ids whose state has been loaded
const loadedUserIds = new Set(); // user ids whose sessions were loaded from SQL

const TEMP_STORE_FILE = '/data/temp_sessions.json';

/** @returns {string} The current time as an ISO-8601 string. */
function nowIso() {
  return new Date().toISOString();
}

/** @returns {{type: 'guest', id: string}} Owner descriptor for a guest. */
function tempOwner(tempId) {
  return { type: 'guest', id: tempId };
}

/** @returns {{type: 'user', id: string}} Owner descriptor for a signed-in user. */
function userOwner(userId) {
  return { type: 'user', id: userId };
}

// Lookup tokens and AAD strings. Each record kind has its own namespace string,
// and the AAD passed to encryptJsonPayload must match the one later passed to
// decryptJsonPayload for the same record.
function guestStateLookup(tempId) {
  return makeLookupToken('guest-state', tempId);
}

function guestStateAad(tempId) {
  return `guest-state:${tempId}`;
}

function sessionAad(scopeType, sessionId) {
  return `chat-session:${scopeType}:${sessionId}`;
}

function shareTokenLookup(token) {
  return makeLookupToken('session-share-token', token);
}

function shareAad(recordId) {
  return `session-share:${recordId}`;
}

function deviceTokenLookup(token) {
  return makeLookupToken('device-session-token', token);
}

function deviceSessionAad(tokenLookup) {
  return `device-session:${tokenLookup}`;
}

/**
 * Computes when a guest record expires: the earlier of created + TEMP_TTL_MS
 * and lastActive + TEMP_INACTIVITY. A missing timestamp counts as "now".
 * @param {{created?: number, lastActive?: number}} tempData
 * @returns {string} Expiry time as an ISO-8601 string.
 */
function guestExpiryRecord(tempData) {
  const createdExpires = (tempData.created || Date.now()) + TEMP_TTL_MS;
  const inactiveExpires = (tempData.lastActive || Date.now()) + TEMP_INACTIVITY;
  return new Date(Math.min(createdExpires, inactiveExpires)).toISOString();
}

/**
 * Returns the in-memory record for a guest, creating an empty one if needed.
 * @param {string} tempId
 * @returns {{sessions: Map, msgCount: number, created: number, lastActive: number}}
 */
function ensureTempRecord(tempId) {
  if (!tempStore.has(tempId)) {
    tempStore.set(tempId, {
      sessions: new Map(),
      msgCount: 0,
      created: Date.now(),
      lastActive: Date.now(),
    });
  }
  return tempStore.get(tempId);
}

/** Loads guest records from the encrypted JSON file. No-op in Postgres mode. */
async function loadTempStore() {
  if (isPostgresStorageMode()) return;
  const data = await loadEncryptedJson(TEMP_STORE_FILE);
  if (!data) return;
  for (const [id, d] of Object.entries(data)) {
    tempStore.set(id, {
      sessions: new Map(Object.entries(d.sessions || {})),
      msgCount: d.msgCount || 0,
      created: d.created || Date.now(),
      lastActive: d.lastActive || Date.now(),
    });
    loadedTempIds.add(id);
  }
}

/** Writes every guest record to the encrypted JSON file. No-op in Postgres mode. */
async function saveTempStore() {
  if (isPostgresStorageMode()) return;
  const data = {};
  for (const [id, d] of tempStore) {
    data[id] = {
      sessions: Object.fromEntries(d.sessions),
      msgCount: d.msgCount,
      created: d.created,
      lastActive: d.lastActive,
    };
  }
  await saveEncryptedJson(TEMP_STORE_FILE, data);
}

/**
 * Saves the file-backed guest store and logs a failure instead of rejecting,
 * so callers may fire-and-forget or await it.
 * @returns {Promise<void>}
 */
function saveTempStoreLogged() {
  return saveTempStore().catch((err) => console.error('Failed to save temp store:', err));
}

/**
 * Builds a rejection handler that logs a best-effort persistence failure.
 * @param {string} context - Name of the operation, used as the log prefix.
 * @returns {(err: unknown) => void}
 */
function logPersistFailure(context) {
  return (err) => console.error(`${context} persist error:`, err?.message ?? err);
}

/**
 * In Postgres mode, loads a guest's counters and sessions from the database
 * into tempStore the first time the guest is seen. Later calls are no-ops.
 * @param {string} tempId
 */
async function ensureSqlTempLoaded(tempId) {
  if (!isPostgresStorageMode() || loadedTempIds.has(tempId)) return;
  const lookup = makeOwnerLookup(tempOwner(tempId));
  const [guestStateResult, sessionResult] = await Promise.all([
    pgQuery('SELECT payload FROM guest_state WHERE owner_lookup = $1', [guestStateLookup(tempId)]),
    pgQuery(
      'SELECT id, payload FROM chat_sessions WHERE owner_lookup = $1 AND scope_type = $2 ORDER BY updated_at DESC',
      [lookup, 'guest']
    ),
  ]);
  const base = ensureTempRecord(tempId);
  const guestState = guestStateResult.rows[0]
    ? decryptJsonPayload(guestStateResult.rows[0].payload, guestStateAad(tempId))
    : null;
  // Stored values win; otherwise keep what is already in memory.
  base.msgCount = Number(guestState?.msgCount) || base.msgCount || 0;
  base.created = Number(guestState?.created) || base.created || Date.now();
  base.lastActive = Number(guestState?.lastActive) || base.lastActive || Date.now();
  base.sessions = new Map(
    sessionResult.rows
      .map((row) => decryptJsonPayload(row.payload, sessionAad('guest', row.id)))
      .filter((session) => session?.id)
      .map((session) => [session.id, session])
  );
  loadedTempIds.add(tempId);
}

/**
 * Upserts the guest's counters into guest_state. Does nothing if the guest has
 * no in-memory record.
 * @param {string} tempId
 */
async function persistSqlTempState(tempId) {
  const data = tempStore.get(tempId);
  if (!data) return;
  const payload = encryptJsonPayload(
    {
      tempId,
      msgCount: data.msgCount,
      created: data.created,
      lastActive: data.lastActive,
    },
    guestStateAad(tempId)
  );
  await pgQuery(
    `INSERT INTO guest_state (owner_lookup, expires_at, updated_at, payload) VALUES ($1, $2, $3, $4::jsonb) ON CONFLICT (owner_lookup) DO UPDATE SET expires_at = EXCLUDED.expires_at, updated_at = EXCLUDED.updated_at, payload = EXCLUDED.payload`,
    [guestStateLookup(tempId), guestExpiryRecord(data), nowIso(), JSON.stringify(payload)]
  );
}

/**
 * Upserts one chat session row with an encrypted payload.
 * @param {{type: string, id: string}} owner - Owner descriptor.
 * @param {string} scopeType - 'guest' or 'user'.
 * @param {{id: string, created: number}} session - Session to store.
 * @param {string|null} [expiresAt=null] - ISO expiry, or null for no expiry.
 */
async function persistSqlSession(owner, scopeType, session, expiresAt = null) {
  await pgQuery(
    `INSERT INTO chat_sessions (id, scope_type, owner_lookup, created_at, updated_at, expires_at, payload) VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb) ON CONFLICT (id) DO UPDATE SET scope_type = EXCLUDED.scope_type, owner_lookup = EXCLUDED.owner_lookup, created_at = EXCLUDED.created_at, updated_at = EXCLUDED.updated_at, expires_at = EXCLUDED.expires_at, payload = EXCLUDED.payload`,
    [
      session.id,
      scopeType,
      makeOwnerLookup(owner),
      new Date(session.created).toISOString(),
      nowIso(),
      expiresAt,
      JSON.stringify(encryptJsonPayload(session, sessionAad(scopeType, session.id))),
    ]
  );
}

/**
 * Persists a guest session after an in-memory change: session row plus guest
 * counters in Postgres mode, otherwise a background save of the JSON file.
 * @param {string} tempId
 * @param {object} data - The guest's in-memory record.
 * @param {object} session - The session that changed.
 */
async function persistTempSession(tempId, data, session) {
  if (isPostgresStorageMode()) {
    await persistSqlSession(tempOwner(tempId), 'guest', session, guestExpiryRecord(data));
    await persistSqlTempState(tempId);
  } else {
    saveTempStoreLogged();
  }
}

/**
 * Replaces the user's cached sessions with the rows stored in Postgres.
 * @param {string} userId
 * @returns {Promise<object[]>} The loaded sessions, most recently updated first.
 */
async function loadUserSessionsSql(userId) {
  const user = sessionStore._ensureUser(userId);
  const { rows } = await pgQuery(
    'SELECT id, payload FROM chat_sessions WHERE owner_lookup = $1 AND scope_type = $2 ORDER BY updated_at DESC',
    [makeOwnerLookup(userOwner(userId)), 'user']
  );
  user.sessions.clear();
  for (const row of rows) {
    const session = decryptJsonPayload(row.payload, sessionAad('user', row.id));
    if (session?.id) user.sessions.set(session.id, session);
  }
  loadedUserIds.add(userId);
  return [...user.sessions.values()];
}

/** In Postgres mode, loads the user's sessions once; otherwise a no-op. */
async function ensureUserLoaded(userId) {
  if (!isPostgresStorageMode() || loadedUserIds.has(userId)) return;
  await loadUserSessionsSql(userId);
}

/**
 * Creates a Supabase client that sends the user's access token as a Bearer
 * Authorization header and does not persist its auth session.
 * @param {string} accessToken
 */
function userClient(accessToken) {
  return createClient(_SUPABASE_URL, _SUPABASE_ANON_KEY, {
    global: { headers: { Authorization: `Bearer ${accessToken}` } },
    auth: { persistSession: false },
  });
}

/**
 * Evicts expired guest records from memory, then either prunes the matching
 * Postgres rows or saves the JSON file, depending on the storage mode.
 */
async function pruneExpiredTempRecords() {
  const now = Date.now();
  const expiredTempIds = [];
  for (const [id, d] of tempStore) {
    if (now - d.created > TEMP_TTL_MS || now - d.lastActive > TEMP_INACTIVITY) {
      tempStore.delete(id);
      // Forget the id in both storage modes so the set cannot grow without bound.
      loadedTempIds.delete(id);
      expiredTempIds.push(id);
    }
  }

  if (isPostgresStorageMode()) {
    try {
      for (const tempId of expiredTempIds) {
        await pgQuery('DELETE FROM guest_state WHERE owner_lookup = $1', [guestStateLookup(tempId)]);
      }
      // Also remove rows for guests that are not currently held in memory.
      await pgQuery(
        `DELETE FROM chat_sessions WHERE scope_type = 'guest' AND expires_at IS NOT NULL AND expires_at <= $1`,
        [nowIso()]
      );
      await pgQuery(
        `DELETE FROM guest_state WHERE expires_at IS NOT NULL AND expires_at <= $1`,
        [nowIso()]
      );
    } catch (err) {
      console.error('Failed to prune SQL temp store:', err);
    }
    return;
  }

  await saveTempStoreLogged();
}

// Module-level side effects: load the file-backed store and start the cleanup timer.
loadTempStore().catch((err) => console.error('Failed to load temp store:', err));
setInterval(pruneExpiredTempRecords, CLEANUP_INTERVAL_MS);

/**
 * Chat-session storage for guests ("temp" methods, keyed by a temp id) and for
 * signed-in users. User data lives in Postgres when isPostgresStorageMode() is
 * true and in Supabase otherwise; guest data lives in Postgres or in the
 * encrypted JSON file.
 */
export const sessionStore = {
  /** Returns the guest record for `t`, creating it if missing. */
  initTemp(t) {
    return ensureTempRecord(t);
  },

  /**
   * @returns {Promise<boolean>} True while the guest's message count is below
   *   TEMP_MSG_LIMIT; false when the guest has no record.
   */
  async tempCanSend(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = tempStore.get(t);
    return d ? d.msgCount < TEMP_MSG_LIMIT : false;
  },

  /** Increments the guest's message count, refreshes lastActive and persists. */
  async tempBump(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = ensureTempRecord(t);
    d.msgCount += 1;
    d.lastActive = Date.now();
    if (isPostgresStorageMode()) {
      await persistSqlTempState(t);
    } else {
      saveTempStoreLogged();
    }
  },

  /** @returns {Promise<object[]>} All sessions of the guest (empty if unknown). */
  async getTempSessions(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    return [...(tempStore.get(t)?.sessions.values() || [])];
  },

  /** @returns {Promise<object|null>} The guest session with this id, or null. */
  async getTempSession(t, id) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    return tempStore.get(t)?.sessions.get(id) || null;
  },

  /** Creates, stores and returns an empty "New Chat" session for the guest. */
  async createTempSession(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = ensureTempRecord(t);
    const s = { id: crypto.randomUUID(), name: 'New Chat', created: Date.now(), history: [] };
    d.sessions.set(s.id, s);
    d.lastActive = Date.now();
    await persistTempSession(t, d, s);
    return s;
  },

  /**
   * Merges `patch` into an existing guest session and persists it.
   * @returns {Promise<object|null>} The updated session, or null if the guest
   *   or the session does not exist.
   */
  async updateTempSession(t, id, patch) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = tempStore.get(t);
    if (!d) return null;
    const s = d.sessions.get(id);
    if (!s) return null;
    Object.assign(s, patch);
    d.lastActive = Date.now();
    await persistTempSession(t, d, s);
    return s;
  },

  /** Stores a JSON deep copy of `session` under the guest and returns the copy. */
  async restoreTempSession(t, session) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = ensureTempRecord(t);
    const restored = JSON.parse(JSON.stringify(session));
    d.sessions.set(restored.id, restored);
    d.lastActive = Date.now();
    await persistTempSession(t, d, restored);
    return restored;
  },

  /** Deletes one guest session from memory and from storage. */
  async deleteTempSession(t, id) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    tempStore.get(t)?.sessions.delete(id);
    if (isPostgresStorageMode()) {
      await pgQuery(
        'DELETE FROM chat_sessions WHERE id = $1 AND scope_type = $2 AND owner_lookup = $3',
        [id, 'guest', makeOwnerLookup(tempOwner(t))]
      );
      await persistSqlTempState(t);
    } else {
      saveTempStoreLogged();
    }
  },

  /** Deletes every session of the guest from memory and from storage. */
  async deleteTempAll(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    tempStore.get(t)?.sessions.clear();
    if (isPostgresStorageMode()) {
      await pgQuery(
        'DELETE FROM chat_sessions WHERE scope_type = $1 AND owner_lookup = $2',
        ['guest', makeOwnerLookup(tempOwner(t))]
      );
      await persistSqlTempState(t);
    } else {
      saveTempStoreLogged();
    }
  },

  /**
   * Deletes a guest session by id regardless of which guest owns it.
   * @returns {Promise<boolean>} True if anything was removed from memory or,
   *   in Postgres mode, from the database.
   */
  async deleteTempSessionEverywhere(id) {
    let changed = false;
    for (const temp of tempStore.values()) {
      if (temp.sessions.delete(id)) changed = true;
    }
    if (isPostgresStorageMode()) {
      const result = await pgQuery(
        'DELETE FROM chat_sessions WHERE id = $1 AND scope_type = $2',
        [id, 'guest']
      );
      return changed || result.rowCount > 0;
    }
    if (changed) saveTempStoreLogged();
    return changed;
  },

  /**
   * Copies the guest's non-empty sessions into the user's account. Sessions
   * whose id the user already has in the cache are skipped.
   */
  async transferTempToUser(tempId, userId, accessToken) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(tempId);
    const d = tempStore.get(tempId);
    if (!d || !d.sessions.size) return;
    const user = this._ensureUser(userId);
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const uc = isPostgresStorageMode() ? null : userClient(accessToken);
    for (const s of d.sessions.values()) {
      if (!s.history || s.history.length === 0) continue;
      if (user.sessions.has(s.id)) continue;
      const copy = JSON.parse(JSON.stringify(s));
      user.sessions.set(copy.id, copy);
      if (isPostgresStorageMode()) {
        await persistSqlSession(userOwner(userId), 'user', copy, null);
      } else {
        await this._persist(uc, userId, copy).catch(logPersistFailure('transferTempToUser'));
      }
    }
  },

  /** Returns the cache entry for a user, creating an empty one if missing. */
  _ensureUser(uid) {
    if (!userCache.has(uid)) userCache.set(uid, { sessions: new Map(), online: new Set() });
    return userCache.get(uid);
  },

  /**
   * Loads the user's sessions into the cache from Postgres or Supabase.
   * @returns {Promise<object[]>} The cached sessions; an empty array if the
   *   Supabase query reports an error.
   */
  async loadUserSessions(userId, accessToken) {
    if (isPostgresStorageMode()) return loadUserSessionsSql(userId);
    const uc = userClient(accessToken);
    const { data, error } = await uc
      .from('web_sessions')
      .select('*')
      .eq('user_id', userId)
      .order('updated_at', { ascending: false });
    if (error) {
      console.error('loadUserSessions', error.message);
      return [];
    }
    const user = this._ensureUser(userId);
    for (const row of data || []) {
      user.sessions.set(row.id, {
        id: row.id,
        name: row.name,
        created: new Date(row.created_at).getTime(),
        history: row.history || [],
        model: row.model,
      });
    }
    return [...user.sessions.values()];
  },

  /** @returns {object[]} The user's cached sessions (no storage access). */
  getUserSessions(uid) {
    return [...(userCache.get(uid)?.sessions.values() || [])];
  },

  /** @returns {object|null} The cached session, or null (no storage access). */
  getUserSession(uid, id) {
    return userCache.get(uid)?.sessions.get(id) || null;
  },

  /** Creates, caches and persists an empty "New Chat" session for the user. */
  async createUserSession(userId, accessToken) {
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const s = { id: crypto.randomUUID(), name: 'New Chat', created: Date.now(), history: [] };
    this._ensureUser(userId).sessions.set(s.id, s);
    if (isPostgresStorageMode()) {
      await persistSqlSession(userOwner(userId), 'user', s, null);
    } else {
      // Best-effort: the session stays usable from the cache if the write fails.
      await this._persist(userClient(accessToken), userId, s)
        .catch(logPersistFailure('createUserSession'));
    }
    return s;
  },

  /** Caches and persists a JSON deep copy of `session`; returns the copy. */
  async restoreUserSession(userId, accessToken, session) {
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const restored = JSON.parse(JSON.stringify(session));
    this._ensureUser(userId).sessions.set(restored.id, restored);
    if (isPostgresStorageMode()) {
      await persistSqlSession(userOwner(userId), 'user', restored, null);
    } else {
      await this._persist(userClient(accessToken), userId, restored)
        .catch(logPersistFailure('restoreUserSession'));
    }
    return restored;
  },

  /**
   * Merges `patch` into a cached user session and persists it.
   * @returns {Promise<object|null>} The updated session, or null if the user
   *   or the session is not in the cache.
   */
  async updateUserSession(userId, accessToken, sessionId, patch) {
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const user = userCache.get(userId);
    if (!user) {
      console.error(`No user for ${userId}`);
      return null;
    }
    const s = user.sessions.get(sessionId);
    if (!s) {
      console.error(`No session found for ${sessionId}`);
      return null;
    }
    Object.assign(s, patch);
    if (isPostgresStorageMode()) {
      await persistSqlSession(userOwner(userId), 'user', s, null);
    } else {
      await this._persist(userClient(accessToken), userId, s)
        .catch(logPersistFailure('updateUserSession'));
    }
    return s;
  },

  /** Removes one session from the cache and from storage; errors are logged. */
  async deleteUserSession(userId, accessToken, id) {
    try {
      userCache.get(userId)?.sessions.delete(id);
      if (isPostgresStorageMode()) {
        await pgQuery(
          'DELETE FROM chat_sessions WHERE id = $1 AND scope_type = $2 AND owner_lookup = $3',
          [id, 'user', makeOwnerLookup(userOwner(userId))]
        );
        return;
      }
      const { error } = await userClient(accessToken)
        .from('web_sessions')
        .delete()
        .eq('id', id)
        .eq('user_id', userId);
      if (error) console.error('Supabase delete error:', error.message);
    } catch (ex) {
      console.error('Unexpected deleteUserSession error:', ex);
    }
  },

  /**
   * Removes all of the user's sessions from the cache and from storage.
   * @returns {Promise<null|undefined>} null when the user has no cache entry
   *   (nothing is deleted in that case), otherwise undefined.
   */
  async deleteAllUserSessions(userId, accessToken) {
    const u = userCache.get(userId);
    if (!u) {
      console.error(`No user for ${userId}`);
      return null;
    }
    u.sessions.clear();
    try {
      if (isPostgresStorageMode()) {
        await pgQuery(
          'DELETE FROM chat_sessions WHERE scope_type = $1 AND owner_lookup = $2',
          ['user', makeOwnerLookup(userOwner(userId))]
        );
        return;
      }
      const { error } = await userClient(accessToken)
        .from('web_sessions')
        .delete()
        .eq('user_id', userId);
      if (error) console.error('Supabase bulk delete error:', error.message);
    } catch (ex) {
      console.error('Unexpected deleteAllUserSessions error:', ex);
    }
  },

  /**
   * Upserts one session row into the Supabase `web_sessions` table.
   * The Supabase client reports failures through the resolved `error` field,
   * so that field is checked and logged here.
   * @param {object} uc - Supabase client created by userClient().
   * @param {string} userId
   * @param {object} s - Session to store.
   */
  async _persist(uc, userId, s) {
    const result = await uc.from('web_sessions').upsert({
      id: s.id,
      user_id: userId,
      name: s.name,
      history: s.history || [],
      model: s.model || null,
      updated_at: new Date().toISOString(),
      created_at: new Date(s.created).toISOString(),
    });
    if (result?.error) console.error('Supabase upsert error:', result.error.message);
  },

  /** Records `ws` in the user's online set. */
  markOnline(uid, ws) {
    this._ensureUser(uid).online.add(ws);
  },

  /** Removes `ws` from the user's online set, if the user is cached. */
  markOffline(uid, ws) {
    userCache.get(uid)?.online.delete(ws);
  },

  /**
   * Stores a snapshot of the session under a fresh random share token.
   * @returns {Promise<string|null>} The token, or null if the session is
   *   unknown or the Supabase insert reports an error.
   */
  async createShareToken(userId, accessToken, sessionId) {
    // Make sure the cache reflects the database before looking the session up;
    // otherwise a session that exists only in Postgres would look missing.
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const s = this.getUserSession(userId, sessionId);
    if (!s) return null;
    const token = crypto.randomBytes(24).toString('base64url');

    if (isPostgresStorageMode()) {
      const record = {
        id: crypto.randomUUID(),
        ownerId: userId,
        sessionSnapshot: s,
        createdAt: nowIso(),
      };
      await pgQuery(
        `INSERT INTO session_shares (id, token_lookup, owner_lookup, created_at, payload) VALUES ($1, $2, $3, $4, $5::jsonb)`,
        [
          record.id,
          shareTokenLookup(token),
          makeOwnerLookup(userOwner(userId)),
          record.createdAt,
          JSON.stringify(encryptJsonPayload(record, shareAad(record.id))),
        ]
      );
      return token;
    }

    const uc = userClient(accessToken);
    const { error } = await uc.from('shared_sessions').insert({
      token,
      owner_id: userId,
      session_snapshot: s,
      created_at: new Date().toISOString(),
    });
    return error ? null : token;
  },

  /**
   * Looks up a shared-session record by its token.
   * @returns {Promise<object|null>} An object with `token`, `owner_id`,
   *   `session_snapshot` and `created_at` in Postgres mode, the Supabase row
   *   otherwise, or null if nothing matches.
   */
  async resolveShareToken(token) {
    if (isPostgresStorageMode()) {
      const { rows } = await pgQuery(
        'SELECT id, payload FROM session_shares WHERE token_lookup = $1',
        [shareTokenLookup(token)]
      );
      const record = rows[0] ? decryptJsonPayload(rows[0].payload, shareAad(rows[0].id)) : null;
      if (!record) return null;
      return {
        token,
        owner_id: record.ownerId,
        session_snapshot: record.sessionSnapshot,
        created_at: record.createdAt,
      };
    }
    // This lookup uses a client without a user Authorization header.
    const uc = createClient(_SUPABASE_URL, _SUPABASE_ANON_KEY, { auth: { persistSession: false } });
    const { data } = await uc.from('shared_sessions').select('*').eq('token', token).single();
    return data || null;
  },

  /**
   * Copies a shared session into the user's account under a new id, with
   * " (shared)" appended to its name.
   * @returns {Promise<object|null>} The new session, or null for an unknown token.
   */
  async importSharedSession(userId, accessToken, token) {
    const shared = await this.resolveShareToken(token);
    if (!shared) return null;
    const snap = shared.session_snapshot;
    const newSession = {
      ...snap,
      id: crypto.randomUUID(),
      name: `${snap.name} (shared)`,
      created: Date.now(),
    };
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    this._ensureUser(userId).sessions.set(newSession.id, newSession);
    if (isPostgresStorageMode()) {
      await persistSqlSession(userOwner(userId), 'user', newSession, null);
    } else {
      const uc = userClient(accessToken);
      await this._persist(uc, userId, newSession).catch(logPersistFailure('importSharedSession'));
    }
    return newSession;
  },
};

/**
 * Upserts a device session row; the row is keyed by the token's lookup value.
 * @param {object} session - Device session record as built by deviceSessionStore.create().
 */
async function upsertSqlDeviceSession(session) {
  const lookup = deviceTokenLookup(session.token);
  await pgQuery(
    `INSERT INTO device_sessions (token_lookup, user_lookup, active, created_at, last_seen_at, payload) VALUES ($1, $2, $3, $4, $5, $6::jsonb) ON CONFLICT (token_lookup) DO UPDATE SET user_lookup = EXCLUDED.user_lookup, active = EXCLUDED.active, created_at = EXCLUDED.created_at, last_seen_at = EXCLUDED.last_seen_at, payload = EXCLUDED.payload`,
    [
      lookup,
      makeOwnerLookup(userOwner(session.userId)),
      !!session.active,
      session.createdAt,
      session.lastSeen,
      JSON.stringify(encryptJsonPayload(session, deviceSessionAad(lookup))),
    ]
  );
}

/**
 * Reads and decrypts the device session stored for a token.
 * @returns {Promise<object|null>} The session record, or null if no row exists.
 */
async function loadSqlDeviceSession(token) {
  const lookup = deviceTokenLookup(token);
  const { rows } = await pgQuery(
    'SELECT payload FROM device_sessions WHERE token_lookup = $1',
    [lookup]
  );
  return rows[0] ? decryptJsonPayload(rows[0].payload, deviceSessionAad(lookup)) : null;
}

/**
 * Device (login) sessions keyed by a random token. They are always cached in
 * memory and additionally written to Postgres in Postgres mode.
 */
export const deviceSessionStore = {
  /**
   * Creates an active device session.
   * @returns {Promise<string>} The new token (32 random bytes, hex-encoded).
   */
  async create(userId, ip, userAgent) {
    const token = crypto.randomBytes(32).toString('hex');
    const session = {
      token,
      userId,
      ip,
      userAgent,
      createdAt: nowIso(),
      lastSeen: nowIso(),
      active: true,
    };
    devSessions.set(token, session);
    if (isPostgresStorageMode()) {
      await upsertSqlDeviceSession(session);
    }
    return token;
  },

  /** @returns {Promise<object[]>} The user's active device sessions. */
  async getForUser(uid) {
    if (!isPostgresStorageMode()) {
      return [...devSessions.values()].filter((s) => s.userId === uid && s.active);
    }
    const { rows } = await pgQuery(
      'SELECT token_lookup, payload FROM device_sessions WHERE user_lookup = $1 AND active = TRUE ORDER BY last_seen_at DESC',
      [makeOwnerLookup(userOwner(uid))]
    );
    const sessions = rows
      .map((row) => decryptJsonPayload(row.payload, deviceSessionAad(row.token_lookup)))
      .filter((session) => session?.userId === uid && session.active);
    // Refresh the in-memory cache with what the database returned.
    for (const session of sessions) devSessions.set(session.token, session);
    return sessions;
  },

  /**
   * Marks a device session inactive.
   * @returns {Promise<object|null>} The revoked session, or null if unknown.
   */
  async revoke(token) {
    let session = devSessions.get(token) || null;
    if (isPostgresStorageMode() && !session) {
      session = await loadSqlDeviceSession(token);
    }
    if (!session) return null;
    session.active = false;
    devSessions.set(token, session);
    if (isPostgresStorageMode()) {
      await upsertSqlDeviceSession(session);
    }
    return session;
  },

  /** Marks all of the user's device sessions inactive except the `except` token. */
  async revokeAllExcept(uid, except) {
    if (isPostgresStorageMode()) {
      const sessions = await this.getForUser(uid);
      for (const session of sessions) {
        if (session.token === except) continue;
        session.active = false;
        devSessions.set(session.token, session);
        await upsertSqlDeviceSession(session);
      }
      return;
    }
    for (const [t, s] of devSessions) {
      if (s.userId === uid && t !== except) s.active = false;
    }
  },

  /**
   * Checks a token and, if it belongs to an active session, refreshes lastSeen.
   * @returns {Promise<object|null>} The session, or null if unknown or inactive.
   */
  async validate(token) {
    let session = devSessions.get(token) || null;
    if (isPostgresStorageMode() && !session) {
      session = await loadSqlDeviceSession(token);
    }
    if (!session || !session.active) return null;
    session.lastSeen = nowIso();
    devSessions.set(token, session);
    if (isPostgresStorageMode()) {
      await upsertSqlDeviceSession(session);
    }
    return session;
  },
};