ground-zero / src /engine /turn_logger.py
jefffffff9
Per-turn telemetry + RAG few-shot on phrasebook miss
064d08b
"""TurnLogger — per-turn JSONL telemetry for the minimal baseline.
Every voice or text turn writes one self-contained line to
`data/field_turns.jsonl` (path overridable via FIELD_TURNS_PATH).
This is the foundation for:
- field-test review (read the JSONL after a session)
- phrasebook hit-rate measurement
- LLM A/B comparisons
- eventually, Stage-4 LoRA training-data curation
(every line already pairs an English/French input with a vetted
Bambara/Pular reply; we'll filter on phrasebook-hit + user-confirmed
turns later).
Schema (one JSON object per line):
{
"ts": "<ISO-8601 UTC>",
"tab": "voice" | "text",
"input_lang": "bam" | "ful" | "fr" | "en" | null,
"output_lang": "bam" | "ful" | "fr" | "en",
"user_text": "<raw input from text tab, or transcript for voice tab>",
"transcript": "<whisper output, voice tab only>" | null,
"transcribe_ms": <int> | null,
"phrasebook": { match, score, category, source, target } | null,
"llm_model": "<model id>" | null,
"llm_ms": <int> | null,
"reply_text": "<final reply that fed TTS>",
"tts_ms": <int> | null,
"total_ms": <int>,
"error": "<short error string>" | null
}
Notes:
- File path is gitignored (data/ is excluded by .gitignore).
- Append mode + one open/write/close per row + lock — safe for the
single-process Gradio server. Not designed for multi-worker writes.
"""
from __future__ import annotations
import json
import logging
import os
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# Module-scoped logger for diagnostics about the telemetry writer itself
# (distinct from the JSONL rows it produces).
logger = logging.getLogger(__name__)

# Fallback output file: resolve this module's absolute path, walk up three
# ``parent`` steps, then descend into data/field_turns.jsonl.
_DEFAULT_PATH = Path(__file__).resolve().parent.parent.parent.joinpath(
    "data", "field_turns.jsonl"
)
def _utcnow_iso() -> str:
    """Return the current UTC time as a second-resolution ISO-8601 string.

    The result always has the fixed shape ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    current = datetime.now(tz=timezone.utc)
    return current.strftime("%Y-%m-%dT%H:%M:%SZ")
class TurnLogger:
    """Append-only JSONL logger. Thread-safe for one process.

    Each call to :meth:`log` opens the target file in append mode, writes a
    single JSON line, and closes it again, all while holding an
    instance-level lock.
    """

    def __init__(self, path: Optional[str] = None) -> None:
        """Resolve the output file and make sure its directory exists.

        Args:
            path: Explicit output file. When falsy, the ``FIELD_TURNS_PATH``
                environment variable is used; if that is unset or empty the
                module-level ``_DEFAULT_PATH`` is used.

        Raises:
            OSError: If the parent directory cannot be created.
        """
        env_path = os.environ.get("FIELD_TURNS_PATH")
        self.path = Path(path or env_path or _DEFAULT_PATH)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        logger.info("TurnLogger writing to %s", self.path)

    def log(self, **fields: Any) -> None:
        """Write one row as a single JSON line.

        A ``ts`` field holding the current UTC timestamp is added first; a
        caller-supplied ``ts`` keyword takes precedence over it. All other
        keys are written exactly as passed.

        Values that ``json`` cannot encode natively are written as
        ``str(value)`` instead of aborting the row, so one unexpected type
        does not cost the whole telemetry record.

        Never raises — telemetry must not break the user-facing pipeline.
        Any failure while encoding or writing is logged as a warning.
        """
        row = {"ts": _utcnow_iso(), **fields}
        try:
            # default=str is a deliberate fallback: keep the row and stringify
            # the odd value rather than lose the entire turn to a TypeError.
            line = json.dumps(row, ensure_ascii=False, default=str)
            # Serialize before taking the lock so it is held only for the I/O.
            with self._lock, self.path.open("a", encoding="utf-8") as fh:
                fh.write(line + "\n")
        except Exception as exc:  # pragma: no cover
            logger.warning("TurnLogger.log failed: %s", exc)