# S-Dreamer's picture
# Upload 8 files
# a71264b verified
"""
osint_core.observer
===================
Independent observer circuit for the Enterprise Drift-Aware OSINT Control Fabric.
The observer does not execute. It reconstructs expected behavior from intent,
policy, and executor trace, then emits dissent when reality does not match
declared constraints.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Mapping
class ObserverSeverity(str, Enum):
    """Severity level attached to an observer check.

    Members also subclass ``str``, so each one compares equal to its
    lowercase string value.
    """

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
@dataclass(frozen=True)
class ObserverCheck:
    """Immutable result of a single observer check."""

    # Identifier of the check that produced this result.
    name: str
    # Truthy when the check passed; a falsy value counts as dissent.
    ok: bool
    # Severity associated with the check.
    severity: ObserverSeverity
    # Human-readable statement of the rule that was evaluated.
    reason: str
    # Supporting details; every instance gets its own empty dict by default.
    evidence: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class ExecutionTrace:
    """Immutable record of an executor run, as consumed by the observer checks."""

    # Intent identifier reported by the executor; compared with the intent
    # packet's ``intent_id`` by check_intent_trace_match.
    intent_id: str
    # Modules that were requested (not read by any check in this module).
    modules_requested: tuple[str, ...]
    # Modules that ran; compared with the policy's allowed modules.
    modules_executed: tuple[str, ...]
    # Modules that were blocked (not read by any check in this module).
    modules_blocked: tuple[str, ...]
    # Effects seen during execution; compared with the intent's
    # ``expected_side_effects``.
    observed_effects: tuple[str, ...]
    # Schema-validation flag, reported as-is by check_output_schema.
    output_schema_valid: bool
    # Audit payload whose top-level keys are screened against RAW_AUDIT_KEYS.
    audit_payload: Mapping[str, Any]
    # Error entries for the run; defaults to an empty tuple.
    errors: tuple[str, ...] = field(default_factory=tuple)
@dataclass(frozen=True)
class ObserverAssessment:
    """Immutable bundle of the observer checks produced for one intent."""

    # Identifier of the intent this assessment belongs to.
    intent_id: str
    # Individual check results.
    checks: tuple[ObserverCheck, ...]

    @property
    def dissent(self) -> bool:
        """Return True when at least one check did not pass."""
        return not all(check.ok for check in self.checks)

    @property
    def has_critical_violation(self) -> bool:
        """Return True when any failed check carries CRITICAL severity."""
        for check in self.checks:
            if check.ok:
                # Passing checks never count, whatever their severity.
                continue
            if check.severity == ObserverSeverity.CRITICAL:
                return True
        return False
# Audit-payload keys that check_no_raw_indicator_leak treats as raw indicator
# fields; their presence at the top level of the payload fails that check.
RAW_AUDIT_KEYS = {
    # Generic raw-value fields.
    "raw_indicator", "raw_input", "indicator",
    # Indicator-type-specific fields.
    "email", "domain", "username", "url", "ip",
}
def observe_execution(intent: Any, trace: ExecutionTrace, policy_result: Any) -> ObserverAssessment:
    """Run every observer check against one execution and bundle the results.

    Args:
        intent: Intent packet the execution was launched for (an
            ``IntentPacket``-like object). The checks read its ``intent_id``
            and ``expected_side_effects`` attributes via ``getattr``.
        trace: Executor trace to compare against the intent and policy.
        policy_result: Policy evaluation (a ``PolicyEvaluation``-like object
            or a mapping) exposing ``allowed_modules``.

    Returns:
        An ObserverAssessment carrying ``trace.intent_id`` and the check
        results in a fixed order: intent/trace match, modules vs. policy,
        output schema, raw-indicator leak, expected side effects.
    """
    # ``intent`` and ``policy_result`` are annotated as Any because
    # IntentPacket / PolicyEvaluation are neither imported nor defined in this
    # module, and the individual checks duck-type them anyway.
    checks = (
        check_intent_trace_match(intent, trace),
        check_modules_match_policy(trace, policy_result),
        check_output_schema(trace),
        check_no_raw_indicator_leak(trace),
        check_expected_side_effects(intent, trace),
    )
    return ObserverAssessment(intent_id=trace.intent_id, checks=checks)
def check_intent_trace_match(intent: Any, trace: ExecutionTrace) -> ObserverCheck:
    """Check that the trace carries the same intent id as the intent packet.

    Args:
        intent: Object whose ``intent_id`` attribute is read; a missing
            attribute is treated as ``None``.
        trace: Executor trace whose ``intent_id`` is compared.

    Returns:
        A CRITICAL-severity ObserverCheck that passes when both ids are
        equal; the evidence records the expected and actual ids.
    """
    declared_id = getattr(intent, "intent_id", None)
    traced_id = trace.intent_id
    return ObserverCheck(
        name="intent_trace_match",
        ok=(declared_id == traced_id),
        severity=ObserverSeverity.CRITICAL,
        reason="Execution trace must correspond to the intent packet.",
        evidence={"expected": declared_id, "actual": traced_id},
    )
def check_modules_match_policy(trace: ExecutionTrace, policy_result: Any) -> ObserverCheck:
    """Check that every executed module is allowed by the policy result.

    Args:
        trace: Executor trace whose ``modules_executed`` are inspected.
        policy_result: Either a mapping with an ``"allowed_modules"`` key or
            an object with an ``allowed_modules`` attribute. A missing or
            ``None`` value means no module is allowed.

    Returns:
        A CRITICAL-severity ObserverCheck that passes when no executed module
        falls outside the allowed set; the evidence lists the offending
        modules in sorted order.
    """
    # Accept any Mapping, not just dict, so read-only or custom mappings are
    # looked up by key instead of silently falling through to getattr.
    if isinstance(policy_result, Mapping):
        declared = policy_result.get("allowed_modules")
    else:
        declared = getattr(policy_result, "allowed_modules", None)

    if declared is None:
        # Fail closed: nothing declared means nothing is allowed.
        allowed = set()
    elif isinstance(declared, str):
        # A bare string is one module name, not an iterable of characters.
        allowed = {declared}
    else:
        allowed = set(declared)

    unexpected = sorted(set(trace.modules_executed) - allowed)
    return ObserverCheck(
        name="modules_match_policy",
        ok=not unexpected,
        severity=ObserverSeverity.CRITICAL,
        reason="Executed modules must be allowed by policy.",
        evidence={"unexpected_modules": unexpected},
    )
def check_output_schema(trace: ExecutionTrace) -> ObserverCheck:
    """Report the trace's schema-validation flag as an observer check.

    Args:
        trace: Executor trace whose ``output_schema_valid`` flag is read.

    Returns:
        A WARNING-severity ObserverCheck whose ``ok`` is the flag itself,
        with empty evidence.
    """
    schema_ok = trace.output_schema_valid
    return ObserverCheck(
        name="output_schema_valid",
        ok=schema_ok,
        severity=ObserverSeverity.WARNING,
        reason="Executor output should conform to expected schema.",
        evidence={},
    )
def check_no_raw_indicator_leak(trace: ExecutionTrace) -> ObserverCheck:
    """Check that the audit payload has no raw-indicator keys.

    Only the payload's own (top-level) keys are compared against
    RAW_AUDIT_KEYS.

    Args:
        trace: Executor trace whose ``audit_payload`` keys are inspected.

    Returns:
        A CRITICAL-severity ObserverCheck that passes when none of the keys
        are in RAW_AUDIT_KEYS; the evidence lists the matching keys sorted.
    """
    payload_keys = set(trace.audit_payload.keys())
    leaked_fields = sorted(payload_keys.intersection(RAW_AUDIT_KEYS))
    return ObserverCheck(
        name="no_raw_indicator_leak",
        ok=not leaked_fields,
        severity=ObserverSeverity.CRITICAL,
        reason="Audit payload must not contain raw indicator fields.",
        evidence={"raw_fields": leaked_fields},
    )
def check_expected_side_effects(intent: Any, trace: ExecutionTrace) -> ObserverCheck:
    """Check that every side effect declared by the intent was observed.

    Args:
        intent: Object whose ``expected_side_effects`` attribute is read; a
            missing attribute is treated as no expected effects.
        trace: Executor trace whose ``observed_effects`` are compared.

    Returns:
        A WARNING-severity ObserverCheck that passes when no declared effect
        is absent from the trace; the evidence lists the absent effects
        sorted. Observed effects that were not declared are not reported.
    """
    declared_effects = set(getattr(intent, "expected_side_effects", ()))
    seen_effects = set(trace.observed_effects)
    absent = sorted(effect for effect in declared_effects if effect not in seen_effects)
    return ObserverCheck(
        name="expected_side_effects_present",
        ok=not absent,
        severity=ObserverSeverity.WARNING,
        reason="Declared expected side effects should be observed or explained.",
        evidence={"missing_effects": absent},
    )