linvest21's picture
download
raw
9.54 kB
from __future__ import annotations
from pathlib import Path
from eval.certification import DEFAULT_BASELINE
from eval.evidence import write_iteration_evidence
from eval.improvement_report import build_improvement_report
from n21.config import write_json
from model_policy.roadmap import candidate_evidence
from observability.audit_log import AuditLogger, utc_now
from observability.heartbeat import heartbeat
from observability.progress import ProgressState
from training.iteration_policy import should_stop
# Ordered (stage name, next-action description) pairs. One "running" heartbeat
# is emitted per entry in every cycle; the first word of the description is
# reused as the heartbeat's step label.
STAGES: list[tuple[str, str]] = [
    ("observe", "collect health, dataset, and baseline signals"),
    ("diagnose", "classify current improvement opportunity or blocker"),
    ("repair", "apply bounded dry-run repair plan"),
    ("retrain", "plan next adapter-tuning iteration"),
    ("reeval", "score candidate against baseline and previous iteration"),
]
def _scores_for_iteration(iteration: int) -> dict[str, float]:
# Deterministic dry-run progression: orchestration evidence only.
return {
"aggregate": min(round(0.884 + (0.0095 * iteration), 3), 0.999),
"financebench": min(round(0.891 + (0.0045 * iteration), 3), 0.999),
"convfinqa": min(round(0.862 + (0.0045 * iteration), 3), 0.999),
"phrasebank_macro_f1": min(round(0.921 + (0.0035 * iteration), 3), 0.999),
"private_prompt_replay": min(round(0.940 + (0.014 * iteration), 3), 0.999),
}
def _emit_stage_heartbeats(
    logger: AuditLogger,
    *,
    run_id: str,
    cycle: int,
    max_cycles: int,
    train_provider: str,
    infer_provider: str,
    model_candidate: str,
) -> None:
    """Emit one "running" heartbeat per entry in STAGES for the given cycle."""
    cycle_pct_base = ((cycle - 1) / max_cycles) * 100
    stage_count = len(STAGES)
    for idx, (stage, next_action) in enumerate(STAGES, start=1):
        stage_pct = (idx / stage_count) * 100
        heartbeat(
            logger,
            ProgressState(
                run_id=run_id,
                cycle=cycle,
                stage=stage,
                # The first word of the description doubles as the step label.
                step=next_action.split(" ", 1)[0],
                progress_step_pct=100.0,
                progress_stage_pct=stage_pct,
                # Stage heartbeats are capped at 99; only the per-cycle
                # evidence heartbeat may report 100.
                progress_cycle_pct=min(99.0, cycle_pct_base + (stage_pct / max_cycles)),
                train_provider=train_provider,
                infer_provider=infer_provider,
                model_candidate=model_candidate,
                status="running",
                blocker=None,
                next_action=next_action,
            ),
        )


def run_self_healing_cycles(
    run_dir: Path,
    *,
    run_id: str,
    model_candidate: str,
    train_provider: str,
    infer_provider: str,
    max_cycles: int = 3,
    stop_on_certified: bool = True,
) -> dict[str, object]:
    """Run up to ``max_cycles`` dry-run self-healing cycles and record evidence.

    For each cycle this emits a heartbeat per stage in ``STAGES``, scores the
    cycle with the deterministic fixtures from ``_scores_for_iteration``,
    compares them to ``DEFAULT_BASELINE`` and to the previous cycle, writes the
    iteration evidence plus ``improvement_report.json`` under
    ``run_dir/iterations/iter_NNN``, prints trace lines, and emits a closing
    "evidence" heartbeat. After the loop a summary is written to
    ``run_dir/heal_decisions/cycle_summary.json``.

    The scores are fixtures for validating the orchestration state machine;
    the evidence itself is marked ``promotion_eligible: False``.

    Args:
        run_dir: Root directory for logs, iteration evidence and the summary.
        run_id: Identifier copied into heartbeats, evidence and the summary.
        model_candidate: Candidate model name; also passed to
            ``candidate_evidence`` to obtain roadmap evidence.
        train_provider: Training provider label recorded in the outputs.
        infer_provider: Inference provider label recorded in the outputs.
        max_cycles: Upper bound on the number of cycles. A value <= 0 runs no
            cycles and still writes an empty summary.
        stop_on_certified: When true, the loop ends as soon as ``should_stop``
            reports a stop. When false, every cycle up to ``max_cycles`` runs.

    Returns:
        The summary dict that was written to ``cycle_summary.json``.
    """
    logger = AuditLogger(run_dir / "logs" / "audit.jsonl")
    cycle_summaries: list[dict[str, object]] = []
    previous_scores = DEFAULT_BASELINE
    failure_count = 0
    roadmap_evidence = candidate_evidence(model_candidate)

    for cycle in range(1, max_cycles + 1):
        _emit_stage_heartbeats(
            logger,
            run_id=run_id,
            cycle=cycle,
            max_cycles=max_cycles,
            train_provider=train_provider,
            infer_provider=infer_provider,
            model_candidate=model_candidate,
        )

        iteration_id = f"iter_{cycle:03d}"
        current_scores = _scores_for_iteration(cycle)
        report_vs_prod = build_improvement_report(DEFAULT_BASELINE, current_scores)
        report_vs_previous = build_improvement_report(previous_scores, current_scores)
        improvements = report_vs_prod["improvements"]
        aggregate_delta = improvements["aggregate"]

        fixture_ready = (
            aggregate_delta["abs"] >= 0.005
            and current_scores["private_prompt_replay"] >= 0.95
        )
        # NOTE(review): should_stop receives the failure count from *before*
        # this cycle's outcome is tallied below - confirm that is the intended
        # contract of training.iteration_policy.should_stop.
        stop, stop_reason = should_stop(cycle, failure_count, fixture_ready)
        # The loop only ends early when the caller asked for it. Reporting
        # must follow the same condition, otherwise a cycle announces 100% /
        # "await paired eval" and is then followed by another cycle.
        halting = stop and stop_on_certified
        # NOTE(review): when max_cycles is exhausted without halting, the last
        # cycle still reports "continue"; left as-is pending a decision on the
        # desired terminal status.
        gate_result = "fixture_pass" if fixture_ready else "fixture_hold"
        if not fixture_ready:
            failure_count += 1

        evidence = {
            "run_id": run_id,
            "iteration_id": iteration_id,
            "parent_iteration_id": None if cycle == 1 else f"iter_{cycle - 1:03d}",
            "cycle": cycle,
            "base_model": model_candidate,
            "model_roadmap_evidence": roadmap_evidence,
            "train_provider": train_provider,
            "infer_provider": infer_provider,
            "repair_reason": "bootstrap_improvement_cycle" if cycle == 1 else "targeted_prompt_replay_hardening",
            "changes": {
                "dataset_delta": "dry-run: add targeted prompt replay examples for observed weak spots",
                "training_delta": "dry-run: plan adapter continuation iteration with bounded budget",
                "repair_delta": "dry-run: no unsafe live change applied",
            },
            "scoring": {
                "mode": "dry_run_fixture_scores",
                "quality_signal": "orchestration_only",
                "promotion_eligible": False,
                "notes": [
                    "Cycle scores are deterministic fixtures for state-machine validation.",
                    "Do not use these cycle scores as model-quality or production-promotion evidence.",
                    "Use paired_eval_report.json for live candidate-vs-baseline evidence.",
                ],
            },
            "scores": {
                "baseline_prod": DEFAULT_BASELINE,
                "previous_iteration": previous_scores,
                "current_iteration": current_scores,
            },
            "improvement_vs_prod": improvements,
            "improvement_vs_previous": report_vs_previous["improvements"],
            "gate_result": gate_result,
            "stop_reason": stop_reason,
            "promotion_recommendation": "fixture_only_await_paired_eval" if fixture_ready else "continue_cycle",
            "rationale": [
                *report_vs_prod["rationale"],
                f"Cycle {cycle} fixture gate result: {gate_result}.",
                f"Stop decision: {stop_reason}.",
                "This is orchestration evidence only, not model-quality evidence.",
            ],
            "created_at": utc_now(),
        }

        evidence_dir = run_dir / "iterations" / iteration_id
        evidence_path = evidence_dir / "iteration_evidence.json"
        write_iteration_evidence(evidence_dir, evidence)
        write_json(
            evidence_dir / "improvement_report.json",
            {**report_vs_prod, "model_roadmap_evidence": roadmap_evidence},
        )

        replay_delta_pct = improvements["private_prompt_replay"]["pct"]
        print(
            "[SHFT cycle evidence] "
            f"run={run_id} cycle={cycle}/{max_cycles} iteration={iteration_id} "
            f"fixture_gate={gate_result} aggregate_fixture={current_scores['aggregate']:.3f} "
            f"aggregate_delta_abs={aggregate_delta['abs']:.3f} "
            f"aggregate_delta_pct={aggregate_delta['pct']:.4f}% "
            f"private_replay={current_scores['private_prompt_replay']:.3f} "
            f"private_replay_delta_pct={replay_delta_pct:.4f}% "
            "quality_signal=orchestration_only "
            f"evidence={evidence_path}"
        )
        print(
            "[SHFT cycle evidence] "
            "MODEL_QUALITY_PENDING=true "
            "fixture_scores_are_not_model_eval=true "
            "required_live_artifact=eval/paired_eval_report.json"
        )
        print(
            "[SHFT repair trace] "
            f"run={run_id} cycle={cycle} observe=complete diagnose=complete "
            f"repair_reason={evidence['repair_reason']} retrain=planned reeval=complete "
            f"next={'await_paired_eval' if halting else 'continue'}"
        )

        cycle_summaries.append(
            {
                "cycle": cycle,
                "iteration_id": iteration_id,
                "gate_result": gate_result,
                "scoring_mode": "dry_run_fixture_scores",
                "quality_signal": "orchestration_only",
                "promotion_recommendation": evidence["promotion_recommendation"],
                "stop_reason": stop_reason,
                "model_roadmap_evidence": roadmap_evidence,
                "aggregate_improvement_pct": aggregate_delta["pct"],
                "private_replay_improvement_pct": replay_delta_pct,
                "evidence_path": str(evidence_path),
            }
        )

        heartbeat(
            logger,
            ProgressState(
                run_id=run_id,
                cycle=cycle,
                stage="evidence",
                step="write_iteration_evidence",
                progress_step_pct=100.0,
                progress_stage_pct=100.0,
                progress_cycle_pct=100.0 if halting else min(99.0, (cycle / max_cycles) * 100),
                train_provider=train_provider,
                infer_provider=infer_provider,
                model_candidate=model_candidate,
                status="orchestration_fixture_ready" if fixture_ready else "continuing",
                blocker=None if fixture_ready else "fixture_candidate_not_ready",
                next_action=(
                    "await paired model-quality evaluation"
                    if halting
                    else "continue next self-healing cycle"
                ),
            ),
        )

        previous_scores = current_scores
        if halting:
            break

    summary = {
        "run_id": run_id,
        "model_candidate": model_candidate,
        "model_roadmap_evidence": roadmap_evidence,
        "train_provider": train_provider,
        "infer_provider": infer_provider,
        "max_cycles": max_cycles,
        "stop_on_certified": stop_on_certified,
        "cycles_completed": len(cycle_summaries),
        "cycles": cycle_summaries,
        "created_at": utc_now(),
    }
    write_json(run_dir / "heal_decisions" / "cycle_summary.json", summary)
    return summary

Xet Storage Details

Size:
9.54 kB
·
Xet hash:
20a8052ed73e748b26af3526549377b313f0e0975a1ec70a53be3b382a1efb0b

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.