# CounterFeint / scripted / auditor.py
# QuantumTransformer's picture
# Upload folder using huggingface_hub
# 26bf1c9 verified
"""
Deterministic Auditor policy backed by the rule-based Track A / Track B graders.
The Auditor is **deterministic by design**. It consumes
the full audit-phase observation, runs every rule-based audit in
``graders/auditor_track_a.py`` (rationale citation, calibration,
cross-ad consistency, bias, rationale↔verdict coherence) and
``graders/auditor_track_b.py`` (intrinsic, grounding, real-world,
signal-realism, novelty plausibility), queues one
``flag_investigator`` / ``flag_fraudster`` action per flag, and
submits an audit report containing the flag payloads and aggregated
audit scores.
Why deterministic
-----------------
The Auditor's flag stream is the **reward source** for both the
Investigator (Track A flags drive the rationale-quality penalty in
``multi_agent_rewards.investigator_reward``) and the Fraudster (Track B
plausibility drives the survival credit in
``multi_agent_rewards.fraudster_reward``). Keeping it deterministic
means the reward function is interpretable, inspectable, and free of
LLM noise / cost / latency. It also mirrors how real ad-policy review
teams operate: rule-based scorecards layered on top of model verdicts.
Future scope
------------
Replacing this with an LLM-backed Auditor is a clean drop-in: the
``run_full_audit`` orchestrator in
:mod:`counterfeint.graders.auditor_pipeline` returns a typed
``FullAuditResult`` that any LLM policy could synthesise in one shot.
That swap is **out of scope for the hackathon submission** — we keep
the deterministic path so the reward signal is reproducible.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from ..graders.auditor_pipeline import run_full_audit
from ..graders.base_grader import (
EpisodeRecord,
LinkResult,
VerdictResult,
)
from ..models import AuditorAction
from ._base import PolicyBase
class HeuristicAuditor(PolicyBase):
    """Scripted Auditor — runs rule-based graders and submits a final report.

    On the first ``act`` call that needs a plan, the policy runs
    ``run_full_audit`` once and queues one flag action per returned flag
    followed by a single ``submit_audit_report`` action. Subsequent calls
    replay that queue one action at a time. Once the report action has been
    handed out (or the observation's ``phase`` is ``"done"``), every further
    call re-submits the cached report (an empty dict if none was built).
    """

    def __init__(self) -> None:
        # Actions planned for the current episode that are still to be emitted.
        self._queued: List[AuditorAction] = []
        # True once a ``submit_audit_report`` action has been returned.
        self._submitted: bool = False
        # Report payload built by ``_plan``; ``None`` until planning has run.
        self._report: Optional[Dict[str, Any]] = None

    def reset(self) -> None:
        """Drop all per-episode state so the policy can be reused."""
        self._queued = []
        self._submitted = False
        self._report = None

    def act(self, observation: Dict[str, Any]) -> AuditorAction:
        """Return the next Auditor action for ``observation``.

        Args:
            observation: Audit-phase observation dict. Only the ``"phase"``
                key is read here; the rest is consumed by ``_plan``.

        Returns:
            The next queued flag action, or a ``submit_audit_report`` action
            once the queue is drained / the episode is already finished.
        """
        if observation.get("phase") == "done" or self._submitted:
            return AuditorAction(
                action_type="submit_audit_report",
                audit_report=self._report or {},
                note="HeuristicAuditor: episode already done.",
            )

        # Plan lazily: the audit is only run when there is nothing queued.
        if not self._queued:
            self._queued, self._report = self._plan(observation)

        if self._queued:
            action = self._queued.pop(0)
            if action.action_type == "submit_audit_report":
                self._submitted = True
            return action

        # Defensive fallback for an empty plan: submit whatever report exists.
        self._submitted = True
        return AuditorAction(
            action_type="submit_audit_report",
            audit_report=self._report or {},
            note="HeuristicAuditor: queue empty, submitting final report.",
        )

    # ------------------------------------------------------------------
    # Internal planning
    # ------------------------------------------------------------------
    def _plan(
        self, observation: Dict[str, Any]
    ) -> tuple[List[AuditorAction], Dict[str, Any]]:
        """Run the full audit and turn its result into an action queue.

        Track B (``flag_fraudster``) actions are queued first, then Track A
        (``flag_investigator``) actions, then one ``submit_audit_report``.

        Args:
            observation: Audit-phase observation dict. Missing or ``None``
                logs are replaced by empty containers.

        Returns:
            ``(actions, report)`` where ``report`` is the same dict that is
            attached to the final ``submit_audit_report`` action.
        """
        audit = run_full_audit(
            record=self._build_record(observation),
            investigator_action_log=observation.get("investigator_actions", []) or [],
            investigation_data_seen=observation.get("investigation_data_seen", {}) or {},
            fraudster_proposal_log=observation.get("fraudster_proposals", []) or [],
        )

        actions: List[AuditorAction] = [
            *self._flag_actions("flag_fraudster", audit.track_b_flags),
            *self._flag_actions("flag_investigator", audit.track_a_flags),
        ]

        report: Dict[str, Any] = {
            "track_a_flags": [f.model_dump() for f in audit.track_a_flags],
            "track_b_flags": [f.model_dump() for f in audit.track_b_flags],
            "investigator_audit_score": round(audit.investigator_audit_score, 4),
            "fraudster_plausibility_score": round(audit.fraudster_plausibility_score, 4),
            "per_ad_plausibility": {
                k: round(v, 4) for k, v in audit.per_ad_plausibility.items()
            },
            "notes": (
                f"HeuristicAuditor: {len(audit.track_a_flags)} Track A flag(s), "
                f"{len(audit.track_b_flags)} Track B flag(s); "
                f"inv_audit={audit.investigator_audit_score:.2f}, "
                f"fraud_plaus={audit.fraudster_plausibility_score:.2f}."
            ),
        }
        actions.append(
            AuditorAction(
                action_type="submit_audit_report",
                audit_report=report,
                note="HeuristicAuditor: submitting derived report.",
            )
        )
        return actions, report

    @staticmethod
    def _flag_actions(action_type: str, flags: Any) -> List[AuditorAction]:
        """Build one ``AuditorAction`` of ``action_type`` per audit flag."""
        return [
            AuditorAction(
                action_type=action_type,
                target_ad_id=flag.target_ad_id,
                flag_type=flag.flag_type,
                severity=flag.severity,
                note=flag.note,
            )
            for flag in flags
        ]

    @staticmethod
    def _as_float(value: Any, default: float) -> float:
        """Coerce ``value`` to ``float``, using ``default`` when it is absent.

        A genuine numeric zero is preserved. The previous ``value or default``
        idiom treated ``0`` / ``0.0`` as missing and replaced it with the
        default. Non-numeric inputs (``None``, ``""``, bools, strings) keep
        the original ``float(value or default)`` behaviour.
        """
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return float(value)
        return float(value or default)

    # ------------------------------------------------------------------
    # EpisodeRecord reconstruction from the auditor observation
    # ------------------------------------------------------------------
    def _build_record(self, observation: Dict[str, Any]) -> Optional[EpisodeRecord]:
        """Rebuild an ``EpisodeRecord`` from ``observation["full_episode_record"]``.

        Args:
            observation: Audit-phase observation dict.

        Returns:
            ``None`` when the record payload is missing or empty; otherwise an
            ``EpisodeRecord`` populated from the payload's ``verdicts``,
            ``links`` and ``ads`` lists. Verdict entries without a
            ``"verdict"`` key, links missing either ad id, and ads without an
            ``ad_id`` are skipped.
        """
        record_payload = observation.get("full_episode_record") or {}
        if not record_payload:
            return None

        verdicts: List[VerdictResult] = [
            VerdictResult(
                ad_id=entry.get("ad_id", ""),
                verdict=entry.get("verdict", "approve"),
                # 0.0 is a valid confidence and must not collapse to 0.5.
                confidence=self._as_float(entry.get("confidence", 0.5), 0.5),
                ground_truth=entry.get("ground_truth", "legit"),
                auto_approved=bool(entry.get("auto_approved", False)),
            )
            for entry in (record_payload.get("verdicts") or [])
            if "verdict" in entry
        ]

        links: List[LinkResult] = [
            LinkResult(
                ad_id_1=link.get("ad_id_1", ""),
                ad_id_2=link.get("ad_id_2", ""),
                correct=bool(link.get("correct", False)),
            )
            for link in (record_payload.get("links") or [])
            if link.get("ad_id_1") and link.get("ad_id_2")
        ]

        ads_metadata: List[Dict[str, Any]] = [
            {
                "ad_id": ad.get("ad_id"),
                "ground_truth": ad.get("ground_truth", "legit"),
                # 0.0 is a valid severity and must not collapse to 0.5.
                "severity": self._as_float(ad.get("severity", 0.5), 0.5),
                "fraud_type": ad.get("fraud_type", ""),
                "category": ad.get("category", ""),
                "country": ad.get("country", ""),
            }
            for ad in (record_payload.get("ads") or [])
            if ad.get("ad_id")
        ]

        return EpisodeRecord(
            task_id=record_payload.get("task_id", ""),
            total_steps=int(record_payload.get("total_steps", 0) or 0),
            action_budget=int(record_payload.get("action_budget", 0) or 0),
            verdicts=verdicts,
            links=links,
            ads_metadata=ads_metadata,
        )
# Public API of this module: only the scripted Auditor policy is exported.
__all__ = ["HeuristicAuditor"]