Spaces:
Sleeping
Sleeping
| """ | |
| Deterministic Auditor policy backed by the rule-based Track A / Track B graders. | |
| The Auditor is intentionally **deterministic by design**. It consumes | |
| the full audit-phase observation, runs every rule-based audit in | |
| ``graders/auditor_track_a.py`` (rationale citation, calibration, | |
| cross-ad consistency, bias, rationale↔verdict coherence) and | |
| ``graders/auditor_track_b.py`` (intrinsic, grounding, real-world, | |
| signal-realism, novelty plausibility), queues one | |
| ``flag_investigator`` / ``flag_fraudster`` action per flag, and | |
| submits an audit report containing the flag payloads and aggregated | |
| audit scores. | |
| Why deterministic | |
| ----------------- | |
| The Auditor's flag stream is the **reward source** for both the | |
| Investigator (Track A flags drive the rationale-quality penalty in | |
| ``multi_agent_rewards.investigator_reward``) and the Fraudster (Track B | |
| plausibility drives the survival credit in | |
| ``multi_agent_rewards.fraudster_reward``). Keeping it deterministic | |
| means the reward function is interpretable, inspectable, and free of | |
| LLM noise / cost / latency. It also mirrors how real ad-policy review | |
| teams operate: rule-based scorecards layered on top of model verdicts. | |
| Future scope | |
| ------------ | |
| Replacing this with an LLM-backed Auditor is a clean drop-in: the | |
| ``run_full_audit`` orchestrator in | |
| :mod:`counterfeint.graders.auditor_pipeline` returns a typed | |
| ``FullAuditResult`` that any LLM policy could synthesise in one shot. | |
| That swap is **out of scope for the hackathon submission** — we keep | |
| the deterministic path so the reward signal is reproducible. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any, Dict, List, Optional | |
| from ..graders.auditor_pipeline import run_full_audit | |
| from ..graders.base_grader import ( | |
| EpisodeRecord, | |
| LinkResult, | |
| VerdictResult, | |
| ) | |
| from ..models import AuditorAction | |
| from ._base import PolicyBase | |
| class HeuristicAuditor(PolicyBase): | |
| """Scripted Auditor — runs rule-based graders and submits a final report.""" | |
| def __init__(self) -> None: | |
| self._queued: List[AuditorAction] = [] | |
| self._submitted: bool = False | |
| self._report: Optional[Dict[str, Any]] = None | |
| def reset(self) -> None: | |
| self._queued = [] | |
| self._submitted = False | |
| self._report = None | |
| def act(self, observation: Dict[str, Any]) -> AuditorAction: | |
| if observation.get("phase") == "done" or self._submitted: | |
| return AuditorAction( | |
| action_type="submit_audit_report", | |
| audit_report=self._report or {}, | |
| note="HeuristicAuditor: episode already done.", | |
| ) | |
| if not self._queued: | |
| self._queued, self._report = self._plan(observation) | |
| if self._queued: | |
| action = self._queued.pop(0) | |
| if action.action_type == "submit_audit_report": | |
| self._submitted = True | |
| return action | |
| self._submitted = True | |
| return AuditorAction( | |
| action_type="submit_audit_report", | |
| audit_report=self._report or {}, | |
| note="HeuristicAuditor: queue empty, submitting final report.", | |
| ) | |
| # ------------------------------------------------------------------ | |
| # Internal planning | |
| # ------------------------------------------------------------------ | |
| def _plan( | |
| self, observation: Dict[str, Any] | |
| ) -> tuple[List[AuditorAction], Dict[str, Any]]: | |
| record = self._build_record(observation) | |
| investigator_actions = observation.get("investigator_actions", []) or [] | |
| fraudster_proposals = observation.get("fraudster_proposals", []) or [] | |
| investigation_data_seen = observation.get("investigation_data_seen", {}) or {} | |
| audit = run_full_audit( | |
| record=record, | |
| investigator_action_log=investigator_actions, | |
| investigation_data_seen=investigation_data_seen, | |
| fraudster_proposal_log=fraudster_proposals, | |
| ) | |
| actions: List[AuditorAction] = [] | |
| for f in audit.track_b_flags: | |
| actions.append( | |
| AuditorAction( | |
| action_type="flag_fraudster", | |
| target_ad_id=f.target_ad_id, | |
| flag_type=f.flag_type, | |
| severity=f.severity, | |
| note=f.note, | |
| ) | |
| ) | |
| for f in audit.track_a_flags: | |
| actions.append( | |
| AuditorAction( | |
| action_type="flag_investigator", | |
| target_ad_id=f.target_ad_id, | |
| flag_type=f.flag_type, | |
| severity=f.severity, | |
| note=f.note, | |
| ) | |
| ) | |
| report: Dict[str, Any] = { | |
| "track_a_flags": [f.model_dump() for f in audit.track_a_flags], | |
| "track_b_flags": [f.model_dump() for f in audit.track_b_flags], | |
| "investigator_audit_score": round(audit.investigator_audit_score, 4), | |
| "fraudster_plausibility_score": round(audit.fraudster_plausibility_score, 4), | |
| "per_ad_plausibility": { | |
| k: round(v, 4) for k, v in audit.per_ad_plausibility.items() | |
| }, | |
| "notes": ( | |
| f"HeuristicAuditor: {len(audit.track_a_flags)} Track A flag(s), " | |
| f"{len(audit.track_b_flags)} Track B flag(s); " | |
| f"inv_audit={audit.investigator_audit_score:.2f}, " | |
| f"fraud_plaus={audit.fraudster_plausibility_score:.2f}." | |
| ), | |
| } | |
| actions.append( | |
| AuditorAction( | |
| action_type="submit_audit_report", | |
| audit_report=report, | |
| note="HeuristicAuditor: submitting derived report.", | |
| ) | |
| ) | |
| return actions, report | |
| # ------------------------------------------------------------------ | |
| # EpisodeRecord reconstruction from the auditor observation | |
| # ------------------------------------------------------------------ | |
| def _build_record(self, observation: Dict[str, Any]) -> Optional[EpisodeRecord]: | |
| record_payload = observation.get("full_episode_record") or {} | |
| if not record_payload: | |
| return None | |
| verdict_entries = record_payload.get("verdicts") or [] | |
| link_entries = record_payload.get("links") or [] | |
| ad_entries = record_payload.get("ads") or [] | |
| verdicts: List[VerdictResult] = [] | |
| for v in verdict_entries: | |
| if "verdict" not in v: | |
| continue | |
| verdicts.append( | |
| VerdictResult( | |
| ad_id=v.get("ad_id", ""), | |
| verdict=v.get("verdict", "approve"), | |
| confidence=float(v.get("confidence", 0.5) or 0.5), | |
| ground_truth=v.get("ground_truth", "legit"), | |
| auto_approved=bool(v.get("auto_approved", False)), | |
| ) | |
| ) | |
| links: List[LinkResult] = [ | |
| LinkResult( | |
| ad_id_1=l.get("ad_id_1", ""), | |
| ad_id_2=l.get("ad_id_2", ""), | |
| correct=bool(l.get("correct", False)), | |
| ) | |
| for l in link_entries | |
| if l.get("ad_id_1") and l.get("ad_id_2") | |
| ] | |
| ads_metadata: List[Dict[str, Any]] = [] | |
| for ad in ad_entries: | |
| if not ad.get("ad_id"): | |
| continue | |
| ads_metadata.append( | |
| { | |
| "ad_id": ad.get("ad_id"), | |
| "ground_truth": ad.get("ground_truth", "legit"), | |
| "severity": float(ad.get("severity", 0.5) or 0.5), | |
| "fraud_type": ad.get("fraud_type", ""), | |
| "category": ad.get("category", ""), | |
| "country": ad.get("country", ""), | |
| } | |
| ) | |
| return EpisodeRecord( | |
| task_id=record_payload.get("task_id", ""), | |
| total_steps=int(record_payload.get("total_steps", 0) or 0), | |
| action_budget=int(record_payload.get("action_budget", 0) or 0), | |
| verdicts=verdicts, | |
| links=links, | |
| ads_metadata=ads_metadata, | |
| ) | |
| __all__ = ["HeuristicAuditor"] | |