| """ |
| Video Intelligence Platform — Query Engine |
| Handles natural language queries with boolean decomposition, |
| dual-channel search (visual + caption), and result fusion. |
| """ |
| import numpy as np |
| from typing import List, Dict, Optional, Tuple, Set |
| from collections import defaultdict |
|
|
| from .index_store import VideoIndex |
| from .gemini_client import GeminiClient |
| from .visual_encoders import SigLIPEncoder |
|
|
|
|
class QueryResult:
    """A single search hit: a frame, its timestamp, and how it scored.

    Attributes:
        frame_id: Identifier of the matched frame in the index.
        timestamp_sec: Position of the frame in the video, in seconds.
        score: Relevance score; the engine ranks hits by it in descending order.
        caption: Caption text associated with the frame ("" if none).
        detections: Detected object labels attached to this hit.
        match_source: Which channel(s) or boolean operation produced the hit,
            e.g. "visual+caption" or "fused_AND".
    """

    def __init__(self, frame_id: int, timestamp_sec: float, score: float,
                 caption: str = "", detections: Optional[List[str]] = None,
                 match_source: str = ""):
        self.frame_id = frame_id
        self.timestamp_sec = timestamp_sec
        self.score = score
        self.caption = caption
        # Copy so that later mutation of the caller's list cannot silently
        # change this result (and vice versa).
        self.detections = list(detections) if detections else []
        self.match_source = match_source

    @property
    def time_str(self) -> str:
        """Format the timestamp as HH:MM:SS (hours are not wrapped at 24)."""
        ts = self.timestamp_sec
        hrs = int(ts // 3600)
        mins = int((ts % 3600) // 60)
        secs = int(ts % 60)
        return f"{hrs:02d}:{mins:02d}:{secs:02d}"

    def to_dict(self) -> Dict:
        """Return a plain-dict view of this result, including ``time_str``.

        ``detections`` is copied so consumers of the dict cannot mutate the
        result through it.
        """
        return {
            "frame_id": self.frame_id,
            "timestamp_sec": self.timestamp_sec,
            "time_str": self.time_str,
            "score": self.score,
            "caption": self.caption,
            "detections": list(self.detections),
            "match_source": self.match_source,
        }

    def __repr__(self):
        # Truncate long captions so the repr stays on one readable line.
        cap = self.caption[:80] if self.caption else ""
        return f"[{self.time_str}] score={self.score:.3f} ({self.match_source}) {cap}"
|
|
|
|
class QueryEngine:
    """
    Multi-channel query engine:
      1. Visual search: SigLIP2 text->frame embedding similarity
      2. Caption search: Gemini embedding text->caption similarity
      3. Detection search: structured search on detected objects
         (via ``index.search_detections``)
      4. Fusion: per frame, a weighted mean of the scores from the channels
         that hit it
      5. Boolean ops: AND (timestamps matched within a window) and OR (union)

    Only AND and OR are combined here. Any other operator returned by the
    query decomposer (e.g. a NOT) is not implemented and falls back to the
    first sub-query's results.
    """

    # Default per-channel fusion weights. A frame's fused score is the
    # weighted mean over only the channels that actually returned it.
    DEFAULT_WEIGHTS: Dict[str, float] = {
        "visual": 0.35,
        "caption": 0.35,
        "detection": 0.30,
    }

    def __init__(self, index: VideoIndex, gemini: GeminiClient,
                 siglip: SigLIPEncoder, top_k: int = 20,
                 weights: Optional[Dict[str, float]] = None):
        """
        Args:
            index: Store providing visual/caption/detection search and frame lookup.
            gemini: Client used for query decomposition and caption-space embeddings.
            siglip: Encoder used for text embeddings in the visual channel.
            top_k: Default number of results returned by ``search``.
            weights: Optional per-channel fusion weights keyed by "visual",
                "caption" and "detection". Channels missing from this mapping
                contribute nothing to the fused score. Defaults to
                ``DEFAULT_WEIGHTS``.
        """
        self.index = index
        self.gemini = gemini
        self.siglip = siglip
        self.top_k = top_k
        # Copy so per-instance tweaks never leak into the class default.
        self.weights = dict(self.DEFAULT_WEIGHTS if weights is None else weights)

    def search(self, query: str, top_k: Optional[int] = None) -> List[QueryResult]:
        """
        Full search pipeline:
          1. Decompose the query (detect boolean operators) via Gemini
          2. Search each sub-query across all channels (2 * top_k candidates each)
          3. Apply the boolean operation (AND / OR; anything else uses the
             first sub-query)
          4. Collapse hits closer than 3 s, rank by score, return the best top_k

        Args:
            query: Natural-language query.
            top_k: Number of results to return; ``None`` uses ``self.top_k``.

        Returns:
            Up to ``top_k`` results, sorted by descending score.
        """
        # `is None` (not `or`) so an explicit top_k=0 is honoured.
        if top_k is None:
            top_k = self.top_k

        decomposed = self.gemini.decompose_query(query) or {}
        # An empty sub-query list would otherwise drop the query entirely.
        sub_queries = decomposed.get("sub_queries") or [query]
        operator = decomposed.get("operator", "SINGLE")
        # Compare case-insensitively; the decomposer's output is free-form.
        op = str(operator).upper()

        print(f"🔍 Query: '{query}'")
        print(f" Decomposed: {sub_queries} [{operator}]")

        # Over-fetch per sub-query so boolean combination and temporal
        # de-duplication still leave enough candidates for the final top_k.
        sub_results = [self._search_single(sq, top_k=top_k * 2) for sq in sub_queries]

        if op == "AND" and len(sub_results) > 1:
            final = self._boolean_and(sub_results)
        elif op == "OR" and len(sub_results) > 1:
            final = self._boolean_or(sub_results)
        else:
            # SINGLE, unrecognised operators and single-sub-query cases.
            final = sub_results[0] if sub_results else []

        final = self._deduplicate_temporal(final, window_sec=3.0)
        final.sort(key=lambda r: r.score, reverse=True)
        return final[:top_k]

    def _search_single(self, query: str, top_k: int = 40) -> List[QueryResult]:
        """Search one query across all channels and fuse per-frame scores.

        Each channel is best-effort: a failure is printed and the remaining
        channels still contribute. A frame whose metadata cannot be fetched
        keeps the default timestamp of 0.
        """
        results_by_frame: Dict[int, Dict] = defaultdict(lambda: {
            "scores": {}, "caption": "", "detections": [], "timestamp_sec": 0
        })
        # Frames whose metadata was already looked up; a frame hit by both
        # embedding channels is fetched from the index only once.
        fetched: Set[int] = set()

        def attach_frame_meta(frame_id: int) -> None:
            """Copy timestamp/caption from the index into the frame's entry once."""
            if frame_id in fetched:
                return
            frame = self.index.get_frame(frame_id)
            fetched.add(frame_id)
            if frame:
                entry = results_by_frame[frame_id]
                entry["timestamp_sec"] = frame["timestamp_sec"]
                entry["caption"] = frame.get("caption", "")

        # Channel 1: visual similarity between the query text and frame embeddings.
        try:
            text_emb = self.siglip.embed_texts([query])
            if text_emb.size > 0:
                visual_hits = self.index.search_visual(text_emb[0], top_k=top_k)
                for frame_id, score in visual_hits:
                    results_by_frame[frame_id]["scores"]["visual"] = score
                    attach_frame_meta(frame_id)
        except Exception as e:
            print(f" ⚠️ Visual search failed: {e}")

        # Channel 2: similarity between the query and caption embeddings.
        try:
            query_emb = self.gemini.embed_query(query)
            # Explicit None/length check: `if query_emb:` raises ValueError
            # when the embedding is a multi-element numpy array.
            if query_emb is not None and len(query_emb) > 0:
                caption_hits = self.index.search_captions(
                    np.array(query_emb), top_k=top_k
                )
                for frame_id, score in caption_hits:
                    results_by_frame[frame_id]["scores"]["caption"] = score
                    attach_frame_meta(frame_id)
        except Exception as e:
            print(f" ⚠️ Caption search failed: {e}")

        # Channel 3: detected-object search; the confidence is the channel score.
        try:
            detection_hits = self.index.search_detections(query)
            for det in detection_hits[:top_k]:
                entry = results_by_frame[det["frame_id"]]
                scores = entry["scores"]
                # Several detections can hit the same frame; keep the strongest.
                scores["detection"] = max(scores.get("detection", 0), det["confidence"])
                entry["timestamp_sec"] = det["timestamp_sec"]
                # Only overwrite the caption when the detection row carries one,
                # so a detection hit cannot blank a caption fetched above.
                det_caption = det.get("caption")
                if det_caption:
                    entry["caption"] = det_caption
                entry["detections"].append(det["label"])
        except Exception as e:
            print(f" ⚠️ Detection search failed: {e}")

        # Fusion: weighted mean over the channels that hit each frame, so a
        # frame is not penalised for channels that simply did not return it.
        fused_results = []
        for frame_id, data in results_by_frame.items():
            hit_channels = [ch for ch in self.weights if ch in data["scores"]]
            total_weight = sum(self.weights[ch] for ch in hit_channels)
            total_score = sum(data["scores"][ch] * self.weights[ch] for ch in hit_channels)
            final_score = total_score / total_weight if total_weight > 0 else 0

            fused_results.append(QueryResult(
                frame_id=frame_id,
                timestamp_sec=data["timestamp_sec"],
                score=final_score,
                caption=data["caption"],
                # De-duplicate labels while keeping a deterministic order.
                detections=list(dict.fromkeys(data["detections"])),
                match_source="+".join(hit_channels),
            ))

        return fused_results

    def _boolean_and(self, sub_results: List[List[QueryResult]],
                     window_sec: float = 5.0) -> List[QueryResult]:
        """
        AND operation: keep hits of the first sub-query that have, in every
        other sub-query, a hit strictly within ``window_sec`` seconds.

        The combined score is the arithmetic mean of the anchor's score and
        the nearest matching hit's score from each other sub-query. Detection
        labels from all matched hits are merged.
        """
        if not sub_results:
            return []

        anchors, *others = sub_results
        n_parts = 1 + len(others)

        merged = []
        for r1 in anchors:
            score_sum = r1.score
            combined_detections = list(r1.detections)
            matched_all = True

            for other in others:
                in_window = [
                    r2 for r2 in other
                    if abs(r1.timestamp_sec - r2.timestamp_sec) < window_sec
                ]
                if not in_window:
                    matched_all = False
                    break
                # Nearest in time; ties resolve to the earliest in list order.
                best_match = min(
                    in_window, key=lambda r2: abs(r1.timestamp_sec - r2.timestamp_sec)
                )
                score_sum += best_match.score
                combined_detections.extend(best_match.detections)

            if matched_all:
                merged.append(QueryResult(
                    frame_id=r1.frame_id,
                    timestamp_sec=r1.timestamp_sec,
                    # True mean: repeated pairwise averaging would overweight
                    # later sub-queries when there are three or more.
                    score=score_sum / n_parts,
                    caption=r1.caption,
                    detections=list(dict.fromkeys(combined_detections)),
                    match_source="fused_AND",
                ))

        return merged

    def _boolean_or(self, sub_results: List[List[QueryResult]]) -> List[QueryResult]:
        """OR operation: union of all results, one per frame id.

        When a frame appears under several sub-queries, its highest-scoring
        hit is kept, so ranking does not depend on sub-query order. New
        result objects are returned; the inputs are not mutated.
        """
        best_by_frame: Dict[int, QueryResult] = {}
        for result_list in sub_results:
            for r in result_list:
                current = best_by_frame.get(r.frame_id)
                if current is None or r.score > current.score:
                    best_by_frame[r.frame_id] = r

        return [
            QueryResult(
                frame_id=r.frame_id,
                timestamp_sec=r.timestamp_sec,
                score=r.score,
                caption=r.caption,
                detections=r.detections,
                match_source=r.match_source + "_OR",
            )
            for r in best_by_frame.values()
        ]

    def _deduplicate_temporal(self, results: List[QueryResult],
                              window_sec: float = 3.0) -> List[QueryResult]:
        """Collapse results that are close in time, keeping the higher score.

        Results are scanned in timestamp order. Each is compared with the
        currently kept result: if it is more than ``window_sec`` later it
        starts a new group, otherwise it replaces the kept one only if it
        scores higher. Because the comparison anchor moves with each
        replacement, a run of steadily improving hits can collapse into one
        result spanning more than ``window_sec``.

        Returns a new list ordered by timestamp; the input is not modified.
        """
        if not results:
            return []

        ordered = sorted(results, key=lambda r: r.timestamp_sec)
        deduped = [ordered[0]]

        for r in ordered[1:]:
            if abs(r.timestamp_sec - deduped[-1].timestamp_sec) > window_sec:
                deduped.append(r)
            elif r.score > deduped[-1].score:
                deduped[-1] = r

        return deduped
|
|