| """Persistent caption store backed by a local JSON file."""
|
|
|
| import json
|
| import os
|
| import time
|
| from typing import Optional
|
|
|
|
|
|
|
|
|
|
|
# Location of the JSON file that holds every caption record. The path is
# relative, so it resolves against the process's current working directory.
CAPTION_STORE_PATH: str = "./captions.json"
|
|
|
|
|
def _load() -> dict:
    """Read the whole caption store from CAPTION_STORE_PATH.

    Returns:
        The parsed JSON content of the store file, or an empty dict when the
        file does not exist yet or contains only whitespace.

    Raises:
        json.JSONDecodeError: If the file has content that is not valid JSON.
    """
    # EAFP: open directly instead of checking os.path.exists() first, so the
    # file cannot disappear between the check and the open.
    try:
        with open(CAPTION_STORE_PATH, "r", encoding="utf-8") as f:
            text = f.read()
    except FileNotFoundError:
        return {}

    # A zero-byte file (e.g. left behind by an interrupted first write) holds
    # no entries; treat it as an empty store instead of failing on every call.
    if not text.strip():
        return {}
    return json.loads(text)
|
|
|
|
|
def _save(store: dict) -> None:
    """Write the whole caption store to CAPTION_STORE_PATH as pretty JSON.

    Args:
        store: Mapping to persist; it must be JSON-serializable.

    Raises:
        TypeError: If ``store`` contains a value json cannot serialize. The
            existing file is left untouched in that case.
    """
    # Serialize BEFORE opening the file: mode "w" truncates immediately, so a
    # serialization error raised mid-dump would otherwise wipe the store.
    payload = json.dumps(store, indent=2, ensure_ascii=False)
    with open(CAPTION_STORE_PATH, "w", encoding="utf-8") as f:
        f.write(payload)
|
|
|
|
|
| def _as_list(val) -> list:
|
| """Ensure a value is always a list — handles string/list/None from model output."""
|
| if not val:
|
| return []
|
| if isinstance(val, list):
|
| return val
|
| return [val]
|
|
|
|
|
| def _try_parse_json(raw: str) -> dict | None:
|
| """Parse JSON, with a fallback that inserts missing commas between fields."""
|
| try:
|
| return json.loads(raw)
|
| except json.JSONDecodeError:
|
| pass
|
|
|
| import re
|
| fixed = re.sub(r'"\s*\n(\s*")', r'",\n\1', raw)
|
| try:
|
| return json.loads(fixed)
|
| except json.JSONDecodeError:
|
| return None
|
|
|
|
|
| def flatten_metadata(meta: dict) -> str:
|
| """Convert structured JSON metadata into a rich text string for embedding."""
|
| parts = []
|
|
|
| if meta.get("summary"):
|
| parts.append(meta["summary"])
|
|
|
| subj = meta.get("subjects", {})
|
|
|
| if subj.get("people_count") is not None:
|
| parts.append(f"People count: {subj['people_count']}")
|
| if _as_list(subj.get("attire")):
|
| parts.append("Attire: " + ", ".join(_as_list(subj["attire"])))
|
| if _as_list(subj.get("primary_subjects")):
|
| parts.append("Subjects: " + ", ".join(_as_list(subj["primary_subjects"])))
|
| if _as_list(subj.get("relationships")):
|
| parts.append("Relationships: " + ", ".join(_as_list(subj["relationships"])))
|
|
|
| scene = meta.get("scene", {})
|
| if scene.get("location_type"):
|
| parts.append("Location: " + scene["location_type"])
|
| if scene.get("environment"):
|
| parts.append("Environment: " + scene["environment"])
|
| if _as_list(scene.get("setting_details")):
|
| parts.append("Setting: " + ", ".join(_as_list(scene["setting_details"])))
|
|
|
| actions = meta.get("actions", {})
|
| if actions.get("primary_action"):
|
| parts.append("Action: " + actions["primary_action"])
|
| if _as_list(actions.get("body_language")):
|
| parts.append("Body language: " + ", ".join(_as_list(actions["body_language"])))
|
|
|
| lighting = meta.get("lighting", {})
|
| if lighting.get("lighting_style"):
|
| parts.append("Lighting: " + lighting["lighting_style"])
|
| if lighting.get("time_of_day_estimate"):
|
| parts.append("Time of day: " + lighting["time_of_day_estimate"])
|
|
|
| mood = meta.get("mood", {})
|
| if _as_list(mood.get("primary_emotions")):
|
| parts.append("Emotions: " + ", ".join(_as_list(mood["primary_emotions"])))
|
| if mood.get("atmosphere"):
|
| parts.append("Atmosphere: " + mood["atmosphere"])
|
|
|
| comp = meta.get("composition", {})
|
| if comp.get("shot_type"):
|
| parts.append("Shot: " + comp["shot_type"])
|
| if comp.get("camera_angle"):
|
| parts.append("Angle: " + comp["camera_angle"])
|
|
|
| tech = meta.get("technical_cues", {})
|
| if _as_list(tech.get("color_palette")):
|
| parts.append("Colors: " + ", ".join(_as_list(tech["color_palette"])))
|
|
|
| if tech.get("depth_of_field"):
|
| parts.append("Depth of field: " + tech["depth_of_field"])
|
|
|
| if _as_list(meta.get("search_tags")):
|
| parts.append("Tags: " + ", ".join(_as_list(meta["search_tags"])))
|
| if _as_list(meta.get("archive_keywords")):
|
| parts.append("Keywords: " + ", ".join(_as_list(meta["archive_keywords"])))
|
|
|
| return " | ".join(parts)
|
|
|
|
|
def get_entry(image_path: str) -> Optional[dict]:
    """Return the stored record for ``image_path``, or None if there is none."""
    store = _load()
    return store.get(image_path)
|
|
|
|
|
def upsert_entry(image_path: str, caption: str, mtime: float, collection: str = "General") -> None:
    """Insert or replace the record for ``image_path`` and persist the store.

    ``caption`` may be a raw JSON string (new structured format) or plain
    text (legacy). The raw string is always stored; ``search_text`` holds the
    flattened metadata when the caption parses to a JSON object, and the raw
    caption otherwise.

    Args:
        image_path: Key under which the record is stored.
        caption: Raw caption text or JSON produced for the image.
        mtime: Modification time to store with the record.
        collection: Name of the collection the image belongs to.
    """
    store = _load()

    search_text = caption
    meta = _try_parse_json(caption)
    # Only a JSON *object* can be flattened; a caption that happens to parse
    # as a bare number/list/string is treated as plain text.
    if isinstance(meta, dict):
        flattened = flatten_metadata(meta)
        # An object with none of the recognised fields flattens to ""; keep
        # the raw caption searchable rather than storing empty text.
        if flattened:
            search_text = flattened

    store[image_path] = {
        "caption": caption,
        "search_text": search_text,
        "mtime": mtime,
        "collection": collection,
        "ingested_at": time.time(),
    }
    _save(store)
|
|
|
|
|
def mark_error(image_path: str, error: str, collection: str = "General") -> None:
    """Record a failed ingestion for ``image_path`` and persist the store.

    The record replaces any existing one for the same path and has
    ``caption`` set to None, so ``all_entries()`` does not return it.

    Args:
        image_path: Path of the image that failed; used as the record key.
        error: Error description to store.
        collection: Name of the collection the image belongs to.
    """
    store = _load()

    # EAFP instead of exists()+getmtime(): the file may vanish between the two
    # calls, and recording an error must not itself raise. ValueError covers
    # paths the OS rejects outright (e.g. an embedded NUL character).
    try:
        mtime = os.path.getmtime(image_path)
    except (OSError, ValueError):
        mtime = 0

    store[image_path] = {
        "caption": None,
        "error": error,
        "mtime": mtime,
        "collection": collection,
        "ingested_at": time.time(),
    }
    _save(store)
|
|
|
|
|
def all_entries() -> dict:
    """Return the subset of the store whose records have a truthy caption."""
    captioned = {}
    for path, data in _load().items():
        if not data.get("caption"):
            # Error records (caption None) and empty captions are left out.
            continue
        captioned[path] = data
    return captioned
|
|
|
|
|
def entry_count() -> int:
    """Return how many stored entries have a valid caption."""
    captioned = all_entries()
    return len(captioned)
|
|
|
|
|
def get_all_collections() -> list[str]:
    """Return the distinct collection names in the store, sorted.

    Entries with a missing or empty collection are ignored. When no entry
    names a collection, ``["General"]`` is returned.
    """
    declared = (entry.get("collection") for entry in _load().values())
    # filter(None, ...) drops falsy names before they reach the set.
    names = set(filter(None, declared))
    return sorted(names) if names else ["General"]
|
|
|
|
|
def get_entries_by_collection(collection: str) -> dict:
    """Return the valid (captioned) entries whose collection equals ``collection``."""
    matching = {}
    for path, data in all_entries().items():
        if data.get("collection") == collection:
            matching[path] = data
    return matching
|
|
|