File size: 10,495 Bytes
1d1d7ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82f0d5d
 
1d1d7ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82f0d5d
1d1d7ae
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
"""
Video Intelligence Platform — Query Engine
Handles natural language queries with boolean decomposition,
dual-channel search (visual + caption), and result fusion.
"""
import numpy as np
from typing import List, Dict, Optional, Tuple, Set
from collections import defaultdict

from .index_store import VideoIndex
from .gemini_client import GeminiClient
from .visual_encoders import SigLIPEncoder


class QueryResult:
    """One ranked search hit: a frame, its timestamp, score and provenance."""

    def __init__(self, frame_id: int, timestamp_sec: float, score: float,
                 caption: str = "", detections: List[str] = None,
                 match_source: str = ""):
        """
        Store the hit's fields as plain attributes.

        Args:
            frame_id: Identifier of the matched frame.
            timestamp_sec: Position of the frame in the video, in seconds.
            score: Relevance score of the hit.
            caption: Caption text associated with the frame (may be empty).
            detections: Labels detected in the frame; a missing or empty
                value is stored as a new empty list.
            match_source: Which channel(s) produced the hit,
                e.g. "visual", "caption", "detection", "fused".
        """
        self.frame_id = frame_id
        self.timestamp_sec = timestamp_sec
        self.score = score
        self.caption = caption
        self.match_source = match_source
        self.detections = detections if detections else []

    @property
    def time_str(self) -> str:
        """Return the timestamp rendered as zero-padded HH:MM:SS."""
        whole_hours, within_hour = divmod(self.timestamp_sec, 3600)
        whole_minutes = within_hour // 60
        leftover_seconds = self.timestamp_sec % 60
        return (
            f"{int(whole_hours):02d}:"
            f"{int(whole_minutes):02d}:"
            f"{int(leftover_seconds):02d}"
        )

    def to_dict(self) -> Dict:
        """Return the hit as a plain dict, including the derived ``time_str``."""
        exported = (
            "frame_id",
            "timestamp_sec",
            "time_str",
            "score",
            "caption",
            "detections",
            "match_source",
        )
        return {field: getattr(self, field) for field in exported}

    def __repr__(self):
        """Compact one-line summary; the caption is cut to 80 characters."""
        preview = (self.caption or "")[:80]
        return f"[{self.time_str}] score={self.score:.3f} ({self.match_source}) {preview}"


class QueryEngine:
    """
    Multi-channel query engine.

    What this class does:
    1. Visual search: SigLIP2 text→frame embedding similarity
    2. Caption search: Gemini embedding text→caption similarity
    3. Detection search: structured search on detected objects
    4. Fusion: per-frame weighted average over the channels that matched
    5. Boolean ops: AND (intersect timestamps), OR (union)

    NOTE(review): a NOT (exclude) operator is not implemented here. Any
    operator other than "AND"/"OR" falls back to the first sub-query's
    results.
    """

    def __init__(self, index: VideoIndex, gemini: GeminiClient,
                 siglip: SigLIPEncoder, top_k: int = 20):
        """
        Args:
            index: Store queried via ``search_visual``, ``search_captions``,
                ``search_detections`` and ``get_frame``.
            gemini: Client used for ``decompose_query`` and ``embed_query``.
            siglip: Encoder used for ``embed_texts``.
            top_k: Default number of results returned by ``search``.
        """
        self.index = index
        self.gemini = gemini
        self.siglip = siglip
        self.top_k = top_k

        # Channel weights for fusion. Only the channels that actually matched
        # a frame take part in that frame's weighted average.
        self.weights = {
            "visual": 0.35,
            "caption": 0.35,
            "detection": 0.30,
        }

    def search(self, query: str, top_k: Optional[int] = None) -> List[QueryResult]:
        """
        Full search pipeline:
        1. Decompose query (detect boolean operators)
        2. Search each sub-query across all channels
        3. Apply boolean operations
        4. Return fused, ranked results

        Args:
            query: Natural-language query.
            top_k: Maximum number of results; a falsy value (None or 0)
                selects ``self.top_k``.

        Returns:
            At most ``top_k`` results, highest score first, with hits closer
            than 3 seconds to each other collapsed.
        """
        top_k = top_k or self.top_k

        # Step 1: Decompose query
        decomposed = self.gemini.decompose_query(query)
        # A missing *or empty* sub-query list falls back to the raw query;
        # otherwise an empty decomposition would silently return nothing.
        sub_queries = decomposed.get("sub_queries") or [query]
        operator = decomposed.get("operator", "SINGLE")

        print(f"🔍 Query: '{query}'")
        print(f"   Decomposed: {sub_queries} [{operator}]")

        # Step 2: Search each sub-query (over-fetch so fusion/dedup has slack)
        sub_results = [
            self._search_single(sq, top_k=top_k * 2) for sq in sub_queries
        ]

        # Step 3: Apply boolean operations
        if operator == "AND" and len(sub_results) > 1:
            final = self._boolean_and(sub_results)
        elif operator == "OR" and len(sub_results) > 1:
            final = self._boolean_or(sub_results)
        else:
            final = sub_results[0] if sub_results else []

        # Step 4: Deduplicate nearby timestamps, rank by score, limit
        final = self._deduplicate_temporal(final, window_sec=3.0)
        final.sort(key=lambda r: r.score, reverse=True)
        return final[:top_k]

    def _record_frame_hits(self, results_by_frame: Dict[int, Dict],
                           hits, channel: str) -> None:
        """Store one embedding channel's (frame_id, score) hits and frame metadata."""
        for frame_id, score in hits:
            entry = results_by_frame[frame_id]
            entry["scores"][channel] = score
            frame = self.index.get_frame(frame_id)
            if frame:
                entry["timestamp_sec"] = frame["timestamp_sec"]
                # Normalise a stored None caption to the empty string.
                entry["caption"] = frame.get("caption") or ""

    def _search_single(self, query: str, top_k: int = 40) -> List[QueryResult]:
        """
        Search a single query across all channels and fuse results.

        Each channel is best-effort: an exception in one channel is printed
        and the remaining channels still contribute.

        Args:
            query: One (sub-)query string.
            top_k: Per-channel hit limit.

        Returns:
            One ``QueryResult`` per frame hit by any channel, unsorted. The
            score is the weighted average of the matching channels' scores
            and ``match_source`` joins the matching channel names with "+".
        """
        results_by_frame: Dict[int, Dict] = defaultdict(lambda: {
            "scores": {}, "caption": "", "detections": [], "timestamp_sec": 0
        })

        # Channel 1: Visual search (SigLIP2)
        try:
            text_emb = self.siglip.embed_texts([query])
            if text_emb.size > 0:
                visual_hits = self.index.search_visual(text_emb[0], top_k=top_k)
                self._record_frame_hits(results_by_frame, visual_hits, "visual")
        except Exception as e:
            print(f"   ⚠️ Visual search failed: {e}")

        # Channel 2: Caption search (Gemini embeddings)
        try:
            query_emb = self.gemini.embed_query(query)
            # Explicit emptiness test: plain truthiness raises for a numpy
            # array with more than one element, which would disable this
            # channel through the except below.
            if query_emb is not None and len(query_emb) > 0:
                caption_hits = self.index.search_captions(
                    np.array(query_emb), top_k=top_k
                )
                self._record_frame_hits(results_by_frame, caption_hits, "caption")
        except Exception as e:
            print(f"   ⚠️ Caption search failed: {e}")

        # Channel 3: Detection search (structured)
        try:
            detection_hits = self.index.search_detections(query)
            for det in detection_hits[:top_k]:
                fid = det["frame_id"]
                # Score based on detection confidence; keep the best per frame
                det_score = det["confidence"]
                entry = results_by_frame[fid]
                existing = entry["scores"].get("detection", 0)
                entry["scores"]["detection"] = max(existing, det_score)
                entry["timestamp_sec"] = det["timestamp_sec"]
                # Only take the detection's caption when it has one, so a
                # caption found by another channel is not blanked out.
                det_caption = det.get("caption")
                if det_caption:
                    entry["caption"] = det_caption
                entry["detections"].append(det["label"])
        except Exception as e:
            print(f"   ⚠️ Detection search failed: {e}")

        # Fuse scores
        fused_results = []
        for frame_id, data in results_by_frame.items():
            sources = [ch for ch in self.weights if ch in data["scores"]]
            total_weight = sum(self.weights[ch] for ch in sources)
            total_score = sum(
                data["scores"][ch] * self.weights[ch] for ch in sources
            )
            final_score = total_score / total_weight if total_weight > 0 else 0

            fused_results.append(QueryResult(
                frame_id=frame_id,
                timestamp_sec=data["timestamp_sec"],
                score=final_score,
                caption=data["caption"],
                # De-duplicate labels in first-seen order (deterministic).
                detections=list(dict.fromkeys(data["detections"])),
                match_source="+".join(sources),
            ))

        return fused_results

    def _boolean_and(self, sub_results: List[List[QueryResult]],
                     window_sec: float = 5.0) -> List[QueryResult]:
        """
        AND operation: find timestamps where ALL sub-queries match.

        Every result of the first sub-query is kept only if each other
        sub-query has a result strictly closer than ``window_sec`` seconds.
        The closest such result is used; ties go to the earlier list item.

        Args:
            sub_results: One result list per sub-query.
            window_sec: Tolerance in seconds for "same moment".

        Returns:
            New ``QueryResult`` objects tagged "fused_AND". They carry the
            frame, timestamp and caption of the first sub-query's hit, the
            mean score of all matched hits and the merged detection labels.
        """
        if not sub_results:
            return []

        anchors, *others = sub_results
        merged = []

        for anchor in anchors:
            matched_scores = [anchor.score]
            combined_detections = list(anchor.detections)

            for candidates in others:
                nearby = [
                    c for c in candidates
                    if abs(anchor.timestamp_sec - c.timestamp_sec) < window_sec
                ]
                if not nearby:
                    break  # this sub-query has nothing at that moment
                closest = min(
                    nearby,
                    key=lambda c: abs(anchor.timestamp_sec - c.timestamp_sec),
                )
                matched_scores.append(closest.score)
                combined_detections.extend(closest.detections)
            else:
                # Plain mean, so every sub-query weighs the same regardless
                # of its position (a running pairwise average would favour
                # later sub-queries once there are three or more).
                merged.append(QueryResult(
                    frame_id=anchor.frame_id,
                    timestamp_sec=anchor.timestamp_sec,
                    score=sum(matched_scores) / len(matched_scores),
                    caption=anchor.caption,
                    detections=list(dict.fromkeys(combined_detections)),
                    match_source="fused_AND",
                ))

        return merged

    def _boolean_or(self, sub_results: List[List[QueryResult]]) -> List[QueryResult]:
        """
        OR operation: union of all results, one entry per frame.

        When several sub-queries hit the same frame the highest-scoring
        result is kept (ties keep the first seen). Output order follows the
        first appearance of each frame. The kept results are tagged in place
        by appending "_OR" to ``match_source``.
        """
        best_by_frame: Dict[int, QueryResult] = {}

        for result_list in sub_results:
            for r in result_list:
                current = best_by_frame.get(r.frame_id)
                if current is None or r.score > current.score:
                    best_by_frame[r.frame_id] = r

        merged = list(best_by_frame.values())
        for r in merged:
            # Guard keeps the tag from stacking if a result is merged twice.
            if not r.match_source.endswith("_OR"):
                r.match_source += "_OR"

        return merged

    def _deduplicate_temporal(self, results: List[QueryResult],
                              window_sec: float = 3.0) -> List[QueryResult]:
        """
        Remove results that are too close in time (keep highest score).

        Results are walked in timestamp order. A result more than
        ``window_sec`` after the last kept one starts a new entry; otherwise
        it replaces the last kept one only if it scores higher.

        Args:
            results: Results to thin out; the list itself is not modified.
            window_sec: Minimum gap in seconds between two kept results.

        Returns:
            A new list ordered by timestamp.
        """
        if not results:
            return []

        # Sort a copy so the caller's list keeps its order.
        ordered = sorted(results, key=lambda r: r.timestamp_sec)
        deduped = [ordered[0]]

        for r in ordered[1:]:
            if abs(r.timestamp_sec - deduped[-1].timestamp_sec) > window_sec:
                deduped.append(r)
            elif r.score > deduped[-1].score:
                deduped[-1] = r

        return deduped