// chat-dev / server / sessionStore.js
// incognitolm — "Migration to PostgreSQL" (bff1056)
import { createClient } from '@supabase/supabase-js';
import crypto from 'crypto';
import { saveEncryptedJson, loadEncryptedJson } from './cryptoUtils.js';
import path from 'path';
import { isPostgresStorageMode } from './dataPaths.js';
import {
decryptJsonPayload,
encryptJsonPayload,
makeLookupToken,
makeOwnerLookup,
pgQuery,
} from './postgres.js';
// Supabase connection settings; unset until initStoreConfig() is called.
let _SUPABASE_URL;
let _SUPABASE_ANON_KEY;

/**
 * Stores the Supabase URL and anon key that later createClient() calls in this
 * module pass along.
 *
 * @param {string} url - Supabase project URL.
 * @param {string} key - Supabase anon key.
 */
export function initStoreConfig(url, key) {
  _SUPABASE_URL = url;
  _SUPABASE_ANON_KEY = key;
}
const MS_PER_HOUR = 60 * 60 * 1000;

// A guest record is dropped this long after it was created...
const TEMP_TTL_MS = 24 * MS_PER_HOUR;
// ...or this long after its last activity, whichever comes first.
const TEMP_INACTIVITY = 12 * MS_PER_HOUR;
// tempCanSend() allows sending only while msgCount is below this value.
const TEMP_MSG_LIMIT = 10;

// userId -> { sessions: Map<sessionId, session>, online: Set<socket> }
const userCache = new Map();
// tempId -> { sessions: Map<sessionId, session>, msgCount, created, lastActive }
const tempStore = new Map();
// device token -> device session record
const devSessions = new Map();

// Ids whose data has already been pulled into the caches above.
const loadedTempIds = new Set();
const loadedUserIds = new Set();

// Encrypted JSON file used for guest data when not in Postgres mode.
const TEMP_STORE_FILE = '/data/temp_sessions.json';
/**
 * @returns {string} The current time as an ISO-8601 UTC string.
 */
function nowIso() {
  const current = new Date();
  return current.toISOString();
}
/**
 * Builds the owner descriptor for a guest (temporary) identity.
 *
 * @param {string} tempId - Guest identifier.
 * @returns {{type: 'guest', id: string}} Owner descriptor.
 */
function tempOwner(tempId) {
  const owner = { type: 'guest', id: tempId };
  return owner;
}
/**
 * Builds the owner descriptor for a signed-in user.
 *
 * @param {string} userId - User identifier.
 * @returns {{type: 'user', id: string}} Owner descriptor.
 */
function userOwner(userId) {
  const owner = { type: 'user', id: userId };
  return owner;
}
/**
 * Derives the lookup token stored in guest_state.owner_lookup for a guest.
 *
 * @param {string} tempId - Guest identifier.
 * @returns {*} Whatever makeLookupToken produces for the 'guest-state' namespace.
 */
function guestStateLookup(tempId) {
  const namespace = 'guest-state';
  return makeLookupToken(namespace, tempId);
}
/**
 * Builds the context string passed alongside a guest-state payload when it is
 * encrypted or decrypted.
 *
 * @param {string} tempId - Guest identifier.
 * @returns {string} `guest-state:<tempId>`.
 */
function guestStateAad(tempId) {
  const aad = `guest-state:${tempId}`;
  return aad;
}
/**
 * Builds the context string passed alongside a chat-session payload when it is
 * encrypted or decrypted.
 *
 * @param {string} scopeType - Scope of the session ('guest' or 'user' in this file).
 * @param {string} sessionId - Session identifier.
 * @returns {string} `chat-session:<scopeType>:<sessionId>`.
 */
function sessionAad(scopeType, sessionId) {
  const aad = `chat-session:${scopeType}:${sessionId}`;
  return aad;
}
/**
 * Derives the lookup token stored in session_shares.token_lookup for a share token.
 *
 * @param {string} token - Share token handed to the user.
 * @returns {*} Whatever makeLookupToken produces for the 'session-share-token' namespace.
 */
function shareTokenLookup(token) {
  const namespace = 'session-share-token';
  return makeLookupToken(namespace, token);
}
/**
 * Builds the context string passed alongside a share record payload when it is
 * encrypted or decrypted.
 *
 * @param {string} recordId - Id of the session_shares row.
 * @returns {string} `session-share:<recordId>`.
 */
function shareAad(recordId) {
  const aad = `session-share:${recordId}`;
  return aad;
}
/**
 * Derives the lookup token stored in device_sessions.token_lookup for a device token.
 *
 * @param {string} token - Device session token.
 * @returns {*} Whatever makeLookupToken produces for the 'device-session-token' namespace.
 */
function deviceTokenLookup(token) {
  const namespace = 'device-session-token';
  return makeLookupToken(namespace, token);
}
/**
 * Builds the context string passed alongside a device-session payload when it
 * is encrypted or decrypted.
 *
 * @param {string} tokenLookup - Lookup token of the device session row.
 * @returns {string} `device-session:<tokenLookup>`.
 */
function deviceSessionAad(tokenLookup) {
  const aad = `device-session:${tokenLookup}`;
  return aad;
}
/**
 * Computes when a guest record expires: the earlier of "created + TTL" and
 * "last active + inactivity window". A missing/zero timestamp counts as "now".
 *
 * @param {{created?: number, lastActive?: number}} tempData - Guest record (epoch ms fields).
 * @returns {string} Expiry instant as an ISO-8601 string.
 */
function guestExpiryRecord(tempData) {
  const createdAt = tempData.created || Date.now();
  const lastActiveAt = tempData.lastActive || Date.now();
  const deadlines = [createdAt + TEMP_TTL_MS, lastActiveAt + TEMP_INACTIVITY];
  const earliest = Math.min(...deadlines);
  return new Date(earliest).toISOString();
}
/**
 * Returns the in-memory guest record for `tempId`, creating an empty one
 * (no sessions, zero messages, timestamps set to now) if none exists yet.
 *
 * @param {string} tempId - Guest identifier.
 * @returns {{sessions: Map, msgCount: number, created: number, lastActive: number}}
 */
function ensureTempRecord(tempId) {
  if (tempStore.has(tempId)) return tempStore.get(tempId);

  const fresh = {
    sessions: new Map(),
    msgCount: 0,
    created: Date.now(),
    lastActive: Date.now(),
  };
  tempStore.set(tempId, fresh);
  return tempStore.get(tempId);
}
/**
 * File mode only: reads the encrypted guest snapshot and rebuilds the
 * in-memory guest store from it. Does nothing in Postgres mode or when the
 * snapshot is missing/empty.
 *
 * @returns {Promise<void>}
 */
async function loadTempStore() {
  if (isPostgresStorageMode()) return;

  const snapshot = await loadEncryptedJson(TEMP_STORE_FILE);
  if (!snapshot) return;

  Object.entries(snapshot).forEach(([tempId, saved]) => {
    // Sessions are serialised as a plain object; turn them back into a Map.
    const sessions = new Map(Object.entries(saved.sessions || {}));
    tempStore.set(tempId, {
      sessions,
      msgCount: saved.msgCount || 0,
      created: saved.created || Date.now(),
      lastActive: saved.lastActive || Date.now(),
    });
    loadedTempIds.add(tempId);
  });
}
/**
 * File mode only: serialises every in-memory guest record and writes the
 * result to the encrypted snapshot file. Does nothing in Postgres mode.
 *
 * @returns {Promise<void>}
 */
async function saveTempStore() {
  if (isPostgresStorageMode()) return;

  const snapshot = {};
  tempStore.forEach((record, tempId) => {
    const { msgCount, created, lastActive } = record;
    snapshot[tempId] = {
      // Maps do not survive JSON; flatten the sessions to a plain object.
      sessions: Object.fromEntries(record.sessions),
      msgCount,
      created,
      lastActive,
    };
  });

  await saveEncryptedJson(TEMP_STORE_FILE, snapshot);
}
// tempId -> promise of the load currently in flight for that guest.
const pendingTempLoads = new Map();

/**
 * Postgres mode only: makes sure the guest's state and sessions have been
 * pulled from the database into the in-memory store. No-op in file mode or
 * when the guest was already loaded.
 *
 * Concurrent callers for the same guest share one load. Without that, two
 * overlapping calls would each query the database and the slower one would
 * replace the cached session map, dropping any session the faster caller had
 * already added to it.
 *
 * @param {string} tempId - Guest identifier.
 * @returns {Promise<void>} Rejects if the database load fails; a failed load is
 *   not remembered, so the next call retries.
 */
async function ensureSqlTempLoaded(tempId) {
  if (!isPostgresStorageMode() || loadedTempIds.has(tempId)) return;

  let pending = pendingTempLoads.get(tempId);
  if (!pending) {
    pending = loadSqlTempRecord(tempId).finally(() => pendingTempLoads.delete(tempId));
    pendingTempLoads.set(tempId, pending);
  }
  await pending;
}

/**
 * Reads the guest_state row and all guest chat_sessions rows for `tempId`,
 * merges them into the in-memory record and marks the guest as loaded.
 *
 * @param {string} tempId - Guest identifier.
 * @returns {Promise<void>}
 */
async function loadSqlTempRecord(tempId) {
  const owner = tempOwner(tempId);
  const lookup = makeOwnerLookup(owner);
  const [guestStateResult, sessionResult] = await Promise.all([
    pgQuery('SELECT payload FROM guest_state WHERE owner_lookup = $1', [guestStateLookup(tempId)]),
    pgQuery(
      'SELECT id, payload FROM chat_sessions WHERE owner_lookup = $1 AND scope_type = $2 ORDER BY updated_at DESC',
      [lookup, 'guest']
    ),
  ]);

  const base = ensureTempRecord(tempId);
  const guestState = guestStateResult.rows[0]
    ? decryptJsonPayload(guestStateResult.rows[0].payload, guestStateAad(tempId))
    : null;

  // Stored values win; otherwise keep what is already in memory.
  base.msgCount = Number(guestState?.msgCount) || base.msgCount || 0;
  base.created = Number(guestState?.created) || base.created || Date.now();
  base.lastActive = Number(guestState?.lastActive) || base.lastActive || Date.now();

  // Rows whose payload does not decrypt to an object with an id are skipped.
  base.sessions = new Map(
    sessionResult.rows
      .map((row) => decryptJsonPayload(row.payload, sessionAad('guest', row.id)))
      .filter((session) => session?.id)
      .map((session) => [session.id, session])
  );
  loadedTempIds.add(tempId);
}
// Upsert of the single guest_state row belonging to one guest.
const UPSERT_GUEST_STATE_SQL = `INSERT INTO guest_state (owner_lookup, expires_at, updated_at, payload)
VALUES ($1, $2, $3, $4::jsonb)
ON CONFLICT (owner_lookup)
DO UPDATE SET expires_at = EXCLUDED.expires_at, updated_at = EXCLUDED.updated_at, payload = EXCLUDED.payload`;

/**
 * Writes the guest's counters and timestamps (not its sessions) to the
 * guest_state table as an encrypted payload. Does nothing when the guest has
 * no in-memory record.
 *
 * @param {string} tempId - Guest identifier.
 * @returns {Promise<void>}
 */
async function persistSqlTempState(tempId) {
  const record = tempStore.get(tempId);
  if (!record) return;

  const ownerLookup = guestStateLookup(tempId);
  const expiresAt = guestExpiryRecord(record);
  const updatedAt = nowIso();
  const { msgCount, created, lastActive } = record;
  const envelope = encryptJsonPayload(
    { tempId, msgCount, created, lastActive },
    guestStateAad(tempId)
  );

  await pgQuery(UPSERT_GUEST_STATE_SQL, [
    ownerLookup,
    expiresAt,
    updatedAt,
    JSON.stringify(envelope),
  ]);
}
// Upsert of one chat session row keyed by session id; on conflict every
// column, including scope and owner, is overwritten.
const UPSERT_CHAT_SESSION_SQL = `INSERT INTO chat_sessions (id, scope_type, owner_lookup, created_at, updated_at, expires_at, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
ON CONFLICT (id)
DO UPDATE SET
scope_type = EXCLUDED.scope_type,
owner_lookup = EXCLUDED.owner_lookup,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
expires_at = EXCLUDED.expires_at,
payload = EXCLUDED.payload`;

/**
 * Inserts or replaces a chat session row, storing the session itself as an
 * encrypted payload.
 *
 * @param {{type: string, id: string}} owner - Owner descriptor (see tempOwner/userOwner).
 * @param {string} scopeType - 'guest' or 'user' in this file.
 * @param {object} session - Session to store; `id` and `created` are read directly.
 * @param {?string} [expiresAt=null] - Expiry timestamp, or null for none.
 * @returns {Promise<void>}
 */
async function persistSqlSession(owner, scopeType, session, expiresAt = null) {
  const sessionId = session.id;
  const ownerLookup = makeOwnerLookup(owner);
  const createdAt = new Date(session.created).toISOString();
  const updatedAt = nowIso();
  const envelope = encryptJsonPayload(session, sessionAad(scopeType, session.id));

  await pgQuery(UPSERT_CHAT_SESSION_SQL, [
    sessionId,
    scopeType,
    ownerLookup,
    createdAt,
    updatedAt,
    expiresAt,
    JSON.stringify(envelope),
  ]);
}
/**
 * Reloads a user's sessions from chat_sessions, replacing whatever the cache
 * held for that user, and marks the user as loaded.
 *
 * @param {string} userId - User identifier.
 * @returns {Promise<object[]>} The user's sessions in the order the query returned them.
 */
async function loadUserSessionsSql(userId) {
  const cacheEntry = sessionStore._ensureUser(userId);
  const ownerLookup = makeOwnerLookup(userOwner(userId));
  const result = await pgQuery(
    'SELECT id, payload FROM chat_sessions WHERE owner_lookup = $1 AND scope_type = $2 ORDER BY updated_at DESC',
    [ownerLookup, 'user']
  );

  cacheEntry.sessions.clear();
  for (const { id, payload } of result.rows) {
    const decoded = decryptJsonPayload(payload, sessionAad('user', id));
    // Skip rows whose payload did not yield a session with an id.
    if (!decoded?.id) continue;
    cacheEntry.sessions.set(decoded.id, decoded);
  }

  loadedUserIds.add(userId);
  return Array.from(cacheEntry.sessions.values());
}
// userId -> promise of the load currently in flight for that user.
const pendingUserLoads = new Map();

/**
 * Postgres mode only: makes sure the user's sessions have been loaded into the
 * cache. No-op in other storage modes or when the user was already loaded.
 *
 * Concurrent callers for the same user share one load. loadUserSessionsSql()
 * clears and refills the cached session map, so letting a second overlapping
 * load finish later could wipe a session the first caller had just added.
 *
 * @param {string} userId - User identifier.
 * @returns {Promise<void>} Rejects if the load fails; a failed load is not
 *   remembered, so the next call retries.
 */
async function ensureUserLoaded(userId) {
  if (!isPostgresStorageMode() || loadedUserIds.has(userId)) return;

  let pending = pendingUserLoads.get(userId);
  if (!pending) {
    pending = loadUserSessionsSql(userId).finally(() => pendingUserLoads.delete(userId));
    pendingUserLoads.set(userId, pending);
  }
  await pending;
}
/**
 * Creates a Supabase client that sends the given access token as a Bearer
 * Authorization header and does not persist its auth session.
 *
 * @param {string} accessToken - The user's access token.
 * @returns {object} The client returned by createClient().
 */
function userClient(accessToken) {
  const headers = { Authorization: `Bearer ${accessToken}` };
  const options = {
    global: { headers },
    auth: { persistSession: false },
  };
  return createClient(_SUPABASE_URL, _SUPABASE_ANON_KEY, options);
}
// File mode: restore guest data from the encrypted snapshot at startup.
// (loadTempStore() returns immediately in Postgres mode.)
loadTempStore().catch((err) => console.error('Failed to load temp store:', err));

// Every 30 minutes, drop guest records that are past their TTL or have been
// inactive too long, then sync the backing store.
setInterval(async () => {
  const now = Date.now();
  const expiredTempIds = [];
  for (const [id, d] of tempStore) {
    const pastTtl = now - d.created > TEMP_TTL_MS;
    const inactive = now - d.lastActive > TEMP_INACTIVITY;
    if (pastTtl || inactive) {
      tempStore.delete(id);
      // Forget the "loaded" marker in every storage mode so the set cannot
      // grow without bound and a returning guest id is re-read from storage.
      loadedTempIds.delete(id);
      expiredTempIds.push(id);
    }
  }

  if (!isPostgresStorageMode()) {
    await saveTempStore().catch((err) => console.error('Failed to save temp store:', err));
    return;
  }

  try {
    for (const tempId of expiredTempIds) {
      await pgQuery('DELETE FROM guest_state WHERE owner_lookup = $1', [guestStateLookup(tempId)]);
    }
    // Also sweep rows that expired without ever being cached by this process.
    // One cutoff is used for both statements so they agree on what "expired" means.
    const cutoff = nowIso();
    await pgQuery(
      `DELETE FROM chat_sessions
WHERE scope_type = 'guest' AND expires_at IS NOT NULL AND expires_at <= $1`,
      [cutoff]
    );
    await pgQuery(
      `DELETE FROM guest_state
WHERE expires_at IS NOT NULL AND expires_at <= $1`,
      [cutoff]
    );
  } catch (err) {
    console.error('Failed to prune SQL temp store:', err);
  }
}, 30 * 60 * 1000);
/**
 * Starts a save of the file-backed guest store without waiting for it; a
 * failed save is logged instead of being surfaced to the caller.
 */
function saveTempStoreInBackground() {
  saveTempStore().catch((err) => console.error('Failed to save temp store:', err));
}

/**
 * Persists one guest session after it was created or changed.
 * Postgres mode: upserts the session row, then the guest-state row (both awaited).
 * File mode: schedules a background save of the whole guest store.
 */
async function persistGuestSession(tempId, record, session) {
  if (!isPostgresStorageMode()) {
    saveTempStoreInBackground();
    return;
  }
  await persistSqlSession(tempOwner(tempId), 'guest', session, guestExpiryRecord(record));
  await persistSqlTempState(tempId);
}

/**
 * Persists one user session to whichever backend is active. Supabase failures
 * are best-effort: they are logged and do not reject.
 */
async function persistUserSession(userId, accessToken, session) {
  if (isPostgresStorageMode()) {
    await persistSqlSession(userOwner(userId), 'user', session, null);
    return;
  }
  await sessionStore._persist(userClient(accessToken), userId, session)
    .catch((err) => console.error('Failed to persist user session:', err));
}

/**
 * In-memory cache of guest ("temp") and user chat sessions with write-through
 * persistence. Guest data goes to Postgres or to an encrypted file; user data
 * goes to Postgres or to Supabase, depending on isPostgresStorageMode().
 */
export const sessionStore = {
  /** Returns the guest record for `t`, creating an empty one if needed. */
  initTemp(t) {
    return ensureTempRecord(t);
  },

  /**
   * Tells whether guest `t` is still under the message limit.
   * Resolves to false when there is no record for the guest.
   */
  async tempCanSend(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = tempStore.get(t);
    return d ? d.msgCount < TEMP_MSG_LIMIT : false;
  },

  /** Counts one more message for guest `t` and refreshes its activity time. */
  async tempBump(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = ensureTempRecord(t);
    d.msgCount += 1;
    d.lastActive = Date.now();
    if (isPostgresStorageMode()) {
      await persistSqlTempState(t);
    } else {
      saveTempStoreInBackground();
    }
  },

  /** Resolves to all sessions of guest `t` (empty array if unknown). */
  async getTempSessions(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    return [...(tempStore.get(t)?.sessions.values() || [])];
  },

  /** Resolves to session `id` of guest `t`, or null. */
  async getTempSession(t, id) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    return tempStore.get(t)?.sessions.get(id) || null;
  },

  /** Creates, caches and persists a new empty session for guest `t`. */
  async createTempSession(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = ensureTempRecord(t);
    const s = { id: crypto.randomUUID(), name: 'New Chat', created: Date.now(), history: [] };
    d.sessions.set(s.id, s);
    d.lastActive = Date.now();
    await persistGuestSession(t, d, s);
    return s;
  },

  /**
   * Shallow-merges `patch` into session `id` of guest `t` and persists it.
   * Resolves to the updated session, or null if the guest or session is unknown.
   */
  async updateTempSession(t, id, patch) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = tempStore.get(t);
    if (!d) return null;
    const s = d.sessions.get(id);
    if (!s) return null;
    Object.assign(s, patch);
    d.lastActive = Date.now();
    await persistGuestSession(t, d, s);
    return s;
  },

  /** Stores a deep copy of `session` under guest `t` and resolves to that copy. */
  async restoreTempSession(t, session) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    const d = ensureTempRecord(t);
    // JSON round-trip so the cache never aliases the caller's object.
    const restored = JSON.parse(JSON.stringify(session));
    d.sessions.set(restored.id, restored);
    d.lastActive = Date.now();
    await persistGuestSession(t, d, restored);
    return restored;
  },

  /** Removes session `id` of guest `t` from the cache and the backing store. */
  async deleteTempSession(t, id) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    tempStore.get(t)?.sessions.delete(id);
    if (isPostgresStorageMode()) {
      await pgQuery(
        'DELETE FROM chat_sessions WHERE id = $1 AND scope_type = $2 AND owner_lookup = $3',
        [id, 'guest', makeOwnerLookup(tempOwner(t))]
      );
      await persistSqlTempState(t);
    } else {
      saveTempStoreInBackground();
    }
  },

  /** Removes every session of guest `t` from the cache and the backing store. */
  async deleteTempAll(t) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(t);
    tempStore.get(t)?.sessions.clear();
    if (isPostgresStorageMode()) {
      await pgQuery(
        'DELETE FROM chat_sessions WHERE scope_type = $1 AND owner_lookup = $2',
        ['guest', makeOwnerLookup(tempOwner(t))]
      );
      await persistSqlTempState(t);
    } else {
      saveTempStoreInBackground();
    }
  },

  /**
   * Removes guest session `id` regardless of which guest owns it.
   * Resolves to true if anything was removed from the cache or (Postgres mode)
   * from the database.
   */
  async deleteTempSessionEverywhere(id) {
    let changed = false;
    for (const temp of tempStore.values()) {
      if (temp.sessions.delete(id)) changed = true;
    }
    if (isPostgresStorageMode()) {
      const result = await pgQuery(
        'DELETE FROM chat_sessions WHERE id = $1 AND scope_type = $2',
        [id, 'guest']
      );
      return changed || result.rowCount > 0;
    }
    if (changed) saveTempStoreInBackground();
    return changed;
  },

  /**
   * Copies the guest's non-empty sessions into the user's account, skipping
   * ids the user already has. The guest's own records are left in place.
   */
  async transferTempToUser(tempId, userId, accessToken) {
    if (isPostgresStorageMode()) await ensureSqlTempLoaded(tempId);
    const d = tempStore.get(tempId);
    if (!d || !d.sessions.size) return;
    const user = this._ensureUser(userId);
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const uc = isPostgresStorageMode() ? null : userClient(accessToken);
    for (const s of d.sessions.values()) {
      if (!s.history || s.history.length === 0) continue;
      if (user.sessions.has(s.id)) continue;
      const copy = JSON.parse(JSON.stringify(s));
      user.sessions.set(copy.id, copy);
      if (isPostgresStorageMode()) {
        await persistSqlSession(userOwner(userId), 'user', copy, null);
      } else {
        await this._persist(uc, userId, copy).catch((err) =>
          console.error('transferTempToUser persist error:', err.message));
      }
    }
  },

  /** Returns the cache entry for `uid`, creating an empty one if needed. */
  _ensureUser(uid) {
    if (!userCache.has(uid)) userCache.set(uid, { sessions: new Map(), online: new Set() });
    return userCache.get(uid);
  },

  /**
   * Loads the user's sessions from the active backend into the cache.
   * Supabase mode resolves to an empty array (after logging) on a query error.
   */
  async loadUserSessions(userId, accessToken) {
    if (isPostgresStorageMode()) return loadUserSessionsSql(userId);
    const uc = userClient(accessToken);
    const { data, error } = await uc.from('web_sessions').select('*')
      .eq('user_id', userId).order('updated_at', { ascending: false });
    if (error) { console.error('loadUserSessions', error.message); return []; }
    const user = this._ensureUser(userId);
    for (const row of data || []) {
      user.sessions.set(row.id, {
        id: row.id,
        name: row.name,
        created: new Date(row.created_at).getTime(),
        history: row.history || [],
        model: row.model,
      });
    }
    return [...user.sessions.values()];
  },

  /** Cached sessions of `uid` (synchronous; does not load from storage). */
  getUserSessions(uid) {
    return [...(userCache.get(uid)?.sessions.values() || [])];
  },

  /** Cached session `id` of `uid`, or null (synchronous; does not load from storage). */
  getUserSession(uid, id) {
    return userCache.get(uid)?.sessions.get(id) || null;
  },

  /** Creates, caches and persists a new empty session for the user. */
  async createUserSession(userId, accessToken) {
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const s = { id: crypto.randomUUID(), name: 'New Chat', created: Date.now(), history: [] };
    this._ensureUser(userId).sessions.set(s.id, s);
    await persistUserSession(userId, accessToken, s);
    return s;
  },

  /** Stores a deep copy of `session` under the user and resolves to that copy. */
  async restoreUserSession(userId, accessToken, session) {
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const restored = JSON.parse(JSON.stringify(session));
    this._ensureUser(userId).sessions.set(restored.id, restored);
    await persistUserSession(userId, accessToken, restored);
    return restored;
  },

  /**
   * Shallow-merges `patch` into the user's session and persists it.
   * Resolves to the updated session, or null (after logging) if the user or
   * session is not in the cache.
   */
  async updateUserSession(userId, accessToken, sessionId, patch) {
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const user = userCache.get(userId);
    if (!user) { console.error(`No user for ${userId}`); return null; }
    const s = user.sessions.get(sessionId);
    if (!s) { console.error(`No session found for ${sessionId}`); return null; }
    Object.assign(s, patch);
    await persistUserSession(userId, accessToken, s);
    return s;
  },

  /** Removes one user session from the cache and the backend; errors are logged, not thrown. */
  async deleteUserSession(userId, accessToken, id) {
    try {
      userCache.get(userId)?.sessions.delete(id);
      if (isPostgresStorageMode()) {
        await pgQuery(
          'DELETE FROM chat_sessions WHERE id = $1 AND scope_type = $2 AND owner_lookup = $3',
          [id, 'user', makeOwnerLookup(userOwner(userId))]
        );
        return;
      }
      const { error } = await userClient(accessToken)
        .from('web_sessions')
        .delete()
        .eq('id', id)
        .eq('user_id', userId);
      if (error) console.error('Supabase delete error:', error.message);
    } catch (ex) {
      console.error('Unexpected deleteUserSession error:', ex);
    }
  },

  /**
   * Removes all of the user's sessions from the cache and the backend.
   * Resolves to null (after logging) when the user has no cache entry; backend
   * errors are logged, not thrown.
   */
  async deleteAllUserSessions(userId, accessToken) {
    const u = userCache.get(userId);
    if (!u) {
      console.error(`No user for ${userId}`);
      return null;
    }
    u.sessions.clear();
    try {
      if (isPostgresStorageMode()) {
        await pgQuery(
          'DELETE FROM chat_sessions WHERE scope_type = $1 AND owner_lookup = $2',
          ['user', makeOwnerLookup(userOwner(userId))]
        );
        return;
      }
      const { error } = await userClient(accessToken)
        .from('web_sessions')
        .delete()
        .eq('user_id', userId);
      if (error) console.error('Supabase bulk delete error:', error.message);
    } catch (ex) {
      console.error('Unexpected deleteAllUserSessions error:', ex);
    }
  },

  /**
   * Upserts one session into the Supabase `web_sessions` table.
   * The client reports failures through the returned `error` field rather than
   * by throwing, so that field is checked and logged here; otherwise a failed
   * write would go completely unnoticed.
   */
  async _persist(uc, userId, s) {
    const result = await uc.from('web_sessions').upsert({
      id: s.id,
      user_id: userId,
      name: s.name,
      history: s.history || [],
      model: s.model || null,
      updated_at: new Date().toISOString(),
      created_at: new Date(s.created).toISOString(),
    });
    if (result?.error) console.error('Supabase upsert error:', result.error.message);
  },

  /** Registers `ws` as an open connection of user `uid`. */
  markOnline(uid, ws) {
    this._ensureUser(uid).online.add(ws);
  },

  /** Unregisters `ws` from user `uid` (no-op if the user is not cached). */
  markOffline(uid, ws) {
    userCache.get(uid)?.online.delete(ws);
  },

  /**
   * Stores a snapshot of the user's session under a fresh random token.
   * Resolves to the token, or null if the session is unknown or (Supabase mode)
   * the insert failed.
   */
  async createShareToken(userId, accessToken, sessionId) {
    // The lookup below only reads the cache, so make sure it is populated first.
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    const s = this.getUserSession(userId, sessionId);
    if (!s) return null;
    const token = crypto.randomBytes(24).toString('base64url');
    if (isPostgresStorageMode()) {
      const record = {
        id: crypto.randomUUID(),
        ownerId: userId,
        sessionSnapshot: s,
        createdAt: nowIso(),
      };
      await pgQuery(
        `INSERT INTO session_shares (id, token_lookup, owner_lookup, created_at, payload)
VALUES ($1, $2, $3, $4, $5::jsonb)`,
        [
          record.id,
          shareTokenLookup(token),
          makeOwnerLookup(userOwner(userId)),
          record.createdAt,
          JSON.stringify(encryptJsonPayload(record, shareAad(record.id))),
        ]
      );
      return token;
    }
    const uc = userClient(accessToken);
    const { error } = await uc.from('shared_sessions').insert({
      token,
      owner_id: userId,
      session_snapshot: s,
      created_at: new Date().toISOString(),
    });
    return error ? null : token;
  },

  /**
   * Looks up a share token. Resolves to
   * `{ token, owner_id, session_snapshot, created_at }` or null if not found.
   */
  async resolveShareToken(token) {
    if (isPostgresStorageMode()) {
      const { rows } = await pgQuery(
        'SELECT id, payload FROM session_shares WHERE token_lookup = $1',
        [shareTokenLookup(token)]
      );
      const record = rows[0] ? decryptJsonPayload(rows[0].payload, shareAad(rows[0].id)) : null;
      return record ? {
        token,
        owner_id: record.ownerId,
        session_snapshot: record.sessionSnapshot,
        created_at: record.createdAt,
      } : null;
    }
    const uc = createClient(_SUPABASE_URL, _SUPABASE_ANON_KEY, { auth: { persistSession: false } });
    const { data } = await uc.from('shared_sessions').select('*').eq('token', token).single();
    return data || null;
  },

  /**
   * Copies a shared snapshot into the user's account under a new id, with
   * " (shared)" appended to its name. Resolves to the new session, or null if
   * the token does not resolve to a usable snapshot.
   */
  async importSharedSession(userId, accessToken, token) {
    const shared = await this.resolveShareToken(token);
    if (!shared) return null;
    const snap = shared.session_snapshot;
    // A share row without a snapshot cannot be imported.
    if (!snap) return null;
    const newSession = {
      ...snap,
      id: crypto.randomUUID(),
      name: `${snap.name} (shared)`,
      created: Date.now(),
    };
    if (isPostgresStorageMode()) await ensureUserLoaded(userId);
    this._ensureUser(userId).sessions.set(newSession.id, newSession);
    await persistUserSession(userId, accessToken, newSession);
    return newSession;
  },
};
// Upsert of one device session row keyed by its token lookup.
const UPSERT_DEVICE_SESSION_SQL = `INSERT INTO device_sessions (token_lookup, user_lookup, active, created_at, last_seen_at, payload)
VALUES ($1, $2, $3, $4, $5, $6::jsonb)
ON CONFLICT (token_lookup)
DO UPDATE SET
user_lookup = EXCLUDED.user_lookup,
active = EXCLUDED.active,
created_at = EXCLUDED.created_at,
last_seen_at = EXCLUDED.last_seen_at,
payload = EXCLUDED.payload`;

/**
 * Inserts or replaces the device_sessions row for `session`, storing the whole
 * session object as an encrypted payload.
 *
 * @param {{token: string, userId: string, active: *, createdAt: string, lastSeen: string}} session
 * @returns {Promise<void>}
 */
async function upsertSqlDeviceSession(session) {
  const tokenLookup = deviceTokenLookup(session.token);
  const userLookup = makeOwnerLookup(userOwner(session.userId));
  const isActive = Boolean(session.active);
  const envelope = encryptJsonPayload(session, deviceSessionAad(tokenLookup));

  await pgQuery(UPSERT_DEVICE_SESSION_SQL, [
    tokenLookup,
    userLookup,
    isActive,
    session.createdAt,
    session.lastSeen,
    JSON.stringify(envelope),
  ]);
}
/**
 * Fetches and decrypts the device session stored for `token`.
 *
 * @param {string} token - Device session token.
 * @returns {Promise<?object>} The decrypted session, or null if no row matches.
 */
async function loadSqlDeviceSession(token) {
  const tokenLookup = deviceTokenLookup(token);
  const result = await pgQuery(
    'SELECT payload FROM device_sessions WHERE token_lookup = $1',
    [tokenLookup]
  );
  const [firstRow] = result.rows;
  if (!firstRow) return null;
  return decryptJsonPayload(firstRow.payload, deviceSessionAad(tokenLookup));
}
/**
 * Finds a device session by token: the in-memory cache first, then (Postgres
 * mode only) the database. Resolves to null when nothing is found.
 */
async function findDeviceSession(token) {
  const cached = devSessions.get(token) || null;
  if (!isPostgresStorageMode() || cached) return cached;
  return loadSqlDeviceSession(token);
}

/**
 * Registry of per-device login sessions. Records always live in the in-memory
 * map and are additionally written to Postgres when that storage mode is on.
 */
export const deviceSessionStore = {
  /**
   * Registers a new active device session for the user.
   * @returns {Promise<string>} The new session token (64 hex characters).
   */
  async create(userId, ip, userAgent) {
    const token = crypto.randomBytes(32).toString('hex');
    const createdAt = nowIso();
    const lastSeen = nowIso();
    const record = { token, userId, ip, userAgent, createdAt, lastSeen, active: true };

    devSessions.set(token, record);
    if (isPostgresStorageMode()) await upsertSqlDeviceSession(record);
    return token;
  },

  /**
   * Lists the user's active device sessions. In Postgres mode the list comes
   * from the database (most recently seen first) and refreshes the cache.
   */
  async getForUser(uid) {
    const ownedAndActive = (candidate) => candidate?.userId === uid && candidate.active;

    if (!isPostgresStorageMode()) {
      return Array.from(devSessions.values()).filter((s) => s.userId === uid && s.active);
    }

    const result = await pgQuery(
      'SELECT token_lookup, payload FROM device_sessions WHERE user_lookup = $1 AND active = TRUE ORDER BY last_seen_at DESC',
      [makeOwnerLookup(userOwner(uid))]
    );
    const decoded = result.rows.map(
      (row) => decryptJsonPayload(row.payload, deviceSessionAad(row.token_lookup))
    );
    const active = decoded.filter(ownedAndActive);
    active.forEach((found) => devSessions.set(found.token, found));
    return active;
  },

  /**
   * Marks the session behind `token` as inactive.
   * @returns {Promise<?object>} The revoked session, or null if the token is unknown.
   */
  async revoke(token) {
    const target = await findDeviceSession(token);
    if (!target) return null;

    target.active = false;
    devSessions.set(token, target);
    if (isPostgresStorageMode()) await upsertSqlDeviceSession(target);
    return target;
  },

  /** Marks every device session of `uid` as inactive, except the one with token `except`. */
  async revokeAllExcept(uid, except) {
    if (!isPostgresStorageMode()) {
      devSessions.forEach((candidate, candidateToken) => {
        if (candidate.userId === uid && candidateToken !== except) candidate.active = false;
      });
      return;
    }

    const activeSessions = await this.getForUser(uid);
    for (const candidate of activeSessions) {
      if (candidate.token === except) continue;
      candidate.active = false;
      devSessions.set(candidate.token, candidate);
      await upsertSqlDeviceSession(candidate);
    }
  },

  /**
   * Checks that `token` belongs to an active session and refreshes its
   * last-seen time.
   * @returns {Promise<?object>} The session, or null if unknown or inactive.
   */
  async validate(token) {
    const found = await findDeviceSession(token);
    if (!found || !found.active) return null;

    found.lastSeen = nowIso();
    devSessions.set(token, found);
    if (isPostgresStorageMode()) await upsertSqlDeviceSession(found);
    return found;
  },
};