File size: 2,374 Bytes
b5cb5bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def configure_structured_logging(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger

    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.propagate = False
    return logger


def _safe_json(payload: Dict[str, Any]) -> str:
    return json.dumps(payload, ensure_ascii=True, default=str)


def log_model_call(
    logger: logging.Logger,
    *,
    provider: str,
    model: str,
    endpoint: str,
    latency_ms: float,
    input_tokens: Optional[int],
    output_tokens: Optional[int],
    status: str,
    error_class: Optional[str] = None,
    error_message: Optional[str] = None,
    task_type: Optional[str] = None,
    request_tag: Optional[str] = None,
    retry_attempt: Optional[int] = None,
    fallback_depth: Optional[int] = None,
    route: Optional[str] = None,
) -> None:
    payload = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "event": "model_call",
        "provider": provider,
        "model": model,
        "endpoint": endpoint,
        "latency_ms": round(latency_ms, 2),
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "status": status,
        "error_class": error_class,
        "error_message": error_message,
        "task_type": task_type,
        "request_tag": request_tag,
        "retry_attempt": retry_attempt,
        "fallback_depth": fallback_depth,
        "route": route,
    }
    if status == "ok":
        logger.info(_safe_json(payload))
    else:
        logger.error(_safe_json(payload))


def log_job_metric(
    logger: logging.Logger,
    *,
    job_name: str,
    run_id: str,
    metric_name: str,
    metric_value: Any,
    extras: Optional[Dict[str, Any]] = None,
) -> None:
    payload: Dict[str, Any] = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "event": "job_metric",
        "job_name": job_name,
        "run_id": run_id,
        "metric_name": metric_name,
        "metric_value": metric_value,
    }
    if extras:
        payload.update(extras)
    logger.info(_safe_json(payload))