Spaces:
Running
Running
File size: 9,861 Bytes
85a0eea aecadf5 85a0eea | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# =============================================================================
# app/db_sync.py
# Internal SQLite IPC β app/* state & communication
# Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture
# Copyright 2026 - Volkan KΓΌcΓΌkbudak
# Apache License V. 2 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
# This file lives exclusively in app/ and is ONLY started by app/app.py.
# NO direct access to fundaments/*, .env, or Guardian (main.py).
# DB path comes from app/.pyfun [DB_SYNC] β SQLITE_PATH via app/config.py.
#
# CRITICAL RULES:
# - This is NOT postgresql.py β cloud DB is Guardian-only!
# - db_sync ONLY manages its own tables (hub_state, tool_cache)
# - NEVER touch Guardian tables (users, sessions) β those belong to user_handler.py
# - SQLite path is shared with user_handler.py via SQLITE_PATH
# - app/* modules call db_sync.write() / db_sync.read() β never aiosqlite directly
#
# TABLE OWNERSHIP:
# users, sessions β Guardian (fundaments/user_handler.py) β DO NOT TOUCH!
# hub_state β db_sync (app/* internal state)
# tool_cache β db_sync (app/* tool response cache)
# =============================================================================
import aiosqlite
import logging
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from . import config
logger = logging.getLogger("db_sync")
# =============================================================================
# Internal State
# =============================================================================
_db_path: Optional[str] = None
_initialized: bool = False
# =============================================================================
# Initialization β called by app/app.py (parameterless, sandboxed)
# =============================================================================
async def initialize() -> None:
global _db_path, _initialized
if _initialized:
return
db_cfg = config.get_db_sync()
raw_path = db_cfg.get("SQLITE_PATH", "app/.hub_state.db")
# HF Spaces: SPACE_ID is set β filesystem is read-only except /tmp/
import os
if os.getenv("SPACE_ID"):
filename = os.path.basename(raw_path)
_db_path = f"/tmp/{filename}"
logger.info(f"HF Space detected β SQLite relocated to {_db_path}")
else:
_db_path = raw_path
await _init_tables()
_initialized = True
logger.info(f"db_sync initialized β path: {_db_path}")
# =============================================================================
# SECTION 1 β Table Setup (app/* tables only!)
# =============================================================================
async def _init_tables() -> None:
"""
Creates app/* internal tables if they don't exist.
NEVER modifies Guardian tables (users, sessions).
"""
async with aiosqlite.connect(_db_path) as db:
# hub_state β generic key/value store for app/* modules
await db.execute("""
CREATE TABLE IF NOT EXISTS hub_state (
key TEXT PRIMARY KEY,
value TEXT,
updated_at TEXT
)
""")
# tool_cache β cached tool responses to reduce API calls
await db.execute("""
CREATE TABLE IF NOT EXISTS tool_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tool_name TEXT NOT NULL,
prompt TEXT NOT NULL,
response TEXT NOT NULL,
provider TEXT,
model TEXT,
created_at TEXT
)
""")
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_tool_cache_tool
ON tool_cache(tool_name)
""")
await db.commit()
logger.info("db_sync tables ready.")
# =============================================================================
# SECTION 2 β Key/Value Store (hub_state table)
# =============================================================================
async def write(key: str, value: Any) -> None:
"""
Write a value to hub_state key/value store.
Value is JSON-serialized β supports dicts, lists, strings, numbers.
Args:
key: Unique key string (e.g. 'scheduler.last_run').
value: Any JSON-serializable value.
"""
_check_init()
now = datetime.now(timezone.utc).isoformat()
async with aiosqlite.connect(_db_path) as db:
await db.execute("""
INSERT OR REPLACE INTO hub_state (key, value, updated_at)
VALUES (?, ?, ?)
""", (key, json.dumps(value), now))
await db.commit()
async def read(key: str, default: Any = None) -> Any:
"""
Read a value from hub_state key/value store.
Returns default if key does not exist.
Args:
key: Key string to look up.
default: Default value if key not found. Default: None.
Returns:
Deserialized value, or default if not found.
"""
_check_init()
async with aiosqlite.connect(_db_path) as db:
cursor = await db.execute(
"SELECT value FROM hub_state WHERE key = ?", (key,)
)
row = await cursor.fetchone()
if row is None:
return default
try:
return json.loads(row[0])
except (json.JSONDecodeError, TypeError):
return row[0]
async def delete(key: str) -> None:
"""
Delete a key from hub_state.
Args:
key: Key string to delete.
"""
_check_init()
async with aiosqlite.connect(_db_path) as db:
await db.execute("DELETE FROM hub_state WHERE key = ?", (key,))
await db.commit()
# =============================================================================
# SECTION 3 β Tool Cache (tool_cache table)
# =============================================================================
async def cache_write(
tool_name: str,
prompt: str,
response: str,
provider: str = None,
model: str = None,
) -> None:
"""
Cache a tool response to reduce redundant API calls.
Args:
tool_name: Tool name (e.g. 'llm_complete', 'web_search').
prompt: The input prompt/query that was used.
response: The response to cache.
provider: Provider name used (optional).
model: Model name used (optional).
"""
_check_init()
db_cfg = config.get_db_sync()
max_entries = int(db_cfg.get("MAX_CACHE_ENTRIES", "1000"))
now = datetime.now(timezone.utc).isoformat()
async with aiosqlite.connect(_db_path) as db:
await db.execute("""
INSERT INTO tool_cache (tool_name, prompt, response, provider, model, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (tool_name, prompt, response, provider, model, now))
# Enforce MAX_CACHE_ENTRIES β delete oldest if exceeded
await db.execute("""
DELETE FROM tool_cache WHERE id NOT IN (
SELECT id FROM tool_cache ORDER BY created_at DESC LIMIT ?
)
""", (max_entries,))
await db.commit()
async def cache_read(tool_name: str, prompt: str) -> Optional[str]:
"""
Read a cached tool response.
Returns None if no cache entry exists.
Args:
tool_name: Tool name to look up.
prompt: The exact prompt/query to match.
Returns:
Cached response string, or None if not found.
"""
_check_init()
async with aiosqlite.connect(_db_path) as db:
cursor = await db.execute("""
SELECT response FROM tool_cache
WHERE tool_name = ? AND prompt = ?
ORDER BY created_at DESC LIMIT 1
""", (tool_name, prompt))
row = await cursor.fetchone()
return row[0] if row else None
# =============================================================================
# SECTION 4 β Read-Only Query (for mcp.py db_query tool)
# =============================================================================
async def query(sql: str) -> List[Dict]:
"""
Execute a read-only SELECT query on the internal hub state database.
Only SELECT statements are permitted β write operations are blocked.
Called by mcp.py db_query tool when db_sync.py is active.
Args:
sql: SQL SELECT statement to execute.
Returns:
List of result rows as dicts.
Raises:
ValueError: If the query is not a SELECT statement.
"""
_check_init()
if not sql.strip().upper().startswith("SELECT"):
raise ValueError("Only SELECT queries are permitted in db_query tool.")
async with aiosqlite.connect(_db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(sql)
rows = await cursor.fetchall()
return [dict(r) for r in rows]
# =============================================================================
# SECTION 5 β Helpers
# =============================================================================
def _check_init() -> None:
"""Raise RuntimeError if db_sync was not initialized."""
if not _initialized or not _db_path:
raise RuntimeError("db_sync not initialized β call initialize() first.")
def is_ready() -> bool:
"""Returns True if db_sync is initialized and ready."""
return _initialized and _db_path is not None
# =============================================================================
# Direct execution guard
# =============================================================================
if __name__ == "__main__":
print("WARNING: Run via main.py β app.py, not directly.")
|