"""
osint_core.observer
===================

Independent observer circuit for the Enterprise Drift-Aware OSINT Control
Fabric.

The observer does not execute. It reconstructs expected behavior from intent,
policy, and executor trace, then emits dissent when reality does not match
declared constraints.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Mapping


class ObserverSeverity(str, Enum):
    """Severity label attached to every observer check."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass(frozen=True)
class ObserverCheck:
    """Outcome of a single observer check."""

    name: str  # stable identifier of the check
    ok: bool  # True when the check passed
    severity: ObserverSeverity  # severity that applies when the check fails
    reason: str  # human-readable statement of the rule being checked
    evidence: dict[str, Any] = field(default_factory=dict)  # supporting data


@dataclass(frozen=True)
class ExecutionTrace:
    """Executor-side record that the observer compares against intent/policy."""

    intent_id: str
    modules_requested: tuple[str, ...]
    modules_executed: tuple[str, ...]
    modules_blocked: tuple[str, ...]
    observed_effects: tuple[str, ...]
    output_schema_valid: bool
    audit_payload: Mapping[str, Any]
    errors: tuple[str, ...] = ()


@dataclass(frozen=True)
class ObserverAssessment:
    """All check results produced for one execution trace."""

    intent_id: str
    checks: tuple[ObserverCheck, ...]

    @property
    def dissent(self) -> bool:
        """Return True when at least one check failed."""
        return any(not check.ok for check in self.checks)

    @property
    def has_critical_violation(self) -> bool:
        """Return True when at least one failed check has CRITICAL severity."""
        return any(
            (not check.ok) and check.severity == ObserverSeverity.CRITICAL
            for check in self.checks
        )


# Key names that must never appear in an audit payload.
RAW_AUDIT_KEYS = {
    "raw_indicator",
    "raw_input",
    "indicator",
    "email",
    "domain",
    "username",
    "url",
    "ip",
}


def _read_field(source: Any, name: str, default: Any = None) -> Any:
    """Read *name* from a mapping (by key) or from any other object (by attribute)."""
    if isinstance(source, Mapping):
        return source.get(name, default)
    return getattr(source, name, default)


def _as_name_set(value: Any) -> set[Any]:
    """Coerce *value* into a set of names.

    ``None`` becomes the empty set and a bare string is treated as one name
    rather than being split into its characters.
    """
    if value is None:
        return set()
    if isinstance(value, str):
        return {value}
    return set(value)


def _find_raw_fields(payload: Any) -> list[str]:
    """Return the sorted paths of every raw-indicator key found in *payload*.

    Nested mappings, lists and tuples are walked. Keys are compared
    case-insensitively against ``RAW_AUDIT_KEYS``. A top-level hit is reported
    as the key itself; nested hits use dotted / indexed paths such as
    ``details.email`` or ``items[0].url``.
    """
    raw_names = {
        name.casefold() for name in RAW_AUDIT_KEYS if isinstance(name, str)
    }
    found: set[str] = set()
    visited: set[int] = set()  # container ids, guards against cyclic payloads
    pending: list[tuple[str, Any]] = [("", payload)]

    while pending:
        prefix, node = pending.pop()
        if isinstance(node, Mapping):
            if id(node) in visited:
                continue
            visited.add(id(node))
            for key, value in node.items():
                path = f"{prefix}.{key}" if prefix else str(key)
                if isinstance(key, str) and key.casefold() in raw_names:
                    found.add(path)
                pending.append((path, value))
        elif isinstance(node, (list, tuple)):
            if id(node) in visited:
                continue
            visited.add(id(node))
            for index, item in enumerate(node):
                pending.append((f"{prefix}[{index}]", item))

    return sorted(found)


# NOTE(review): IntentPacket and PolicyEvaluation are not imported in this
# module. The annotations below are never evaluated at runtime because of
# ``from __future__ import annotations``, but static checkers will flag them;
# confirm where these types live and import them under TYPE_CHECKING.
def observe_execution(
    intent: IntentPacket,
    trace: ExecutionTrace,
    policy_result: PolicyEvaluation,
) -> ObserverAssessment:
    """Run every observer check and bundle the results.

    Args:
        intent: Declared intent; read for ``intent_id`` and
            ``expected_side_effects``.
        trace: What the executor reported.
        policy_result: Policy outcome; read for ``allowed_modules``.

    Returns:
        An ``ObserverAssessment`` keyed by ``trace.intent_id`` holding the
        five check results in a fixed order.
    """
    checks = (
        check_intent_trace_match(intent, trace),
        check_modules_match_policy(trace, policy_result),
        check_output_schema(trace),
        check_no_raw_indicator_leak(trace),
        check_expected_side_effects(intent, trace),
    )
    return ObserverAssessment(intent_id=trace.intent_id, checks=checks)


def check_intent_trace_match(intent: Any, trace: ExecutionTrace) -> ObserverCheck:
    """Check that the trace carries the same ``intent_id`` as the intent.

    *intent* may be an object with an ``intent_id`` attribute or a mapping
    with an ``"intent_id"`` key; a missing id is treated as ``None``.
    """
    expected_intent_id = _read_field(intent, "intent_id")
    return ObserverCheck(
        name="intent_trace_match",
        ok=expected_intent_id == trace.intent_id,
        severity=ObserverSeverity.CRITICAL,
        reason="Execution trace must correspond to the intent packet.",
        evidence={"expected": expected_intent_id, "actual": trace.intent_id},
    )


def check_modules_match_policy(
    trace: ExecutionTrace, policy_result: Any
) -> ObserverCheck:
    """Check that every executed module appears in the policy allow-list.

    *policy_result* may be a mapping with an ``"allowed_modules"`` key or an
    object with an ``allowed_modules`` attribute. A missing or ``None``
    allow-list is treated as empty, so any executed module fails the check.
    """
    allowed = _as_name_set(_read_field(policy_result, "allowed_modules"))
    executed = _as_name_set(trace.modules_executed)
    unexpected = sorted(executed - allowed)
    return ObserverCheck(
        name="modules_match_policy",
        ok=not unexpected,
        severity=ObserverSeverity.CRITICAL,
        reason="Executed modules must be allowed by policy.",
        evidence={"unexpected_modules": unexpected},
    )


def check_output_schema(trace: ExecutionTrace) -> ObserverCheck:
    """Report the executor's own schema-validity flag as a WARNING check."""
    return ObserverCheck(
        name="output_schema_valid",
        ok=trace.output_schema_valid,
        severity=ObserverSeverity.WARNING,
        reason="Executor output should conform to expected schema.",
        evidence={},
    )


def check_no_raw_indicator_leak(trace: ExecutionTrace) -> ObserverCheck:
    """Check that the audit payload contains no raw-indicator keys.

    The payload is searched at every nesting level and key names are matched
    case-insensitively, so wrapping or re-casing a field does not hide it.
    """
    present = _find_raw_fields(trace.audit_payload)
    return ObserverCheck(
        name="no_raw_indicator_leak",
        ok=not present,
        severity=ObserverSeverity.CRITICAL,
        reason="Audit payload must not contain raw indicator fields.",
        evidence={"raw_fields": present},
    )


def check_expected_side_effects(intent: Any, trace: ExecutionTrace) -> ObserverCheck:
    """Check that every side effect declared by the intent was observed.

    *intent* may be an object or a mapping; a missing or ``None``
    ``expected_side_effects`` means nothing is expected.
    """
    expected = _as_name_set(_read_field(intent, "expected_side_effects"))
    observed = _as_name_set(trace.observed_effects)
    missing = sorted(expected - observed)
    return ObserverCheck(
        name="expected_side_effects_present",
        ok=not missing,
        severity=ObserverSeverity.WARNING,
        reason="Declared expected side effects should be observed or explained.",
        evidence={"missing_effects": missing},
    )