"""
tests/test_intent.py
====================
Contract tests for osint_core.intent.
Core invariants:
- Intent packets are immutable.
- Intent packets do not store raw indicators.
- Scope boundaries are explicit and validated.
- Forbidden operations cannot appear in allowed operations.
- Packets can be signed and verified.
- Signature tampering is detected.
- Risk and rollback helpers are deterministic.
"""
from __future__ import annotations
from dataclasses import FrozenInstanceError, replace
import pytest
from osint_core.intent import (
DEFAULT_FORBIDDEN_OPERATIONS,
IntentErrorCode,
IntentPacket,
IntentValidationError,
canonical_json,
create_intent_packet,
default_rollback_for_risk,
derive_risk_label,
find_raw_indicator_fields,
hash_manifest_payload,
intent_fingerprint,
make_scope,
risk_score,
sign_payload,
unsigned_intent_fingerprint,
validate_intent,
validate_scope,
verify_intent_signature,
)
# Secret handed to both packet signing and signature verification in these tests.
TEST_SECRET = "test-intent-signing-secret"
# 64-character placeholder used as the default scope ``target_hash``.
TARGET_HASH = "a" * 64
# 64-character placeholder used as the default packet ``manifest_hash``.
MANIFEST_HASH = "b" * 64
def make_valid_scope(**overrides):
    """Build a scope through ``make_scope`` from known-good defaults.

    Any keyword argument replaces the matching default field or adds a new
    field before ``make_scope`` is called.
    """
    defaults = {
        "target_hash": TARGET_HASH,
        "indicator_type": "domain",
        "allowed_operations": ["resource_links"],
        "success_criteria": ["links_generated"],
    }
    return make_scope(**{**defaults, **overrides})
def make_valid_packet(**overrides):
    """Create an intent packet through ``create_intent_packet`` from valid defaults.

    Any keyword argument (including ``scope``) replaces the matching default
    or adds a new argument to the ``create_intent_packet`` call.
    """
    defaults = {
        "action": "enrich_indicator",
        "purpose": "Generate passive OSINT source links for a validated indicator.",
        # The default scope is always constructed, even when a ``scope``
        # override ends up replacing it.
        "scope": make_valid_scope(),
        "requested_modules": ["resource_links"],
        "expected_side_effects": ["report_created", "audit_event_created"],
        "rollback_strategy": "observe_only",
        "risk_label": "low",
        "manifest_hash": MANIFEST_HASH,
        "signing_secret": TEST_SECRET,
    }
    return create_intent_packet(**{**defaults, **overrides})
def test_make_scope_adds_default_forbidden_operations():
    """A default scope carries every default forbidden operation and its own fields."""
    built = make_valid_scope()

    missing = [
        op for op in DEFAULT_FORBIDDEN_OPERATIONS
        if op not in built.forbidden_operations
    ]
    assert not missing

    assert built.target_hash == TARGET_HASH
    assert built.indicator_type == "domain"
    assert built.allowed_operations == ("resource_links",)
def test_scope_rejects_missing_target_hash():
    """An empty target hash raises MISSING_FIELD; a 64-character one validates."""
    # Sanity check: a well-formed hash passes validation.
    well_formed = make_valid_scope(target_hash="c" * 64)
    outcome = validate_scope(well_formed)
    assert outcome.ok is True

    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_scope(target_hash="")
    assert excinfo.value.code == IntentErrorCode.MISSING_FIELD
def test_scope_rejects_non_hash_target_identity():
    """A raw domain passed as ``target_hash`` is expected to raise INVALID_SCOPE."""
    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_scope(target_hash="example.com")

    raised = excinfo.value
    assert raised.code == IntentErrorCode.INVALID_SCOPE
def test_scope_rejects_empty_allowed_operations():
    """An empty ``allowed_operations`` list is expected to raise MISSING_FIELD."""
    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_scope(allowed_operations=[])

    raised = excinfo.value
    assert raised.code == IntentErrorCode.MISSING_FIELD
def test_scope_rejects_forbidden_operation_overlap():
    """Allowing ``port_scan`` is expected to raise FORBIDDEN_OPERATION_REQUESTED."""
    requested_ops = ["resource_links", "port_scan"]

    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_scope(allowed_operations=requested_ops)

    assert excinfo.value.code == IntentErrorCode.FORBIDDEN_OPERATION_REQUESTED
def test_scope_rejects_invalid_time_horizon():
    """Time horizons of 0 and 90_000 seconds are both expected to raise INVALID_SCOPE."""
    # Lower-bound case.
    with pytest.raises(IntentValidationError) as too_small:
        make_valid_scope(time_horizon_seconds=0)
    assert too_small.value.code == IntentErrorCode.INVALID_SCOPE

    # Upper-bound case.
    with pytest.raises(IntentValidationError) as too_large:
        make_valid_scope(time_horizon_seconds=90_000)
    assert too_large.value.code == IntentErrorCode.INVALID_SCOPE
def test_create_intent_packet_signs_and_verifies():
    """A default packet is an ``IntentPacket`` with a signature that verifies."""
    signed = make_valid_packet()

    assert isinstance(signed, IntentPacket)
    assert signed.signature is not None

    verified = verify_intent_signature(signed, secret=TEST_SECRET)
    assert verified is True
def test_intent_packet_is_immutable():
    """Assigning to a packet attribute raises ``FrozenInstanceError``."""
    frozen_packet = make_valid_packet()

    with pytest.raises(FrozenInstanceError):
        frozen_packet.purpose = "mutated"  # type: ignore[misc]
def test_unsigned_payload_excludes_signature():
    """``unsigned_payload()`` omits ``signature`` while the packet keeps its own."""
    signed = make_valid_packet()
    body = signed.unsigned_payload()

    assert "signature" not in body
    assert signed.signature is not None
def test_signature_tampering_is_detected():
    """Changing ``purpose`` after signing makes verification raise SIGNATURE_MISMATCH."""
    original = make_valid_packet()
    # ``replace`` copies the packet with a new purpose but the old signature.
    forged = replace(original, purpose="Changed purpose after signing.")

    with pytest.raises(IntentValidationError) as excinfo:
        verify_intent_signature(forged, secret=TEST_SECRET)

    assert excinfo.value.code == IntentErrorCode.SIGNATURE_MISMATCH
def test_unsigned_packet_fails_verification():
    """A packet created with ``sign=False`` has no signature and fails verification."""
    packet_kwargs = {
        "action": "enrich_indicator",
        "purpose": "Generate passive links.",
        "scope": make_valid_scope(),
        "requested_modules": ["resource_links"],
        "expected_side_effects": ["report_created"],
        "rollback_strategy": "observe_only",
        "risk_label": "low",
        "manifest_hash": MANIFEST_HASH,
        "sign": False,
    }
    unsigned = create_intent_packet(**packet_kwargs)
    assert unsigned.signature is None

    with pytest.raises(IntentValidationError) as excinfo:
        verify_intent_signature(unsigned, secret=TEST_SECRET)

    assert excinfo.value.code == IntentErrorCode.UNSIGNED_PACKET
def test_packet_rejects_invalid_action():
    """An unrecognised action is expected to raise INVALID_ACTION."""
    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_packet(action="delete_everything")  # type: ignore[arg-type]

    raised = excinfo.value
    assert raised.code == IntentErrorCode.INVALID_ACTION
def test_packet_rejects_invalid_risk_label():
    """An unrecognised risk label is expected to raise INVALID_RISK."""
    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_packet(risk_label="extreme")  # type: ignore[arg-type]

    raised = excinfo.value
    assert raised.code == IntentErrorCode.INVALID_RISK
def test_packet_rejects_invalid_rollback_strategy():
    """An unrecognised rollback strategy is expected to raise INVALID_ROLLBACK."""
    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_packet(rollback_strategy="YOLO")  # type: ignore[arg-type]

    raised = excinfo.value
    assert raised.code == IntentErrorCode.INVALID_ROLLBACK
def test_packet_rejects_invalid_manifest_hash():
    """A manifest hash of ``"not-a-hash"`` is expected to raise MISSING_FIELD."""
    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_packet(manifest_hash="not-a-hash")

    raised = excinfo.value
    assert raised.code == IntentErrorCode.MISSING_FIELD
def test_packet_rejects_empty_purpose():
    """A whitespace-only purpose is expected to raise MISSING_FIELD."""
    with pytest.raises(IntentValidationError) as excinfo:
        make_valid_packet(purpose=" ")

    raised = excinfo.value
    assert raised.code == IntentErrorCode.MISSING_FIELD
def test_raw_indicator_field_detection():
    """Raw-indicator keys are reported by dotted path, including nested ones."""
    unsafe_section = {
        "raw_indicator": "example.com",
        "nested": {"email": "user@example.com"},
    }
    document = {
        "safe": {"target_hash": TARGET_HASH},
        "unsafe": unsafe_section,
    }

    reported = find_raw_indicator_fields(document)

    for dotted_path in ("unsafe.raw_indicator", "unsafe.nested.email"):
        assert dotted_path in reported
def test_validate_intent_rejects_raw_indicator_like_fields():
    """A ``raw_indicator`` key injected into a packet dict is reported as a finding."""
    # NOTE(review): despite the test name, only ``find_raw_indicator_fields``
    # is exercised here; ``validate_intent`` is never called.
    polluted = make_valid_packet().to_dict()
    polluted["raw_indicator"] = "example.com"

    reported = find_raw_indicator_fields(polluted)

    assert "raw_indicator" in reported
def test_canonical_json_is_deterministic():
    """Key insertion order does not change the canonical JSON output."""
    b_first = canonical_json({"b": 2, "a": 1})
    a_first = canonical_json({"a": 1, "b": 2})

    assert b_first == a_first
def test_sign_payload_is_deterministic_for_same_payload_and_secret():
    """Same payload and secret give the same signature; another secret does not."""
    body = {"a": 1, "b": 2}

    first = sign_payload(body, TEST_SECRET)
    second = sign_payload(body, TEST_SECRET)
    assert first == second

    with_test_secret = sign_payload(body, TEST_SECRET)
    with_other_secret = sign_payload(body, "different-secret")
    assert with_test_secret != with_other_secret
def test_hash_manifest_payload_is_stable():
    """Hashing the same manifest twice matches, and the digest has 64 characters."""
    manifest = {"artifact": "test", "version": "1.0.0"}

    first = hash_manifest_payload(manifest)
    second = hash_manifest_payload(manifest)
    assert first == second

    digest = hash_manifest_payload(manifest)
    assert len(digest) == 64
def test_intent_fingerprints_are_stable_and_distinct():
    """Signed and unsigned fingerprints are 64 chars, repeatable, and differ.

    The stability half of the contract is checked by recomputing each
    fingerprint for the same packet and comparing it with the first result.
    """
    packet = make_valid_packet()

    signed_fp = intent_fingerprint(packet)
    unsigned_fp = unsigned_intent_fingerprint(packet)

    assert len(signed_fp) == 64
    assert len(unsigned_fp) == 64

    # Stable: a second computation over the same packet must not drift.
    assert intent_fingerprint(packet) == signed_fp
    assert unsigned_intent_fingerprint(packet) == unsigned_fp

    # Distinct: the two fingerprint helpers must not return the same value.
    assert signed_fp != unsigned_fp
def test_validate_intent_accepts_valid_packet():
    """A default packet validates cleanly with no errors or error codes."""
    candidate = make_valid_packet()
    outcome = validate_intent(candidate)

    assert outcome.ok is True
    assert outcome.errors == ()
    assert outcome.error_codes == ()
def test_risk_score_mapping():
    """Each risk label maps to its expected numeric score."""
    expected_scores = {
        "low": 0.25,
        "medium": 0.5,
        "high": 0.75,
        "critical": 1.0,
    }
    for label, score in expected_scores.items():
        assert risk_score(label) == score
def test_default_rollback_for_risk():
    """Each risk label maps to its expected default rollback strategy."""
    expected_strategies = {
        "low": "observe_only",
        "medium": "disable_module",
        "high": "sandbox",
        "critical": "revert",
    }
    for label, strategy in expected_strategies.items():
        assert default_rollback_for_risk(label) == strategy
def test_derive_risk_label_for_low_risk_passive_modules():
    """``resource_links`` on an unauthorized target is expected to be ``low``."""
    label = derive_risk_label(
        requested_modules=["resource_links"],
        authorized_target=False,
    )

    assert label == "low"
def test_derive_risk_label_for_conditional_authorized_modules():
    """``http_headers`` on an authorized target is expected to be ``medium``."""
    label = derive_risk_label(
        requested_modules=["http_headers"],
        authorized_target=True,
    )

    assert label == "medium"
def test_derive_risk_label_for_conditional_unauthorized_modules():
    """``http_headers`` on an unauthorized target is expected to be ``high``."""
    label = derive_risk_label(
        requested_modules=["http_headers"],
        authorized_target=False,
    )

    assert label == "high"
def test_derive_risk_label_for_forbidden_modules():
    """``nmap`` is expected to be ``critical`` even on an authorized target."""
    label = derive_risk_label(
        requested_modules=["nmap"],
        authorized_target=True,
    )

    assert label == "critical"
|