| """ |
| Passive OSINT Control Panel |
| Drift-aware, passive-first OSINT enrichment interface for Hugging Face Spaces. |
| |
| Design constraints: |
| - Passive by default. |
| - No scanning, brute forcing, exploitation, or credential testing. |
| - All inputs are validated, sanitised, normalised, and hashed before audit logging. |
| - Modules that touch a user-provided target require explicit authorization. |
| - Correction verbs are limited to: ADAPT, CONSTRAIN, REVERT, OBSERVE. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import csv |
| import hashlib |
| import hmac |
| import html |
| import ipaddress |
| import json |
| import os |
| import re |
| import socket |
| import time |
| import uuid |
| from dataclasses import asdict, dataclass |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Literal |
| from urllib.parse import quote_plus, urlparse |
|
|
| import gradio as gr |
|
|
|
|
| |
| |
| |
|
|
# Application identity shown in the UI title and recorded in the manifest.
APP_NAME = "Passive OSINT Control Panel"
APP_VERSION = "0.1.0"

# All run artefacts live under a "runs" directory next to this file.
BASE_DIR = Path(__file__).resolve().parent
RUNS_DIR = BASE_DIR.joinpath("runs")
REPORTS_DIR = RUNS_DIR.joinpath("reports")
AUDIT_DIR = RUNS_DIR.joinpath("audit")

# Create the output directories at import time so writers never have to.
for directory in (RUNS_DIR, REPORTS_DIR, AUDIT_DIR):
    directory.mkdir(parents=True, exist_ok=True)

# Upper bound on the length of a user-supplied indicator.
MAX_INPUT_LENGTH = 256
# Time budget, in seconds, for network lookups.
NETWORK_TIMEOUT_SECONDS = 4.0

# Closed vocabularies used in annotations throughout the module.
CorrectionVerb = Literal["ADAPT", "CONSTRAIN", "REVERT", "OBSERVE"]
RiskTier = Literal["T1", "T2", "T3", "T4"]
IndicatorType = Literal["domain", "username", "email", "ip", "url", "unknown"]
|
|
|
|
| |
| |
| |
|
|
def _link(name: str, url: str, description: str) -> dict[str, str]:
    """Build one source-link record; ``{query}`` in *url* is filled in later."""
    return {"name": name, "url": url, "description": description}


# Third-party lookup pages offered per indicator type. Each URL may contain a
# ``{query}`` placeholder that build_links() replaces with the encoded indicator.
OSINT_LINKS: dict[str, list[dict[str, str]]] = {
    "domain": [
        _link(
            "crt.sh",
            "https://crt.sh/?q={query}",
            "Certificate Transparency search",
        ),
        _link(
            "SecurityTrails",
            "https://securitytrails.com/domain/{query}/dns",
            "DNS and historical domain intelligence",
        ),
        _link(
            "URLScan",
            "https://urlscan.io/search/#{query}",
            "Public URL scan search",
        ),
        _link(
            "VirusTotal",
            "https://www.virustotal.com/gui/domain/{query}",
            "Public domain reputation lookup",
        ),
        _link(
            "Wayback Machine",
            "https://web.archive.org/web/*/{query}",
            "Archived pages",
        ),
    ],
    "username": [
        _link(
            "WhatsMyName",
            "https://whatsmyname.app/?q={query}",
            "Username presence search",
        ),
        _link(
            "Namechk",
            "https://namechk.com/{query}",
            "Username availability and footprinting",
        ),
        _link(
            "GitHub",
            "https://github.com/{query}",
            "GitHub profile lookup",
        ),
        _link(
            "Reddit",
            "https://www.reddit.com/user/{query}",
            "Reddit profile lookup",
        ),
    ],
    "email": [
        # This entry has no placeholder: the check is performed manually.
        _link(
            "Have I Been Pwned",
            "https://haveibeenpwned.com/",
            "Manual breach exposure check",
        ),
        _link(
            "EmailRep",
            "https://emailrep.io/query/{query}",
            "Email reputation lookup",
        ),
    ],
    "ip": [
        _link(
            "AbuseIPDB",
            "https://www.abuseipdb.com/check/{query}",
            "IP abuse reputation",
        ),
        _link(
            "VirusTotal",
            "https://www.virustotal.com/gui/ip-address/{query}",
            "Public IP reputation lookup",
        ),
        _link(
            "Shodan",
            "https://www.shodan.io/host/{query}",
            "Public internet exposure data",
        ),
    ],
    "url": [
        _link(
            "URLScan",
            "https://urlscan.io/search/#{query}",
            "Public URL scan search",
        ),
        _link(
            "VirusTotal",
            "https://www.virustotal.com/gui/search/{query}",
            "Public URL reputation lookup",
        ),
        _link(
            "Wayback Machine",
            "https://web.archive.org/web/*/{query}",
            "Archived page history",
        ),
    ],
}
|
|
|
|
| |
| |
| |
|
|
@dataclass
class Manifest:
    """Declaration of the assumptions and invariants this artifact runs under."""

    # Stable identifier of the artifact the manifest describes.
    artifact_id: str
    # Application version the manifest was generated for.
    version: str
    # Human-readable statements the implementation relies on.
    assumptions: list[str]
    # Human-readable statements the implementation promises to uphold.
    invariants: list[str]
    # Risk tier label, one of T1..T4.
    tier: RiskTier
    # Digest identifying this exact manifest content.
    manifest_hash: str
|
|
|
|
@dataclass
class TelemetryEvent:
    """Audit record for a single enrichment run; carries hashes, never raw indicators."""

    # Unique identifier of the run; also used as the audit file stem.
    run_id: str
    # Time string recorded for the event.
    timestamp: str
    # Manifest identity the run executed under.
    artifact_id: str
    manifest_hash: str
    # Classified type of the submitted indicator.
    indicator_type: IndicatorType
    # Keyed hash standing in for the raw indicator.
    indicator_hash: str
    # Whether the operator confirmed authorization for the target.
    authorized_target: bool
    # Module bookkeeping: what was asked for, what ran, what policy blocked.
    modules_requested: list[str]
    modules_executed: list[str]
    modules_blocked: list[str]
    # Per-dimension drift scores and the correction chosen from them.
    drift_vector: dict[str, float]
    correction_verb: CorrectionVerb
    # Wall-clock duration of the run in milliseconds.
    duration_ms: int
    # Messages collected while the run executed.
    errors: list[str]
|
|
|
|
@dataclass
class EnrichmentResult:
    """Everything needed to render the on-screen result and the Markdown report."""

    # Unique identifier of the run; also used as the report file stem.
    run_id: str
    # Classified type of the submitted indicator.
    indicator_type: IndicatorType
    # Normalised indicator text (run_enrichment stores a redaction marker here).
    normalized_indicator: str
    # Keyed hash standing in for the raw indicator.
    indicator_hash: str
    # Pre-rendered Markdown list of third-party lookup links.
    links_markdown: str
    # Output of the executed modules, keyed by module result name.
    passive_results: dict[str, Any]
    # Per-dimension drift scores and the correction chosen from them.
    drift_vector: dict[str, float]
    correction_verb: CorrectionVerb
    # Locations of the written report and audit files, as strings.
    report_path: str
    audit_path: str
    # Messages collected while the run executed.
    errors: list[str]
|
|
|
|
| |
| |
| |
|
|
def now_utc() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
def get_hash_salt() -> str:
    """Return the salt used to key indicator hashes.

    The ``OSINT_HASH_SALT`` environment variable wins when it is set to a
    non-empty value. Otherwise a fixed development salt is returned, but only
    if ``ALLOW_DEV_SALT`` is set to ``true`` (case-insensitive).

    Raises:
        RuntimeError: neither a salt nor the development opt-in is configured.
    """
    configured = os.getenv("OSINT_HASH_SALT")
    if configured:
        return configured

    dev_opt_in = os.getenv("ALLOW_DEV_SALT", "").lower()
    if dev_opt_in != "true":
        raise RuntimeError(
            "Missing OSINT_HASH_SALT. Add it as a Hugging Face Space Secret. "
            "For local testing only, set ALLOW_DEV_SALT=true."
        )

    return "dev-only-change-me"
|
|
|
|
def hmac_sha256(value: str) -> str:
    """Return the hex HMAC-SHA256 of *value*, keyed with the configured salt.

    The value is stripped and lower-cased first, so inputs differing only in
    case or surrounding whitespace produce the same digest.
    """
    key = get_hash_salt().encode("utf-8")
    message = value.strip().lower().encode("utf-8")
    mac = hmac.new(key, message, digestmod=hashlib.sha256)
    return mac.hexdigest()
|
|
|
|
def make_manifest() -> Manifest:
    """Build the application manifest and fingerprint its content.

    The hash is the SHA-256 of the manifest body serialised as JSON with
    sorted keys; the hash itself is not part of the hashed body.
    """
    body = {
        "artifact_id": "passive_osint_control_panel",
        "version": APP_VERSION,
        "assumptions": [
            "OSINT activity is passive unless authorization is explicitly provided.",
            "Raw indicators are not written to audit logs.",
            "External target interaction requires authorized_target=True.",
            "All mutation decisions route through Correction verbs.",
        ],
        "invariants": [
            "Reject unsupported or malformed inputs.",
            "Escape HTML and remove control characters.",
            "Hash indicators before audit persistence.",
            "Block authorized-only modules without explicit confirmation.",
        ],
        "tier": "T2",
    }
    serialised = json.dumps(body, sort_keys=True).encode()
    digest = hashlib.sha256(serialised).hexdigest()
    # The body keys match the Manifest field names one-to-one.
    return Manifest(manifest_hash=digest, **body)


# Built once at import time and shared by every run.
MANIFEST = make_manifest()
|
|
|
|
| |
| |
| |
|
|
# ASCII control characters (C0 range plus DEL) stripped from user input.
CONTROL_CHARS = re.compile(r"[\x00-\x1f\x7f]")

# Fully qualified domain name: at most 253 characters overall, one or more
# dot-terminated labels of 1-63 letters/digits/hyphens, then an alphabetic TLD.
# The lookahead/lookbehind pair sits inside the repeated group so that EVERY
# label is barred from starting or ending with a hyphen, not just the first.
DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)((?!-)[a-zA-Z0-9-]{1,63}(?<!-)\.)+[a-zA-Z]{2,63}$"
)

# Usernames: 2-64 characters drawn from letters, digits, "_", "." and "-".
USERNAME_RE = re.compile(r"^[a-zA-Z0-9_.-]{2,64}$")

# Deliberately loose e-mail shape check: local part, "@", and a dotted domain,
# with no whitespace or additional "@" anywhere.
EMAIL_RE = re.compile(r"^[^@\s]{1,64}@[^@\s]{1,255}\.[^@\s]{2,63}$")
|
|
|
|
def sanitize_text(value: str | None) -> str:
    """Clean a raw user input and return it HTML-escaped.

    Control characters are removed, surrounding whitespace is trimmed, and the
    result is escaped with ``html.escape`` (quotes included).

    The length limit is applied to the cleaned text *before* escaping. Escaping
    expands characters such as ``&`` into multi-character entities, so checking
    afterwards would reject inputs that are within the limit and report a
    misleading "exceeds" error.

    Raises:
        ValueError: the input is ``None``, empty after cleaning, or longer
            than ``MAX_INPUT_LENGTH`` characters.
    """
    if value is None:
        raise ValueError("Input is required.")

    # Strip after removing control characters so whitespace that was hidden
    # behind them does not survive as leading/trailing padding.
    cleaned = CONTROL_CHARS.sub("", str(value)).strip()

    if not cleaned:
        raise ValueError("Input is empty.")

    if len(cleaned) > MAX_INPUT_LENGTH:
        raise ValueError(f"Input exceeds {MAX_INPUT_LENGTH} characters.")

    return html.escape(cleaned, quote=True)
|
|
|
|
def classify_and_normalize(raw_value: str, forced_type: str = "Auto") -> tuple[IndicatorType, str]:
    """Sanitise *raw_value* and return ``(indicator_type, normalized_value)``.

    When *forced_type* names a type, the value is validated as exactly that
    type. When it is ``"Auto"`` (any letter case), empty, or ``None``, the
    value is tried against each supported type in priority order and the first
    match wins: URL, IP, e-mail, domain, username.

    Auto-detection delegates to ``validate_as_type`` so both paths share a
    single set of validation and normalisation rules.

    Raises:
        ValueError: the input fails sanitisation, does not validate as the
            forced type, or matches no supported type.
    """
    safe = sanitize_text(raw_value)
    # Validation runs on the unescaped text; escaping only guards rendering.
    candidate = html.unescape(safe).strip()

    # A cleared dropdown can deliver None; treat that the same as "Auto".
    wanted = (forced_type or "Auto").strip().lower()
    if wanted != "auto":
        return validate_as_type(candidate, wanted)

    # The order matters: an IP would otherwise match the username pattern,
    # and an e-mail must be tested before the looser domain/username checks.
    for indicator_type in ("url", "ip", "email", "domain", "username"):
        try:
            return validate_as_type(candidate, indicator_type)
        except ValueError:
            # Not this type (includes urlparse rejecting a malformed netloc).
            continue

    raise ValueError(
        "Unsupported or malformed indicator. Supported types: domain, username, email, IP, URL."
    )
|
|
|
|
def validate_as_type(candidate: str, wanted: str) -> tuple[IndicatorType, str]:
    """Validate *candidate* as the indicator type *wanted* and normalise it.

    Args:
        candidate: Sanitised, unescaped indicator text.
        wanted: Lower-case type name: domain, username, email, ip, or url.

    Returns:
        ``(type, normalized)``. Domains and e-mails are lower-cased, domains
        lose trailing dots, IPs are rendered canonically, and URLs are reduced
        to scheme, lower-cased network location, and path.

    Raises:
        ValueError: the candidate is invalid for the type, or the type is unknown.
    """
    if wanted == "ip":
        try:
            address = ipaddress.ip_address(candidate)
        except ValueError as exc:
            raise ValueError("Invalid IP address.") from exc
        return "ip", str(address)

    if wanted == "url":
        parts = urlparse(candidate)
        if parts.scheme not in {"http", "https"} or not parts.netloc:
            raise ValueError("Invalid URL. Only http:// and https:// are supported.")
        # Query string and fragment are intentionally dropped.
        return "url", f"{parts.scheme.lower()}://{parts.netloc.lower()}{parts.path or ''}"

    if wanted == "email":
        address_text = candidate.lower()
        if not EMAIL_RE.fullmatch(address_text):
            raise ValueError("Invalid email.")
        return "email", address_text

    if wanted == "domain":
        hostname = candidate.lower().rstrip(".")
        if not DOMAIN_RE.fullmatch(hostname):
            raise ValueError("Invalid domain.")
        return "domain", hostname

    if wanted == "username":
        # Usernames keep their original letter case.
        if not USERNAME_RE.fullmatch(candidate):
            raise ValueError("Invalid username.")
        return "username", candidate

    raise ValueError("Unknown indicator type.")
|
|
|
|
| |
| |
| |
|
|
# Modules that would contact the target itself; they run only after the
# operator confirms authorization.
AUTHORIZED_ONLY_MODULES = {"HTTP Headers", "Robots.txt"}

# Every module offered in the UI, the authorized-only ones included.
PASSIVE_MODULES = {
    "Resource Links",
    "DNS Records",
    "Local URL Parse",
} | AUTHORIZED_ONLY_MODULES
|
|
|
|
def build_links(indicator_type: IndicatorType, normalized: str) -> str:
    """Render the registered lookup links for an indicator as a Markdown list.

    The indicator is URL-encoded with ``quote_plus`` and substituted for the
    ``{query}`` placeholder of each registered source URL. A fixed notice is
    returned when no sources exist for the indicator type.
    """
    sources = OSINT_LINKS.get(indicator_type, [])
    if not sources:
        return "_No source links registered for this indicator type._"

    encoded = quote_plus(normalized)

    def render(entry: dict[str, str]) -> str:
        target = entry["url"].replace("{query}", encoded)
        return f"- [{entry['name']}]({target}) — {entry['description']}"

    return "\n".join(render(entry) for entry in sources)
|
|
|
|
def resolve_dns(domain: str) -> dict[str, Any]:
    """Resolve A and AAAA records for *domain* through the system resolver.

    This is low-impact but still a network lookup. Each lookup is bounded by
    ``NETWORK_TIMEOUT_SECONDS``.

    The lookups run on a worker thread because ``socket.getaddrinfo`` is a
    blocking call that ignores socket timeouts. ``socket.setdefaulttimeout``
    therefore cannot bound it, and calling it would silently change the
    default for every socket created afterwards in the process.

    Returns:
        A dict with keys ``A``, ``AAAA``, ``MX`` and ``NS``, each a list of
        strings. ``MX`` and ``NS`` are always empty, since ``getaddrinfo``
        cannot query them. An ``error`` key is added when a lookup times out
        or fails unexpectedly.
    """
    # Local import: only this function needs the executor machinery.
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import TimeoutError as LookupTimeout

    result: dict[str, Any] = {"A": [], "AAAA": [], "MX": [], "NS": []}

    executor = ThreadPoolExecutor(max_workers=1)
    try:
        for family, key in ((socket.AF_INET, "A"), (socket.AF_INET6, "AAAA")):
            pending = executor.submit(
                socket.getaddrinfo, domain, None, family, socket.SOCK_STREAM
            )
            try:
                records = pending.result(timeout=NETWORK_TIMEOUT_SECONDS)
            except socket.gaierror:
                # No record of this family (or name not found): leave it empty.
                continue
            except LookupTimeout:
                result["error"] = (
                    f"DNS lookup timed out after {NETWORK_TIMEOUT_SECONDS} seconds."
                )
                # The single worker is still busy; a second lookup would only queue.
                break
            result[key] = sorted({record[4][0] for record in records})
    except Exception as exc:
        # Best effort: report the failure in the result instead of aborting the run.
        result["error"] = str(exc)
    finally:
        # Do not block the caller on a lookup that is still in flight.
        executor.shutdown(wait=False)

    return result
|
|
|
|
def parse_url_locally(url: str) -> dict[str, Any]:
    """Break *url* into components without any network access.

    Query and fragment are reported only as presence flags, never by value.
    """
    parts = urlparse(url)
    has_query = bool(parts.query)
    has_fragment = bool(parts.fragment)
    summary: dict[str, Any] = {}
    summary["scheme"] = parts.scheme
    summary["hostname"] = parts.hostname
    summary["path"] = parts.path
    summary["query_present"] = has_query
    summary["fragment_present"] = has_fragment
    return summary
|
|
|
|
def fetch_http_headers(url_or_domain: str) -> dict[str, Any]:
    """Placeholder for the authorization-gated HTTP header module.

    No request is made and the argument is not used; a fixed
    "not implemented" record is returned. A real implementation would add an
    HTTP client dependency and a bounded HEAD request.
    """
    explanation = (
        "HTTP header fetching is gated and intentionally stubbed in the base app. "
        "Implement with httpx only after authorization/rate-limit review."
    )
    return {"status": "not_implemented", "reason": explanation}
|
|
|
|
def fetch_robots(url_or_domain: str) -> dict[str, Any]:
    """Placeholder for the authorization-gated robots.txt module.

    No request is made and the argument is not used; a fixed
    "not implemented" record is returned.
    """
    explanation = "robots.txt retrieval is gated and intentionally stubbed in the base app."
    return {"status": "not_implemented", "reason": explanation}
|
|
|
|
| |
| |
| |
|
|
def detect_drift(
    indicator_type: IndicatorType,
    normalized: str,
    modules_requested: list[str],
    modules_blocked: list[str],
    errors: list[str],
    authorized_target: bool,
) -> dict[str, float]:
    """Score a single run on six drift dimensions, each starting at 0.0.

    Rules applied:
    - adversarial = 0.7 when the normalised indicator contains a pattern
      associated with traversal, shell metacharacters, script tags, or
      loopback/metadata addresses;
    - policy = 0.4 when any module was blocked, raised to at least 0.6 when an
      authorized-only module was requested without authorization;
    - operational = 0.2 per error, capped at 1.0;
    - statistical = 0.3 when the indicator type is "unknown".

    The structural and behavioral dimensions are never set by this function.
    """
    scores = dict.fromkeys(
        ("statistical", "behavioral", "structural", "adversarial", "operational", "policy"),
        0.0,
    )

    red_flags = (
        r"\.\./",
        r"%2e%2e",
        r";",
        r"\|",
        r"\$\(",
        r"`",
        r"<script",
        r"127\.0\.0\.1",
        r"localhost",
        r"169\.254\.169\.254",
    )
    haystack = normalized.lower()
    for flag in red_flags:
        if re.search(flag, haystack):
            scores["adversarial"] = 0.7
            break

    if modules_blocked:
        scores["policy"] = 0.4

    if errors:
        scores["operational"] = min(0.2 * len(errors), 1.0)

    if indicator_type == "unknown":
        scores["statistical"] = 0.3

    if not authorized_target:
        gated_requested = any(m in AUTHORIZED_ONLY_MODULES for m in modules_requested)
        if gated_requested:
            scores["policy"] = max(scores["policy"], 0.6)

    return scores
|
|
|
|
def choose_correction(drift: dict[str, float]) -> CorrectionVerb:
    """Map a drift vector to a correction verb, most severe rule first.

    - REVERT: policy >= 0.6, or structural/behavioral >= 0.5.
    - CONSTRAIN: adversarial >= 0.3.
    - ADAPT: statistical >= 0.5 with adversarial exactly zero.
    - OBSERVE: anything else. Missing dimensions count as 0.0.
    """

    def score(dimension: str) -> float:
        return drift.get(dimension, 0.0)

    must_revert = (
        score("policy") >= 0.6
        or score("structural") >= 0.5
        or score("behavioral") >= 0.5
    )
    if must_revert:
        return "REVERT"

    adversarial = score("adversarial")
    if adversarial >= 0.3:
        return "CONSTRAIN"

    if adversarial == 0 and score("statistical") >= 0.5:
        return "ADAPT"

    return "OBSERVE"
|
|
|
|
| |
| |
| |
|
|
def write_audit(event: TelemetryEvent) -> Path:
    """Persist *event* as ``<run_id>.json`` in the audit directory and return the path."""
    payload = json.dumps(asdict(event), indent=2, sort_keys=True)
    destination = AUDIT_DIR / f"{event.run_id}.json"
    destination.write_text(payload, encoding="utf-8")
    return destination
|
|
|
|
def write_report(result: EnrichmentResult, manifest: Manifest) -> Path:
    """Write the Markdown report for a run as ``<run_id>.md`` and return its path.

    The report contains run metadata, manifest identity, source links, module
    results, the drift vector, and any errors. The timestamp is taken when the
    report is rendered.
    """
    passive_json = json.dumps(result.passive_results, indent=2, sort_keys=True)
    drift_json = json.dumps(result.drift_vector, indent=2, sort_keys=True)

    if result.errors:
        error_block = "\n".join(f"- {error}" for error in result.errors)
    else:
        error_block = "- None"

    sections = [
        "# Passive OSINT Report",
        "",
        "## Run",
        "",
        f"- Run ID: `{result.run_id}`",
        f"- Timestamp: `{now_utc()}`",
        f"- Indicator Type: `{result.indicator_type}`",
        f"- Indicator Hash: `{result.indicator_hash}`",
        f"- Correction Verb: `{result.correction_verb}`",
        "",
        "## Manifest",
        "",
        f"- Artifact: `{manifest.artifact_id}`",
        f"- Version: `{manifest.version}`",
        f"- Manifest Hash: `{manifest.manifest_hash}`",
        f"- Tier: `{manifest.tier}`",
        "",
        "## Source Links",
        "",
        f"{result.links_markdown}",
        "",
        "## Passive Results",
        "",
        "```json",
        passive_json,
        "```",
        "",
        "## Drift Vector",
        "",
        "```json",
        drift_json,
        "```",
        "",
        "## Errors",
        "",
        error_block,
        "",  # keeps the trailing newline at end of file
    ]

    destination = REPORTS_DIR / f"{result.run_id}.md"
    destination.write_text("\n".join(sections), encoding="utf-8")
    return destination
|
|
|
|
def format_result_markdown(result: EnrichmentResult) -> str:
    """Render a run result as the Markdown shown in the UI output panel.

    Sections: run summary, source links, module results, drift vector, paths
    of the written audit/report files, and errors ("- None" when empty).
    """
    passive_json = json.dumps(result.passive_results, indent=2, sort_keys=True)
    drift_json = json.dumps(result.drift_vector, indent=2, sort_keys=True)

    if result.errors:
        error_block = "\n".join(f"- {error}" for error in result.errors)
    else:
        error_block = "- None"

    sections = [
        "",  # the rendered text starts with a blank line
        "## Result",
        "",
        f"**Run ID:** `{result.run_id}`",
        f"**Type:** `{result.indicator_type}`",
        f"**Indicator Hash:** `{result.indicator_hash}`",
        f"**Correction:** `{result.correction_verb}`",
        "",
        "### Source Links",
        "",
        f"{result.links_markdown}",
        "",
        "### Passive Results",
        "",
        "```json",
        passive_json,
        "```",
        "",
        "### Drift Vector",
        "",
        "```json",
        drift_json,
        "```",
        "",
        "### Logs",
        "",
        f"- Audit: `{result.audit_path}`",
        f"- Report: `{result.report_path}`",
        "",
        "### Errors",
        "",
        error_block,
        "",  # keeps the trailing newline
    ]
    return "\n".join(sections)
|
|
|
|
| |
| |
| |
|
|
def run_enrichment(
    raw_indicator: str,
    forced_type: str,
    selected_modules: list[str],
    authorized_target: bool,
) -> tuple[str, str | None, str | None]:
    """Run one enrichment request end to end.

    Steps: classify and hash the indicator, withhold authorized-only modules
    when authorization was not confirmed, execute the remaining modules,
    score drift, choose a correction verb, then write the audit record and
    the Markdown report.

    Returns:
        ``(markdown, report_path, audit_path)``. When the indicator is
        rejected, the Markdown carries the reason, both paths are ``None``,
        and nothing is written.
    """
    clock_start = time.perf_counter()
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    run_id = f"run_{stamp}_{uuid.uuid4().hex[:8]}"

    try:
        indicator_type, normalized = classify_and_normalize(raw_indicator, forced_type)
        indicator_hash = hmac_sha256(normalized)
    except Exception as exc:
        return "## Input rejected\n\n" + str(exc), None, None

    # With nothing ticked, fall back to link generation only.
    requested = selected_modules or ["Resource Links"]

    # Policy gate: authorized-only modules are withheld without confirmation.
    if authorized_target:
        modules_blocked: list[str] = []
    else:
        modules_blocked = [name for name in requested if name in AUTHORIZED_ONLY_MODULES]
    runnable = [name for name in requested if name not in modules_blocked]

    errors: list[str] = []
    modules_executed: list[str] = []
    passive_results: dict[str, Any] = {}

    if "Resource Links" in runnable:
        links_markdown = build_links(indicator_type, normalized)
        modules_executed.append("Resource Links")
    else:
        links_markdown = "_Resource link generation not selected._"

    if "DNS Records" in runnable:
        if indicator_type != "domain":
            errors.append("DNS Records module requires a domain indicator.")
        else:
            passive_results["dns"] = resolve_dns(normalized)
            modules_executed.append("DNS Records")

    if "Local URL Parse" in runnable:
        if indicator_type != "url":
            errors.append("Local URL Parse module requires a URL indicator.")
        else:
            passive_results["url_parse"] = parse_url_locally(normalized)
            modules_executed.append("Local URL Parse")

    # The gated modules share one shape: call the fetcher and record the result.
    gated_modules = (
        ("HTTP Headers", "http_headers", fetch_http_headers),
        ("Robots.txt", "robots", fetch_robots),
    )
    for label, result_key, fetcher in gated_modules:
        if label in runnable:
            passive_results[result_key] = fetcher(normalized)
            modules_executed.append(label)

    if modules_blocked:
        blocked_names = ", ".join(modules_blocked)
        errors.append(
            "Blocked authorized-only module(s): "
            + blocked_names
            + ". Confirm target authorization to enable them."
        )

    drift = detect_drift(
        indicator_type=indicator_type,
        normalized=normalized,
        modules_requested=requested,
        modules_blocked=modules_blocked,
        errors=errors,
        authorized_target=authorized_target,
    )
    correction = choose_correction(drift)

    elapsed_ms = int((time.perf_counter() - clock_start) * 1000)

    audit_path = write_audit(
        TelemetryEvent(
            run_id=run_id,
            timestamp=now_utc(),
            artifact_id=MANIFEST.artifact_id,
            manifest_hash=MANIFEST.manifest_hash,
            indicator_type=indicator_type,
            indicator_hash=indicator_hash,
            authorized_target=authorized_target,
            modules_requested=requested,
            modules_executed=modules_executed,
            modules_blocked=modules_blocked,
            drift_vector=drift,
            correction_verb=correction,
            duration_ms=elapsed_ms,
            errors=errors,
        )
    )

    result = EnrichmentResult(
        run_id=run_id,
        indicator_type=indicator_type,
        # The result object only ever carries a redaction marker here.
        normalized_indicator="[redacted]",
        indicator_hash=indicator_hash,
        links_markdown=links_markdown,
        passive_results=passive_results,
        drift_vector=drift,
        correction_verb=correction,
        report_path="",
        audit_path=str(audit_path),
        errors=errors,
    )
    # The report path is only known after writing, so it is filled in afterwards.
    report_path = write_report(result, MANIFEST)
    result.report_path = str(report_path)

    return format_result_markdown(result), str(report_path), str(audit_path)
|
|
|
|
def export_audit_index() -> str | None:
    """Write a CSV index of all readable audit records and return its path.

    Each ``*.json`` file in the audit directory contributes one row holding a
    fixed set of summary columns; raw indicators are never included. Files
    that cannot be read, are not valid JSON, or do not contain a JSON object
    are skipped.

    Returns:
        The CSV path as a string, or ``None`` when there is nothing to export.
        That covers both an empty audit directory and one where no file could
        be parsed; the latter previously raised ``IndexError``.
    """
    # Fixed column order, independent of what any single record contains.
    fieldnames = [
        "timestamp",
        "run_id",
        "artifact_id",
        "indicator_type",
        "indicator_hash",
        "authorized_target",
        "correction_verb",
        "duration_ms",
    ]

    rows: list[dict[str, Any]] = []
    for path in sorted(AUDIT_DIR.glob("*.json")):
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both JSON and text-decoding failures.
            continue
        if not isinstance(data, dict):
            continue
        rows.append({field: data.get(field) for field in fieldnames})

    if not rows:
        return None

    csv_path = RUNS_DIR / "audit_index.csv"
    with csv_path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    return str(csv_path)
|
|
|
|
def show_manifest() -> str:
    """Return the manifest as pretty-printed JSON inside a Markdown code fence."""
    payload = json.dumps(asdict(MANIFEST), indent=2, sort_keys=True)
    return "```json\n" + payload + "\n```"
|
|
|
|
| |
| |
| |
|
|
# Introductory text rendered under the page title.
DESCRIPTION = """
This Space is a passive, drift-aware OSINT control panel.

It validates, sanitises, normalises, and hashes indicators before audit persistence.
Authorized-only modules are blocked unless explicitly confirmed.
Correction decisions are limited to ADAPT, CONSTRAIN, REVERT, or OBSERVE.
"""


# Gradio UI definition. Components and event handlers must be created inside
# the Blocks context; tabs appear in the order they are declared.
# NOTE(review): the nesting below is reconstructed from a copy of the file whose
# indentation was lost — confirm against the deployed layout.
with gr.Blocks(title=APP_NAME) as demo:
    gr.Markdown(f"# {APP_NAME}")
    gr.Markdown(DESCRIPTION)

    # Main tab: indicator entry, module selection, and run output.
    with gr.Tab("Control Panel"):
        with gr.Row():
            raw_indicator = gr.Textbox(
                label="Indicator",
                placeholder="example.com, username, user@example.com, 8.8.8.8, https://example.com/path",
                max_lines=1,
            )
            # "Auto" lets classify_and_normalize detect the indicator type.
            forced_type = gr.Dropdown(
                ["Auto", "Domain", "Username", "Email", "IP", "URL"],
                value="Auto",
                label="Indicator Type",
            )

        # Sorted so the checkbox order is stable (the source is a set).
        selected_modules = gr.CheckboxGroup(
            choices=sorted(PASSIVE_MODULES),
            value=["Resource Links"],
            label="Modules",
        )

        # Unchecked by default: authorized-only modules stay blocked until ticked.
        authorized_target = gr.Checkbox(
            label="I confirm this target is authorized for conditional interaction",
            value=False,
        )

        run_button = gr.Button("Run Passive Enrichment", variant="primary")

        output = gr.Markdown(label="Output")
        report_file = gr.File(label="Markdown Report")
        audit_file = gr.File(label="Audit JSON")

        # Input/output order must match run_enrichment's parameters and return tuple.
        run_button.click(
            fn=run_enrichment,
            inputs=[raw_indicator, forced_type, selected_modules, authorized_target],
            outputs=[output, report_file, audit_file],
        )

    # Read-only view of the manifest built at import time.
    with gr.Tab("Manifest"):
        gr.Markdown(
            "The manifest declares the assumptions and invariants this artifact was built under."
        )
        manifest_output = gr.Markdown(value=show_manifest())

    # On-demand CSV export of the audit records.
    with gr.Tab("Audit Export"):
        gr.Markdown("Export a CSV index of audit records. Raw indicators are not included.")
        export_button = gr.Button("Export Audit Index")
        audit_index_file = gr.File(label="Audit Index CSV")
        export_button.click(fn=export_audit_index, inputs=[], outputs=[audit_index_file])

    # Static policy text.
    with gr.Tab("Policy"):
        gr.Markdown(
            """
## Operating Policy

- Passive by default.
- No scanning, brute forcing, exploitation, credential testing, or directory fuzzing.
- No raw indicators in audit logs.
- Authorized-only modules require explicit confirmation.
- Correction is the only state mutation authority.
- Correction cannot rewrite its own policy.

## Correction Verbs

- **ADAPT**: assumptions may be safely updated.
- **CONSTRAIN**: reduce capability under uncertainty or adversarial pressure.
- **REVERT**: restore prior known-good state.
- **OBSERVE**: log only; no mutation.
"""
        )


# Start the app only when the file is run as a script, not on import.
if __name__ == "__main__":
    demo.launch()
|
|