linvest21's picture
download
raw
4.85 kB
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from n21.config import load_structured, write_json
from n21.settings import CONFIG_ROOT, SHFT_WORKSPACE_ROOT
from observability.audit_log import utc_now
from rollback.anchor import load_last_good_anchor, validate_rollback_anchor
# Canned canary metrics describing a healthy rollout. run_canary_monitor copies
# this mapping for every ``mode`` other than "fail".
DEFAULT_CANARY_METRICS = {
    "requests": 1000,
    "errors": 2,
    "error_rate": 0.002,  # equals errors / requests for this fixture
    "critical_prompt_failure_count": 0,
    "pii_or_mnpi_release_risk": False,
    "aggregate_score_drop_abs": 0.01,
    "latency_regression": 0.12,
}

# Canned canary metrics describing a degraded rollout. run_canary_monitor copies
# this mapping when ``mode == "fail"``. It has the same keys, in the same order,
# as the healthy fixture; only the values listed below differ.
FAIL_CANARY_METRICS = {
    **DEFAULT_CANARY_METRICS,
    "errors": 12,
    "error_rate": 0.012,  # equals errors / requests for this fixture
    "critical_prompt_failure_count": 1,
    "aggregate_score_drop_abs": 0.045,
    "latency_regression": 0.41,
}
def _load_promotion(run_dir: Path, env: str) -> tuple[dict[str, Any] | None, list[str]]:
    """Load the promotion manifest of a run and check that it can be monitored.

    The manifest is read from ``<run_dir>/manifests/promotion_manifest.json``.
    Problems are reported as messages instead of exceptions so the caller can
    turn them into rollback reasons.

    Args:
        run_dir: Directory of the run whose manifest should be loaded.
        env: Environment name the manifest's ``"env"`` field must equal.

    Returns:
        A ``(promotion, errors)`` pair. ``promotion`` is the parsed manifest,
        or ``None`` when the file is missing, unreadable, not valid JSON, or
        not a JSON object. ``errors`` is empty when the manifest is usable and
        otherwise holds exactly one message for the first problem found.
    """
    path = run_dir / "manifests" / "promotion_manifest.json"

    # Read directly instead of exists()-then-read: the file can disappear
    # between the two calls, and exists() says nothing about readability.
    try:
        raw = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return None, [f"missing promotion manifest: {path}"]
    except (OSError, UnicodeDecodeError) as exc:
        return None, [f"unreadable promotion manifest: {exc}"]

    try:
        promotion = json.loads(raw)
    except json.JSONDecodeError as exc:
        return None, [f"invalid promotion manifest JSON: {exc}"]

    # json.loads also accepts arrays, strings, numbers and null; none of those
    # support the .get() lookups below.
    if not isinstance(promotion, dict):
        return None, [
            f"invalid promotion manifest: expected a JSON object, got {type(promotion).__name__}"
        ]

    if promotion.get("env") != env:
        return promotion, [f"promotion env mismatch: {promotion.get('env')} != {env}"]

    # A tuple (not a set) keeps the membership test safe when the manifest's
    # status is an unhashable JSON value such as a list or an object.
    if promotion.get("status") not in ("promotion_planned", "promotion_requested"):
        return promotion, [f"promotion status is not monitorable: {promotion.get('status')}"]

    return promotion, []
def _metric_errors(metrics: dict[str, Any], thresholds: dict[str, Any]) -> list[str]:
    """Describe every canary metric that breaches its rollback threshold.

    Checks run in a fixed order: error rate, critical prompt failure count,
    PII/MNPI release risk, aggregate score drop, latency regression. A numeric
    metric is a breach only when it is strictly greater than its limit.

    Args:
        metrics: Observed canary metrics; must contain ``error_rate``,
            ``critical_prompt_failure_count``, ``pii_or_mnpi_release_risk``,
            ``aggregate_score_drop_abs`` and ``latency_regression``.
        thresholds: Limits; must contain ``canary_error_rate_max``,
            ``critical_prompt_failure_count_max``,
            ``aggregate_score_drop_abs_max`` and ``latency_regression_max``.
            ``pii_or_mnpi_release_risk`` is only read when the metric of the
            same name is truthy.

    Returns:
        One message per breached check, in check order; empty when none.

    Raises:
        KeyError: If a required key is absent.
        ValueError: If a value cannot be converted by ``float``/``int``.
    """
    breaches: list[str] = []

    def check_limit(metric_key: str, limit_key: str, label: str, cast: type) -> None:
        # Compare after casting, but report the raw values exactly as supplied.
        if cast(metrics[metric_key]) > cast(thresholds[limit_key]):
            breaches.append(f"{label} exceeded: {metrics[metric_key]} > {thresholds[limit_key]}")

    check_limit("error_rate", "canary_error_rate_max", "canary error rate", float)
    check_limit(
        "critical_prompt_failure_count",
        "critical_prompt_failure_count_max",
        "critical prompt failure count",
        int,
    )

    # The threshold entry acts as an on/off switch for this check and is only
    # looked up once the metric itself reports a risk.
    if metrics["pii_or_mnpi_release_risk"]:
        if thresholds["pii_or_mnpi_release_risk"]:
            breaches.append("pii or mnpi release risk detected")

    check_limit("aggregate_score_drop_abs", "aggregate_score_drop_abs_max", "aggregate score drop", float)
    check_limit("latency_regression", "latency_regression_max", "latency regression", float)
    return breaches
def run_canary_monitor(
    run_dir: Path,
    *,
    run_id: str,
    env: str,
    mode: str = "pass",
) -> dict[str, Any]:
    """Evaluate a canary rollout, write its report, and archive an incident on failure.

    The metrics are canned fixtures chosen by ``mode``; they are compared with
    the ``"rollback"`` section of ``CONFIG_ROOT/thresholds/rollback.yaml``.
    A rollback is recommended when the promotion manifest, the last-good
    rollback anchor, or any metric check reports a problem.

    Args:
        run_dir: Run directory; the manifest is read from it and the report is
            written to ``<run_dir>/monitoring/canary_report.json``.
        run_id: Identifier recorded in the report and used in the incident
            file name.
        env: Environment name; used for manifest validation, anchor lookup and
            the incident file name.
        mode: ``"fail"`` selects ``FAIL_CANARY_METRICS``; any other value
            selects ``DEFAULT_CANARY_METRICS``.

    Returns:
        The report dictionary that was written to disk.
    """
    rollback_config = load_structured(CONFIG_ROOT / "thresholds" / "rollback.yaml")
    thresholds = rollback_config["rollback"]

    # Copy the fixture so the report holds its own dict rather than the
    # module-level constant.
    if mode == "fail":
        metrics = dict(FAIL_CANARY_METRICS)
    else:
        metrics = dict(DEFAULT_CANARY_METRICS)

    promotion, promotion_errors = _load_promotion(run_dir, env)

    rollback_anchor = load_last_good_anchor(SHFT_WORKSPACE_ROOT, env=env)
    if rollback_anchor is None:
        rollback_anchor_errors = ["missing last_good rollback anchor"]
    else:
        rollback_anchor_errors = validate_rollback_anchor(rollback_anchor)

    metric_errors = _metric_errors(metrics, thresholds)

    # Any problem from any of the three sources triggers the recommendation.
    rollback_recommended = any((promotion_errors, rollback_anchor_errors, metric_errors))

    reasons: list[str] = []
    reasons.extend(promotion_errors)
    reasons.extend(rollback_anchor_errors)
    reasons.extend(metric_errors)

    status = "healthy"
    if rollback_recommended:
        status = "rollback_recommended"

    promotion_status = None
    if promotion:
        promotion_status = promotion.get("status")

    report_path = run_dir / "monitoring" / "canary_report.json"
    report = {
        "run_id": run_id,
        "env": env,
        "mode": mode,
        "status": status,
        "rollback_recommended": rollback_recommended,
        "rollback_reasons": reasons,
        "metrics": metrics,
        "thresholds": thresholds,
        "promotion_status": promotion_status,
        "rollback_anchor": rollback_anchor,
        "rollback_anchor_errors": rollback_anchor_errors,
        "created_at": utc_now(),
    }
    write_json(report_path, report)

    if not rollback_recommended:
        return report

    # Archive an incident record that points back at the report just written.
    incident_name = f"{run_id}_{env}_canary_incident.json"
    incident = {
        "run_id": run_id,
        "env": env,
        "source": "canary_monitor",
        "status": "rollback_recommended",
        "rollback_anchor": rollback_anchor,
        "rollback_reasons": reasons,
        "canary_report_path": str(report_path),
        "created_at": utc_now(),
    }
    write_json(SHFT_WORKSPACE_ROOT / "rollback" / "incident_archive" / incident_name, incident)
    return report

Xet Storage Details

Size:
4.85 kB
·
Xet hash:
fc84f06a223dda2b6433df7dc5004de1d5fea692596d20df71895637a00a7dfe

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.