Buckets:
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| from n21.config import load_structured, write_json | |
| from n21.settings import CONFIG_ROOT, SHFT_WORKSPACE_ROOT | |
| from observability.audit_log import utc_now | |
| from rollback.anchor import load_last_good_anchor, validate_rollback_anchor | |
# Canned canary metrics that run_canary_monitor copies whenever ``mode`` is
# anything other than "fail". These are fixed fixture values, not live
# measurements.
DEFAULT_CANARY_METRICS: dict[str, Any] = {
    "requests": 1000,
    "errors": 2,
    "error_rate": 0.002,  # errors / requests
    "critical_prompt_failure_count": 0,
    "pii_or_mnpi_release_risk": False,
    "aggregate_score_drop_abs": 0.01,
    "latency_regression": 0.12,
}
# Canned canary metrics that run_canary_monitor copies when ``mode == "fail"``.
# NOTE(review): presumably chosen to exceed the configured rollback
# thresholds -- confirm against thresholds/rollback.yaml.
FAIL_CANARY_METRICS: dict[str, Any] = {
    "requests": 1000,
    "errors": 12,
    "error_rate": 0.012,  # errors / requests
    "critical_prompt_failure_count": 1,
    "pii_or_mnpi_release_risk": False,
    "aggregate_score_drop_abs": 0.045,
    "latency_regression": 0.41,
}
| def _load_promotion(run_dir: Path, env: str) -> tuple[dict[str, Any] | None, list[str]]: | |
| path = run_dir / "manifests" / "promotion_manifest.json" | |
| if not path.exists(): | |
| return None, [f"missing promotion manifest: {path}"] | |
| try: | |
| promotion = json.loads(path.read_text(encoding="utf-8")) | |
| except json.JSONDecodeError as exc: | |
| return None, [f"invalid promotion manifest JSON: {exc}"] | |
| if promotion.get("env") != env: | |
| return promotion, [f"promotion env mismatch: {promotion.get('env')} != {env}"] | |
| if promotion.get("status") not in {"promotion_planned", "promotion_requested"}: | |
| return promotion, [f"promotion status is not monitorable: {promotion.get('status')}"] | |
| return promotion, [] | |
| def _metric_errors(metrics: dict[str, Any], thresholds: dict[str, Any]) -> list[str]: | |
| errors: list[str] = [] | |
| if float(metrics["error_rate"]) > float(thresholds["canary_error_rate_max"]): | |
| errors.append(f"canary error rate exceeded: {metrics['error_rate']} > {thresholds['canary_error_rate_max']}") | |
| if int(metrics["critical_prompt_failure_count"]) > int(thresholds["critical_prompt_failure_count_max"]): | |
| errors.append( | |
| "critical prompt failure count exceeded: " | |
| f"{metrics['critical_prompt_failure_count']} > {thresholds['critical_prompt_failure_count_max']}" | |
| ) | |
| if bool(metrics["pii_or_mnpi_release_risk"]) and bool(thresholds["pii_or_mnpi_release_risk"]): | |
| errors.append("pii or mnpi release risk detected") | |
| if float(metrics["aggregate_score_drop_abs"]) > float(thresholds["aggregate_score_drop_abs_max"]): | |
| errors.append( | |
| f"aggregate score drop exceeded: {metrics['aggregate_score_drop_abs']} > {thresholds['aggregate_score_drop_abs_max']}" | |
| ) | |
| if float(metrics["latency_regression"]) > float(thresholds["latency_regression_max"]): | |
| errors.append(f"latency regression exceeded: {metrics['latency_regression']} > {thresholds['latency_regression_max']}") | |
| return errors | |
def run_canary_monitor(
    run_dir: Path,
    *,
    run_id: str,
    env: str,
    mode: str = "pass",
) -> dict[str, Any]:
    """Evaluate a canary for a run and persist the resulting report.

    The metrics are canned fixtures, not live measurements: a copy of
    ``FAIL_CANARY_METRICS`` when ``mode == "fail"`` and a copy of
    ``DEFAULT_CANARY_METRICS`` for any other ``mode``. Rollback is
    recommended when the promotion manifest, the last-good rollback anchor,
    or the metric checks report at least one error.

    Args:
        run_dir: Run directory; the report is written to
            ``<run_dir>/monitoring/canary_report.json``.
        run_id: Identifier recorded in the report and used in the incident
            file name.
        env: Environment being monitored.
        mode: ``"fail"`` selects the failing fixture; anything else selects
            the default fixture.

    Returns:
        The report dictionary that was written to disk.

    Side effects:
        Writes the canary report through ``write_json``. When rollback is
        recommended, also writes
        ``<run_id>_<env>_canary_incident.json`` under
        ``SHFT_WORKSPACE_ROOT / "rollback" / "incident_archive"``.
    """
    thresholds = load_structured(CONFIG_ROOT / "thresholds" / "rollback.yaml")["rollback"]

    # Copy the fixture so the report never aliases the module-level constant.
    if mode == "fail":
        metrics = dict(FAIL_CANARY_METRICS)
    else:
        metrics = dict(DEFAULT_CANARY_METRICS)

    promotion, promotion_errors = _load_promotion(run_dir, env)

    rollback_anchor = load_last_good_anchor(SHFT_WORKSPACE_ROOT, env=env)
    if rollback_anchor is None:
        rollback_anchor_errors = ["missing last_good rollback anchor"]
    else:
        rollback_anchor_errors = validate_rollback_anchor(rollback_anchor)

    metric_errors = _metric_errors(metrics, thresholds)

    # Any non-empty error source is enough to recommend a rollback.
    rollback_recommended = any((promotion_errors, rollback_anchor_errors, metric_errors))

    if rollback_recommended:
        status = "rollback_recommended"
    else:
        status = "healthy"

    if promotion:
        promotion_status = promotion.get("status")
    else:
        promotion_status = None

    report_path = run_dir / "monitoring" / "canary_report.json"
    report = {
        "run_id": run_id,
        "env": env,
        "mode": mode,
        "status": status,
        "rollback_recommended": rollback_recommended,
        "rollback_reasons": [*promotion_errors, *rollback_anchor_errors, *metric_errors],
        "metrics": metrics,
        "thresholds": thresholds,
        "promotion_status": promotion_status,
        "rollback_anchor": rollback_anchor,
        "rollback_anchor_errors": rollback_anchor_errors,
        "created_at": utc_now(),
    }
    write_json(report_path, report)

    if not rollback_recommended:
        return report

    # Archive an incident record pointing back at the report just written.
    incident_path = (
        SHFT_WORKSPACE_ROOT / "rollback" / "incident_archive" / f"{run_id}_{env}_canary_incident.json"
    )
    incident = {
        "run_id": run_id,
        "env": env,
        "source": "canary_monitor",
        "status": "rollback_recommended",
        "rollback_anchor": rollback_anchor,
        "rollback_reasons": report["rollback_reasons"],
        "canary_report_path": str(report_path),
        "created_at": utc_now(),
    }
    write_json(incident_path, incident)
    return report
Xet Storage Details
- Size: 4.85 kB
- Xet hash: fc84f06a223dda2b6433df7dc5004de1d5fea692596d20df71895637a00a7dfe
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.