File size: 3,579 Bytes
b62e029
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cda6eee
 
b62e029
 
 
cda6eee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b62e029
cda6eee
b62e029
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# storage/sqlite_client.py

import sqlite3
from typing import Any, Dict, List

from core.exceptions import DatabaseError
from core.logger import setup_logger

logger = setup_logger("sqlite_client")

class SQLiteStorage:
    """Thin wrapper around a SQLite connection used by the search pipeline.

    Owns a single shared connection; use `close()` when the service shuts down.
    """

    def __init__(self, db_path: str):
        """Open a connection to the SQLite database at *db_path*.

        Raises:
            DatabaseError: if the connection cannot be established.
        """
        self.db_path = db_path
        try:
            # check_same_thread=False: the connection is shared across threads
            # (sqlite3 forbids cross-thread use by default).
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            # sqlite3.Row enables name-based column access, e.g. row["chunk_id"].
            self.conn.row_factory = sqlite3.Row
            logger.info(f"✅ Connected to SQLite at {self.db_path}")
        except sqlite3.Error as e:
            logger.critical(f"❌ SQLite connection failed: {e}")
            # Chain the cause so the original sqlite3 traceback is preserved.
            raise DatabaseError(f"Database connection failed: {e}") from e

    def get_enriched_chunks_dict(self, chunk_ids: List[int]) -> Dict[int, Dict[str, Any]]:
        """Map chunk IDs to their text and document metadata.

        Joins `chunks` with `documents` so each batch needs only one query;
        results are keyed by chunk_id for O(1) lookup during reranking and
        response construction.

        Args:
            chunk_ids: IDs of chunks to fetch; may be empty.

        Returns:
            { chunk_id: {"text": ..., "metadata": {...}} }. IDs not present
            in the database are simply absent from the result.

        Raises:
            DatabaseError: if query execution fails.
        """
        if not chunk_ids:
            # Avoid a pointless round-trip (and an invalid empty IN clause).
            return {}

        # SQLite's default host-parameter limit is 999 variables per
        # statement; batch at 900 to stay safely below it.
        BATCH_LIMIT = 900
        result: Dict[int, Dict[str, Any]] = {}

        cur = self.conn.cursor()
        try:
            for start in range(0, len(chunk_ids), BATCH_LIMIT):
                batch = chunk_ids[start:start + BATCH_LIMIT]
                placeholders = ",".join("?" * len(batch))
                query = f"""
                    SELECT
                        c.chunk_id, c.text AS chunk_text,
                        d.doc_id, d.title, d.lang, d.url, d.date_modified
                    FROM chunks c
                    JOIN documents d ON c.doc_id = d.doc_id
                    WHERE c.chunk_id IN ({placeholders})
                """
                cur.execute(query, batch)

                # Shape each row into { "text": ..., "metadata": {...} }.
                for row in cur.fetchall():
                    result[row["chunk_id"]] = {
                        "text": row["chunk_text"],
                        "metadata": {
                            "doc_id": row["doc_id"],
                            "title": row["title"],
                            "lang": row["lang"],
                            "url": row["url"],
                            "date_modified": row["date_modified"],
                        },
                    }
            return result

        except sqlite3.Error as e:
            logger.error(f"Failed to fetch enriched chunks: {e}")
            raise DatabaseError(f"Query execution failed: {e}") from e
        finally:
            # Release cursor resources on success and error alike.
            cur.close()

    def close(self):
        """Close the underlying connection if it was ever opened."""
        # hasattr guard: __init__ may have raised before setting self.conn.
        if hasattr(self, 'conn') and self.conn:
            self.conn.close()
            logger.info("🛑 SQLite connection closed.")