# storage/sqlite_client.py
import sqlite3
from typing import Any, Dict, List
from core.exceptions import DatabaseError
from core.logger import setup_logger
logger = setup_logger("sqlite_client")

class SQLiteStorage:
    def __init__(self, db_path: str):
        self.db_path = db_path
        try:
            # check_same_thread=False lets the connection be shared across threads
            # (e.g. by a web server); callers are responsible for serializing access.
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.conn.row_factory = sqlite3.Row
            logger.info(f"✅ Connected to SQLite at {self.db_path}")
        except sqlite3.Error as e:
            logger.critical(f"❌ SQLite connection failed: {e}")
            raise DatabaseError(f"Database connection failed: {e}") from e
    def get_enriched_chunks_dict(self, chunk_ids: List[int]) -> Dict[int, Dict[str, Any]]:
        """
        Retrieve the text and metadata for the given chunk_ids from SQLite.

        Returns a dictionary shaped as { chunk_id: { "text": "...", "metadata": {...} } },
        so the search service gets O(1) lookups when mapping chunk_ids from Qdrant
        results to their full text and metadata for reranking and final response
        construction.

        A single JOIN across the chunks and documents tables fetches everything
        needed in one query per batch. An empty chunk_ids list returns {} immediately
        to avoid a pointless round trip; sqlite3 errors are logged and re-raised
        as DatabaseError.
        """
        if not chunk_ids:
            return {}
        # SQLite caps a statement at 999 bound variables by default, so batch the
        # IN (...) lookups at 900 to stay safely under the limit.
        CHUNK_SIZE_LIMIT = 900
        result_dict = {}
        try:
            cur = self.conn.cursor()
            for i in range(0, len(chunk_ids), CHUNK_SIZE_LIMIT):
                batch_ids = chunk_ids[i:i + CHUNK_SIZE_LIMIT]
                placeholders = ",".join("?" * len(batch_ids))
                query = f"""
                    SELECT
                        c.chunk_id, c.text AS chunk_text,
                        d.doc_id, d.title, d.lang, d.url, d.date_modified
                    FROM chunks c
                    JOIN documents d ON c.doc_id = d.doc_id
                    WHERE c.chunk_id IN ({placeholders})
                """
                cur.execute(query, batch_ids)
                rows = cur.fetchall()
                # Shape the rows into { chunk_id: { "text": "...", "metadata": {...} } }
                # for O(1) access downstream.
                for row in rows:
                    result_dict[row["chunk_id"]] = {
                        "text": row["chunk_text"],
                        "metadata": {
                            "doc_id": row["doc_id"],
                            "title": row["title"],
                            "lang": row["lang"],
                            "url": row["url"],
                            "date_modified": row["date_modified"],
                        },
                    }
            return result_dict
        except sqlite3.Error as e:
            logger.error(f"Failed to fetch enriched chunks: {e}")
            raise DatabaseError(f"Query execution failed: {e}") from e
    def close(self):
        if hasattr(self, 'conn') and self.conn:
            self.conn.close()
            logger.info("🔌 SQLite connection closed.")