"""
backend/app/services/github_log.py

Durable interaction log backed by a JSONL file in the PersonaBot GitHub repo.

HuggingFace Spaces free tier destroys in-Space storage (SQLite, /data/) on
every restart, maintenance window, and idle reclamation. Every interaction
written only to SQLite is silently reset to zero — the self-improvement loop
accumulates nothing across restarts.

This service appends each interaction as a single JSON line to a committed
file in the PersonaBot repo via the GitHub Contents API, using
PERSONABOT_WRITE_TOKEN. The file survives Space restarts because it lives in
Git, not on the Space filesystem.

On Space startup, if SQLite is empty (< 10 rows), the last 500 lines are
fetched from this file and replayed into SQLite so conversation history and
training signals are available immediately without a full log replay on every
request.

Negative feedback (mark_last_negative) is durably recorded by appending a
correction record {type:"feedback", feedback:-1, session_id:...} that
data_prep.py interprets when building training triplets.

Failure modes
─────────────
If the GitHub API call fails (rate limit, network error, 409 SHA conflict
that persists after a few retries, or a log file too large for the Contents
API to return inline), the error is logged at WARNING level and the
interaction is NOT lost — it is always written to SQLite first. The durable
log is a best-effort durability layer, not a primary store.
"""
from __future__ import annotations

import asyncio
import base64
import json
import logging
from datetime import datetime, timezone

import httpx

logger = logging.getLogger(__name__)

# Fixed path inside the PersonaBot repository. The retrain workflow reads this
# file directly from the repo checkout — no admin endpoint download required.
_LOG_PATH = "data/interactions.jsonl"
_API_TIMEOUT = 20
# How many GET → PUT cycles one append may attempt when the PUT is rejected
# with 409 (the file changed between our read and our write).
_MAX_APPEND_ATTEMPTS = 3


class GithubLog:
    """
    Append-only JSONL log backed by the PersonaBot GitHub repo.

    All writes are fire-and-forget background tasks so they never add latency
    to the SSE stream. This object is created once at startup and shared
    across all requests via app.state.github_log.

    Appends issued by this instance are serialised with an asyncio lock: the
    Contents API update is a read-modify-write keyed on the file SHA, so two
    overlapping appends would otherwise always lose one record to a 409.
    """

    def __init__(self, write_token: str, repo: str) -> None:
        """
        Args:
            write_token: GitHub token with write access to `repo`. An empty
                value disables the log (see `enabled`).
            repo: Repository in "owner/name" form.
        """
        self._token = write_token
        self._repo = repo
        self._api_url = f"https://api.github.com/repos/{repo}/contents/{_LOG_PATH}"
        self._headers = {
            "Authorization": f"Bearer {write_token}",
            "Accept": "application/vnd.github+json",
        }
        # The event loop only keeps weak references to tasks; holding them
        # here stops an in-flight append from being garbage-collected.
        self._tasks: set[asyncio.Task] = set()
        # Created lazily inside the running loop (see _append_bg) so the lock
        # is never bound to a different loop than the one executing appends.
        self._write_lock: asyncio.Lock | None = None

    @property
    def enabled(self) -> bool:
        """True when a write token is configured."""
        return bool(self._token)

    def append(self, record: dict) -> None:
        """
        Schedule a background task to append `record` to the durable JSONL log.
        Returns immediately — never blocks the request path.

        Does nothing when the log is disabled. If there is no running event
        loop the record cannot be scheduled; that is logged at WARNING level
        instead of raising, in keeping with the best-effort contract.
        """
        if not self.enabled:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning(
                "GithubLog.append called without a running event loop — "
                "interaction not logged durably."
            )
            return
        task = loop.create_task(self._append_bg(record))
        # Keep a strong reference until the task finishes, then drop it.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    @staticmethod
    def _decode_content(data: dict) -> str | None:
        """
        Decode the file text from a Contents API JSON response.

        Returns None when the response does not carry the content inline
        (non-base64 encoding, or an empty `content` for a non-empty file).
        Callers must treat None as "unknown", never as "empty file":
        writing back on top of it would truncate the whole log.
        """
        if data.get("encoding", "base64") != "base64":
            return None
        raw = data["content"]
        if not raw and data.get("size", 0) > 0:
            return None
        return base64.b64decode(raw.replace("\n", "")).decode("utf-8")

    @staticmethod
    def _append_line(current: str, line: str) -> str:
        """Return `current` with `line` appended as the final newline-terminated row."""
        body = current.rstrip("\n")
        # No separator for an empty/missing file, so it never starts with a blank line.
        return f"{body}\n{line}\n" if body else f"{line}\n"

    @staticmethod
    def _parse_tail(content: str, n: int) -> list[dict]:
        """
        Parse the last `n` non-blank lines of `content` as JSON objects.

        Lines that are not valid JSON, or whose JSON value is not an object,
        are dropped; the number dropped is logged at WARNING level.
        """
        if n <= 0:
            # lines[-0:] would be the whole list, not an empty one.
            return []
        lines = [ln.strip() for ln in content.splitlines() if ln.strip()]
        records: list[dict] = []
        skipped = 0
        for line in lines[-n:]:
            try:
                parsed = json.loads(line)
            except json.JSONDecodeError:
                skipped += 1
                continue
            if isinstance(parsed, dict):
                records.append(parsed)
            else:
                skipped += 1
        if skipped:
            logger.warning(
                "GithubLog.load_recent skipped %d malformed line(s).", skipped
            )
        return records

    async def _append_bg(self, record: dict) -> None:
        """
        Append `record` to the remote file: GET current content + SHA, PUT the
        extended content. Never raises; every failure is logged at WARNING.
        """
        try:
            line = json.dumps(record)
            # No await between the check and the assignment, so two tasks on
            # the same loop cannot both create a lock.
            if self._write_lock is None:
                self._write_lock = asyncio.Lock()
            async with self._write_lock:
                async with httpx.AsyncClient(timeout=_API_TIMEOUT) as client:
                    for _attempt in range(_MAX_APPEND_ATTEMPTS):
                        get_r = await client.get(
                            self._api_url, headers=self._headers
                        )
                        if get_r.status_code == 200:
                            data = get_r.json()
                            current = self._decode_content(data)
                            if current is None:
                                # Writing now would replace the log with one line.
                                logger.warning(
                                    "GithubLog GET returned no inline content "
                                    "(file too large for the Contents API?) — "
                                    "interaction not logged durably."
                                )
                                return
                            sha: str | None = data["sha"]
                        elif get_r.status_code == 404:
                            # File does not exist yet; the PUT below creates it.
                            sha = None
                            current = ""
                        else:
                            logger.warning(
                                "GithubLog GET failed (%d) — interaction not logged durably.",
                                get_r.status_code,
                            )
                            return

                        new_content = self._append_line(current, line)
                        payload: dict = {
                            "message": "log: append interaction [skip ci]",
                            "content": base64.b64encode(
                                new_content.encode("utf-8")
                            ).decode("ascii"),
                        }
                        if sha:
                            payload["sha"] = sha

                        put_r = await client.put(
                            self._api_url, headers=self._headers, json=payload
                        )
                        if put_r.status_code in (200, 201):
                            return
                        if put_r.status_code != 409:
                            break
                        # 409 = SHA conflict: another writer updated the file
                        # after our GET. Re-read and try again.

                    # The interaction is safe in SQLite; this is a best-effort
                    # durability layer.
                    logger.warning(
                        "GithubLog PUT failed (%d) — interaction not logged durably.",
                        put_r.status_code,
                    )
        except Exception as exc:
            logger.warning("GithubLog.append error: %s", exc)

    async def load_recent(self, n: int = 500) -> list[dict]:
        """
        Fetch the last `n` interaction records from the durable log.
        Used at Space startup to reconstruct SQLite after an ephemeral restart.

        Returns [] if the file doesn't exist, if the token is not configured,
        if `n` is not positive, or if the request fails for any reason.
        """
        if not self.enabled or n <= 0:
            return []
        try:
            async with httpx.AsyncClient(timeout=_API_TIMEOUT) as client:
                r = await client.get(self._api_url, headers=self._headers)
                if r.status_code == 404:
                    return []
                if r.status_code != 200:
                    logger.warning(
                        "GithubLog.load_recent GET failed (%d).", r.status_code
                    )
                    return []
                content = self._decode_content(r.json())
                if content is None:
                    logger.warning(
                        "GithubLog.load_recent: response carried no inline "
                        "content (file too large for the Contents API?)."
                    )
                    return []
                return self._parse_tail(content, n)
        except Exception as exc:
            logger.warning("GithubLog.load_recent error: %s", exc)
            return []

    def append_feedback(self, session_id: str, feedback: int) -> None:
        """
        Durably record a feedback update without rewriting an existing line.
        data_prep.py applies these correction records when building triplets.
        """
        if not self.enabled:
            return
        record = {
            "type": "feedback",
            "session_id": session_id,
            "feedback": feedback,
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
        }
        # Same scheduling path as ordinary interactions.
        self.append(record)