S-Dreamer commited on
Commit
1e4e9aa
·
verified ·
1 Parent(s): a5c3ce8

Create osint_core/policy.py

Browse files
Files changed (1) hide show
  1. osint_core/policy.py +397 -0
osint_core/policy.py ADDED
@@ -0,0 +1,397 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ osint_core.policy
3
+ =================
4
+
5
+ Policy enforcement for the Passive OSINT Control Panel.
6
+
7
+ This module is the authorization boundary between validated input and execution.
8
+
9
+ Design constraints:
10
+ - Passive by default.
11
+ - No module execution decision should be made outside this layer.
12
+ - Authorized-only modules must be blocked unless explicit authorization is present.
13
+ - Forbidden capabilities are always denied.
14
+ - Correction verbs are closed over a fixed allowlist.
15
+ - Policy evaluation is side-effect free: it returns a decision, it does not execute.
16
+ """
17
+
18
+ from __future__ import annotations
19
+
20
+ from dataclasses import dataclass, field
21
+ from enum import Enum
22
+ from typing import Iterable, Literal
23
+
24
+
25
+ CorrectionVerb = Literal["ADAPT", "CONSTRAIN", "REVERT", "OBSERVE"]
26
+ RiskLevel = Literal["low", "conditional", "forbidden"]
27
+ PolicyTier = Literal["T1", "T2", "T3", "T4"]
28
+
29
+
30
+ class PolicyDecision(str, Enum):
31
+ ALLOW = "allow"
32
+ BLOCK = "block"
33
+ CONSTRAIN = "constrain"
34
+
35
+
36
+ class PolicyErrorCode(str, Enum):
37
+ UNKNOWN_MODULE = "unknown_module"
38
+ AUTHORIZATION_REQUIRED = "authorization_required"
39
+ FORBIDDEN_MODULE = "forbidden_module"
40
+ INVALID_CORRECTION_VERB = "invalid_correction_verb"
41
+ POLICY_MUTATION_BLOCKED = "policy_mutation_blocked"
42
+ RAW_LOGGING_BLOCKED = "raw_logging_blocked"
43
+
44
+
45
+ @dataclass(frozen=True)
46
+ class ModulePolicy:
47
+ name: str
48
+ canonical_name: str
49
+ risk: RiskLevel
50
+ tier: PolicyTier
51
+ description: str
52
+ requires_authorization: bool = False
53
+
54
+
55
+ @dataclass(frozen=True)
56
+ class PolicyViolation:
57
+ code: PolicyErrorCode
58
+ message: str
59
+ module: str | None = None
60
+
61
+
62
+ @dataclass(frozen=True)
63
+ class PolicyEvaluation:
64
+ decision: PolicyDecision
65
+ allowed_modules: list[str] = field(default_factory=list)
66
+ blocked_modules: list[str] = field(default_factory=list)
67
+ violations: list[PolicyViolation] = field(default_factory=list)
68
+ correction_verbs_allowed: list[CorrectionVerb] = field(default_factory=list)
69
+
70
+
71
+ ALLOWED_CORRECTION_VERBS: tuple[CorrectionVerb, ...] = (
72
+ "ADAPT",
73
+ "CONSTRAIN",
74
+ "REVERT",
75
+ "OBSERVE",
76
+ )
77
+
78
+ # Canonical module registry.
79
+ # Keep this small and explicit. New capabilities should be added deliberately.
80
+ MODULE_POLICIES: dict[str, ModulePolicy] = {
81
+ "resource_links": ModulePolicy(
82
+ name="Resource Links",
83
+ canonical_name="resource_links",
84
+ risk="low",
85
+ tier="T4",
86
+ description="Generate links to external OSINT resources without contacting the target.",
87
+ ),
88
+ "dns_records": ModulePolicy(
89
+ name="DNS Records",
90
+ canonical_name="dns_records",
91
+ risk="low",
92
+ tier="T3",
93
+ description="Resolve DNS records using a resolver. Low-impact, but still a network lookup.",
94
+ ),
95
+ "local_url_parse": ModulePolicy(
96
+ name="Local URL Parse",
97
+ canonical_name="local_url_parse",
98
+ risk="low",
99
+ tier="T4",
100
+ description="Parse a URL locally without contacting the target.",
101
+ ),
102
+ "http_headers": ModulePolicy(
103
+ name="HTTP Headers",
104
+ canonical_name="http_headers",
105
+ risk="conditional",
106
+ tier="T2",
107
+ description="Fetch HTTP headers from an explicitly authorized target.",
108
+ requires_authorization=True,
109
+ ),
110
+ "robots_txt": ModulePolicy(
111
+ name="Robots.txt",
112
+ canonical_name="robots_txt",
113
+ risk="conditional",
114
+ tier="T2",
115
+ description="Fetch robots.txt from an explicitly authorized target.",
116
+ requires_authorization=True,
117
+ ),
118
+ "screenshot": ModulePolicy(
119
+ name="Screenshot",
120
+ canonical_name="screenshot",
121
+ risk="conditional",
122
+ tier="T2",
123
+ description="Render a screenshot of an explicitly authorized URL.",
124
+ requires_authorization=True,
125
+ ),
126
+ "port_scan": ModulePolicy(
127
+ name="Port Scan",
128
+ canonical_name="port_scan",
129
+ risk="forbidden",
130
+ tier="T1",
131
+ description="Port scanning is outside the passive OSINT boundary.",
132
+ ),
133
+ "brute_force": ModulePolicy(
134
+ name="Brute Force",
135
+ canonical_name="brute_force",
136
+ risk="forbidden",
137
+ tier="T1",
138
+ description="Credential or username brute forcing is forbidden.",
139
+ ),
140
+ "credential_testing": ModulePolicy(
141
+ name="Credential Testing",
142
+ canonical_name="credential_testing",
143
+ risk="forbidden",
144
+ tier="T1",
145
+ description="Credential testing is forbidden.",
146
+ ),
147
+ "exploitation": ModulePolicy(
148
+ name="Exploitation",
149
+ canonical_name="exploitation",
150
+ risk="forbidden",
151
+ tier="T1",
152
+ description="Exploit execution is forbidden.",
153
+ ),
154
+ }
155
+
156
+
157
+ ALIASES: dict[str, str] = {
158
+ "resource links": "resource_links",
159
+ "links": "resource_links",
160
+ "source links": "resource_links",
161
+ "dns": "dns_records",
162
+ "dns records": "dns_records",
163
+ "local url parse": "local_url_parse",
164
+ "url parse": "local_url_parse",
165
+ "http headers": "http_headers",
166
+ "headers": "http_headers",
167
+ "robots.txt": "robots_txt",
168
+ "robots": "robots_txt",
169
+ "screenshot": "screenshot",
170
+ "port scan": "port_scan",
171
+ "nmap": "port_scan",
172
+ "masscan": "port_scan",
173
+ "brute force": "brute_force",
174
+ "bruteforce": "brute_force",
175
+ "credential testing": "credential_testing",
176
+ "creds": "credential_testing",
177
+ "exploitation": "exploitation",
178
+ "exploit": "exploitation",
179
+ }
180
+
181
+
182
+ def canonicalize_module_name(module_name: str) -> str:
183
+ """
184
+ Convert a UI label or alias to canonical module name.
185
+ """
186
+ key = str(module_name or "").strip().lower().replace("-", " ").replace("_", " ")
187
+ return ALIASES.get(key, key.replace(" ", "_"))
188
+
189
+
190
+ def get_module_policy(module_name: str) -> ModulePolicy | None:
191
+ return MODULE_POLICIES.get(canonicalize_module_name(module_name))
192
+
193
+
194
+ def evaluate_modules(
195
+ requested_modules: Iterable[str],
196
+ *,
197
+ authorized_target: bool = False,
198
+ passive_only: bool = True,
199
+ allow_unknown_modules: bool = False,
200
+ ) -> PolicyEvaluation:
201
+ """
202
+ Evaluate requested modules against the policy.
203
+
204
+ Parameters
205
+ ----------
206
+ requested_modules:
207
+ Module names from UI/API.
208
+ authorized_target:
209
+ Explicit confirmation that the target is authorized for conditional interaction.
210
+ passive_only:
211
+ When True, conditional modules are blocked even if authorization is present.
212
+ Use False only for an authorized execution mode.
213
+ allow_unknown_modules:
214
+ Should remain False in production.
215
+
216
+ Returns
217
+ -------
218
+ PolicyEvaluation
219
+ Side-effect-free decision describing what may execute.
220
+ """
221
+ allowed: list[str] = []
222
+ blocked: list[str] = []
223
+ violations: list[PolicyViolation] = []
224
+
225
+ for raw_name in requested_modules:
226
+ canonical = canonicalize_module_name(raw_name)
227
+ policy = MODULE_POLICIES.get(canonical)
228
+
229
+ if policy is None:
230
+ if allow_unknown_modules:
231
+ allowed.append(canonical)
232
+ else:
233
+ blocked.append(canonical)
234
+ violations.append(
235
+ PolicyViolation(
236
+ code=PolicyErrorCode.UNKNOWN_MODULE,
237
+ message=f"Unknown module blocked: {raw_name}",
238
+ module=canonical,
239
+ )
240
+ )
241
+ continue
242
+
243
+ if policy.risk == "forbidden":
244
+ blocked.append(policy.canonical_name)
245
+ violations.append(
246
+ PolicyViolation(
247
+ code=PolicyErrorCode.FORBIDDEN_MODULE,
248
+ message=f"Forbidden module blocked: {policy.name}",
249
+ module=policy.canonical_name,
250
+ )
251
+ )
252
+ continue
253
+
254
+ if policy.requires_authorization:
255
+ if passive_only:
256
+ blocked.append(policy.canonical_name)
257
+ violations.append(
258
+ PolicyViolation(
259
+ code=PolicyErrorCode.AUTHORIZATION_REQUIRED,
260
+ message=f"Conditional module blocked in passive-only mode: {policy.name}",
261
+ module=policy.canonical_name,
262
+ )
263
+ )
264
+ continue
265
+
266
+ if not authorized_target:
267
+ blocked.append(policy.canonical_name)
268
+ violations.append(
269
+ PolicyViolation(
270
+ code=PolicyErrorCode.AUTHORIZATION_REQUIRED,
271
+ message=f"Authorization required for module: {policy.name}",
272
+ module=policy.canonical_name,
273
+ )
274
+ )
275
+ continue
276
+
277
+ allowed.append(policy.canonical_name)
278
+
279
+ if violations:
280
+ # Any T1 forbidden issue or policy/auth issue should constrain execution.
281
+ decision = PolicyDecision.CONSTRAIN
282
+ else:
283
+ decision = PolicyDecision.ALLOW
284
+
285
+ return PolicyEvaluation(
286
+ decision=decision,
287
+ allowed_modules=dedupe_preserve_order(allowed),
288
+ blocked_modules=dedupe_preserve_order(blocked),
289
+ violations=violations,
290
+ correction_verbs_allowed=list(ALLOWED_CORRECTION_VERBS),
291
+ )
292
+
293
+
294
+ def enforce_correction_verb(verb: str) -> CorrectionVerb:
295
+ """
296
+ Validate that a correction verb is part of the closed mutation vocabulary.
297
+ """
298
+ normalized = str(verb or "").strip().upper()
299
+ if normalized not in ALLOWED_CORRECTION_VERBS:
300
+ raise PolicyViolationException(
301
+ PolicyViolation(
302
+ code=PolicyErrorCode.INVALID_CORRECTION_VERB,
303
+ message=f"Invalid correction verb: {verb}",
304
+ )
305
+ )
306
+ return normalized # type: ignore[return-value]
307
+
308
+
309
+ def may_mutate_policy(*, out_of_band_approval: bool = False) -> bool:
310
+ """
311
+ Policy cannot rewrite itself. Mutation requires an out-of-band gate.
312
+ """
313
+ return bool(out_of_band_approval)
314
+
315
+
316
+ def enforce_policy_mutation_gate(*, out_of_band_approval: bool = False) -> None:
317
+ if not may_mutate_policy(out_of_band_approval=out_of_band_approval):
318
+ raise PolicyViolationException(
319
+ PolicyViolation(
320
+ code=PolicyErrorCode.POLICY_MUTATION_BLOCKED,
321
+ message="Policy mutation requires out-of-band approval.",
322
+ )
323
+ )
324
+
325
+
326
+ def enforce_audit_payload(payload: dict) -> None:
327
+ """
328
+ Prevent raw sensitive indicators from appearing in audit payloads.
329
+
330
+ This is a defensive check. The audit module should already avoid raw values.
331
+ """
332
+ forbidden_keys = {
333
+ "raw_indicator",
334
+ "raw_input",
335
+ "indicator",
336
+ "email",
337
+ "domain",
338
+ "username",
339
+ "url",
340
+ "ip",
341
+ }
342
+
343
+ present = forbidden_keys.intersection(payload.keys())
344
+ if present:
345
+ raise PolicyViolationException(
346
+ PolicyViolation(
347
+ code=PolicyErrorCode.RAW_LOGGING_BLOCKED,
348
+ message=f"Audit payload contains forbidden raw field(s): {sorted(present)}",
349
+ )
350
+ )
351
+
352
+
353
+ def module_catalog() -> list[dict[str, str | bool]]:
354
+ """
355
+ Return a serializable catalog suitable for UI display.
356
+ """
357
+ return [
358
+ {
359
+ "name": policy.name,
360
+ "canonical_name": policy.canonical_name,
361
+ "risk": policy.risk,
362
+ "tier": policy.tier,
363
+ "requires_authorization": policy.requires_authorization,
364
+ "description": policy.description,
365
+ }
366
+ for policy in MODULE_POLICIES.values()
367
+ ]
368
+
369
+
370
+ def allowed_ui_modules(*, include_conditional: bool = True) -> list[str]:
371
+ """
372
+ Return user-facing modules, excluding forbidden capabilities.
373
+ """
374
+ names: list[str] = []
375
+ for policy in MODULE_POLICIES.values():
376
+ if policy.risk == "forbidden":
377
+ continue
378
+ if policy.risk == "conditional" and not include_conditional:
379
+ continue
380
+ names.append(policy.name)
381
+ return names
382
+
383
+
384
+ def dedupe_preserve_order(values: Iterable[str]) -> list[str]:
385
+ seen: set[str] = set()
386
+ output: list[str] = []
387
+ for value in values:
388
+ if value not in seen:
389
+ output.append(value)
390
+ seen.add(value)
391
+ return output
392
+
393
+
394
+ class PolicyViolationException(PermissionError):
395
+ def __init__(self, violation: PolicyViolation):
396
+ super().__init__(violation.message)
397
+ self.violation = violation