"""Persistent caption store backed by a local JSON file."""

import json
import os
import re
import time
from typing import Optional

# Use /data when that path exists, otherwise the current working directory.
if os.path.exists("/data"):
    CAPTION_STORE_PATH = "/data/captions.json"
else:
    CAPTION_STORE_PATH = "./captions.json"

# Matches a closing quote, a line break, then the opening quote of the next
# key: the spot where a comma is missing between two string fields.
_MISSING_COMMA_RE = re.compile(r'"\s*\n(\s*")')


def _load() -> dict:
    """Read the whole store from disk.

    Returns:
        The decoded JSON content, or an empty dict when the store file does
        not exist yet.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON. This
            is deliberately not swallowed; returning ``{}`` here would make
            the next ``_save`` overwrite the damaged file with an empty store.
    """
    try:
        with open(CAPTION_STORE_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def _save(store: dict) -> None:
    """Write the whole store to disk as pretty-printed UTF-8 JSON.

    The data is first written to a sibling ``.tmp`` file and then moved over
    the real file with ``os.replace``, so a failure while serialising (for
    example a non-serialisable value) leaves the previous store intact.
    """
    tmp_path = f"{CAPTION_STORE_PATH}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(store, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, CAPTION_STORE_PATH)
    finally:
        # Only still present when the write or the replace failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def _as_list(val) -> list:
    """Ensure a value is always a list; handles string/list/None from model output."""
    if not val:
        return []
    if isinstance(val, list):
        return val
    return [val]  # single value -> wrap


def _try_parse_json(raw: str) -> Optional[dict]:
    """Parse ``raw`` as a JSON object, tolerating missing commas between fields.

    Returns:
        The parsed dict, or ``None`` when ``raw`` is not a string, is not
        valid JSON even after the comma repair, or decodes to something other
        than a JSON object (a number, string, list, ...).
    """
    if not isinstance(raw, str):
        return None

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Common model mistake: missing comma after a string value before
        # the next key. Insert it and retry once.
        repaired = _MISSING_COMMA_RE.sub(r'",\n\1', raw)
        try:
            parsed = json.loads(repaired)
        except json.JSONDecodeError:
            return None

    # Plain-text captions such as "42" or "[1, 2]" are valid JSON but are not
    # structured metadata; callers expect a dict or nothing.
    return parsed if isinstance(parsed, dict) else None


def _section(meta: dict, key: str) -> dict:
    """Return ``meta[key]`` when it is a dict, otherwise an empty dict."""
    value = meta.get(key)
    return value if isinstance(value, dict) else {}


def flatten_metadata(meta: dict) -> str:
    """Convert structured JSON metadata into a rich text string for embedding.

    Each populated field becomes a ``"Label: value"`` fragment (the summary
    is emitted without a label); fragments are joined with ``" | "``.
    Sections that are missing or not dicts are skipped, and every value is
    coerced to text, so a string, a list or a number in any field is
    accepted.

    Args:
        meta: Parsed metadata object.

    Returns:
        The flattened text, or an empty string when no known field is set.
    """
    parts: list = []

    def add(label: str, value) -> None:
        # Accept either a single value or a list; drop nulls inside lists.
        items = [str(item) for item in _as_list(value) if item is not None]
        if items:
            parts.append(f"{label}: " + ", ".join(items))

    if meta.get("summary"):
        parts.append(str(meta["summary"]))

    subjects = _section(meta, "subjects")
    # Headcount is included explicitly for numerical search queries
    # (e.g. "two people"); 0 is a meaningful value, so test against None.
    if subjects.get("people_count") is not None:
        parts.append(f"People count: {subjects['people_count']}")
    add("Attire", subjects.get("attire"))
    add("Subjects", subjects.get("primary_subjects"))
    add("Relationships", subjects.get("relationships"))

    scene = _section(meta, "scene")
    add("Location", scene.get("location_type"))
    add("Environment", scene.get("environment"))
    add("Setting", scene.get("setting_details"))

    actions = _section(meta, "actions")
    add("Action", actions.get("primary_action"))
    add("Body language", actions.get("body_language"))

    lighting = _section(meta, "lighting")
    add("Lighting", lighting.get("lighting_style"))
    add("Time of day", lighting.get("time_of_day_estimate"))

    mood = _section(meta, "mood")
    add("Emotions", mood.get("primary_emotions"))
    add("Atmosphere", mood.get("atmosphere"))

    composition = _section(meta, "composition")
    add("Shot", composition.get("shot_type"))
    add("Angle", composition.get("camera_angle"))

    technical = _section(meta, "technical_cues")
    add("Colors", technical.get("color_palette"))
    add("Depth of field", technical.get("depth_of_field"))

    add("Tags", meta.get("search_tags"))
    add("Keywords", meta.get("archive_keywords"))

    return " | ".join(parts)


def get_entry(image_path: str) -> Optional[dict]:
    """Return the stored record for ``image_path``, or ``None`` if absent."""
    return _load().get(image_path)


def upsert_entry(
    image_path: str,
    caption: str,
    mtime: float,
    collection: str = "General",
) -> None:
    """Insert or replace the record for ``image_path``.

    ``caption`` may be a raw JSON string (new structured format) or plain
    text (legacy). Both the raw string and a flattened ``search_text`` are
    stored.

    Args:
        image_path: Key under which the record is stored.
        caption: Raw caption, structured JSON or plain text.
        mtime: Modification time recorded with the entry.
        collection: Collection name the entry belongs to.
    """
    meta = _try_parse_json(caption)
    # Fall back to the raw caption when it is not structured metadata, or
    # when the metadata has no recognised field and would flatten to "".
    search_text = (flatten_metadata(meta) if meta else "") or caption

    store = _load()
    store[image_path] = {
        "caption": caption,  # raw (JSON string or plain text)
        "search_text": search_text,  # flattened for embedding
        "mtime": mtime,
        "collection": collection,
        "ingested_at": time.time(),
    }
    _save(store)


def mark_error(image_path: str, error: str, collection: str = "General") -> None:
    """Replace the record for ``image_path`` with an error record.

    The record has ``caption`` set to ``None`` and so is excluded by
    ``all_entries``. ``mtime`` is the file's modification time, or ``0``
    when the file cannot be stat'ed.
    """
    try:
        mtime = os.path.getmtime(image_path)
    except OSError:
        mtime = 0

    store = _load()
    store[image_path] = {
        "caption": None,
        "error": error,
        "mtime": mtime,
        "collection": collection,
        "ingested_at": time.time(),
    }
    _save(store)


def all_entries() -> dict:
    """Return only entries that have a valid (non-empty) caption."""
    return {
        path: data
        for path, data in _load().items()
        if data.get("caption")
    }


def entry_count() -> int:
    """Return the number of entries that have a valid caption."""
    return len(all_entries())


def get_all_collections() -> list[str]:
    """Return a sorted list of all unique collection names.

    Error records are included. Falls back to ``["General"]`` when the store
    holds no named collection.
    """
    names = {
        entry.get("collection")
        for entry in _load().values()
        if entry.get("collection")
    }
    return sorted(names) if names else ["General"]


def get_entries_by_collection(collection: str) -> dict:
    """Return all valid entries belonging to a specific collection."""
    return {
        path: data
        for path, data in all_entries().items()
        if data.get("collection") == collection
    }