| """ |
| Rhodawk AI — Exploit Primitive Reasoner |
| ========================================= |
| Given a crash or vulnerability candidate, reasons about: |
| 1. Exploitability class (overflow, UAF, injection, race, crypto, logic) |
| 2. Control flow impact (can attacker redirect execution?) |
| 3. Data flow impact (can attacker read/write arbitrary memory?) |
| 4. Proof-of-Concept generation (minimal triggerable input) |
| 5. Severity and bounty tier estimate |
| |
| Uses DeepSeek-R1 (reasoning model) by default for deep exploit chain analysis; set the HERMES_MODEL environment variable to select a different model. |
| Never auto-submits — all output goes to the disclosure pipeline for human review. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import os |
| import time |
| import requests |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
|
|
# OpenRouter connection settings, resolved once when the module is imported.

# Bearer token for the OpenRouter API; an empty string means "not configured".
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")

# Model slug sent in the request payload.
# NOTE(review): the override is read from HERMES_MODEL rather than an
# exploit-specific variable — confirm that sharing this setting is intended.
EXPLOIT_MODEL = os.environ.get("HERMES_MODEL", "deepseek/deepseek-r1:free")

# Root of the OpenRouter REST API; endpoint paths are appended to it.
OPENROUTER_BASE = "https://openrouter.ai/api/v1"
|
|
|
|
@dataclass
class ExploitAnalysis:
    """Structured record of one exploitability assessment.

    Apart from ``vulnerability_id`` the fields carry the same names as the
    keys of the JSON object requested from the model in ``_EXPLOIT_SYSTEM``.
    Field order is part of the interface (positional construction), so new
    fields should only be appended.
    """

    # --- identity -------------------------------------------------------
    vulnerability_id: str

    # --- classification -------------------------------------------------
    exploit_class: str
    control_flow_impact: str
    data_impact: str

    # --- reachability ---------------------------------------------------
    auth_bypass_possible: bool
    remote_exploitable: bool
    exploit_complexity: str

    # --- scoring --------------------------------------------------------
    estimated_cvss: float
    bounty_tier: str

    # --- evidence -------------------------------------------------------
    proof_of_concept: str
    attack_scenario: str
    mitigations_present: list[str]

    # --- model self-assessment -----------------------------------------
    confidence: float
    reasoning: str
|
|
|
|
# System prompt sent as the "system" message by _call_exploit_reasoner.
# It asks the model for a single JSON object whose keys match the
# ExploitAnalysis fields (all except vulnerability_id, which is generated
# locally). Entries such as true/false, 0.0-10.0 and "a|b|c" describe the
# allowed values to the model; they are not literal JSON.
_EXPLOIT_SYSTEM = """You are a world-class exploit developer and vulnerability researcher.
Given a crash, vulnerability description, or suspicious code pattern, analyze its exploitability
with extreme precision. Think like an attacker trying to maximize impact.

Consider:
- Can the attacker control the input that triggers this?
- Does this give code execution, information disclosure, or denial of service?
- What mitigations are in place (ASLR, stack canaries, bounds checking, authentication)?
- What is the minimal proof-of-concept that demonstrates the issue?
- What is the realistic attack scenario (network, local, authenticated, unauthenticated)?

Be honest about uncertainty. If you cannot determine exploitability, say so clearly.
A false positive wastes a researcher's time and damages credibility.

Respond in this exact JSON format:
{
"exploit_class": "memory_corruption|injection|logic|crypto|race|info_disclosure|denial_of_service",
"control_flow_impact": "full_control|partial_control|none",
"data_impact": "arbitrary_read_write|arbitrary_read|arbitrary_write|none",
"auth_bypass_possible": true/false,
"remote_exploitable": true/false,
"exploit_complexity": "LOW|MEDIUM|HIGH",
"estimated_cvss": 0.0-10.0,
"bounty_tier": "P1|P2|P3|P4",
"proof_of_concept": "minimal code or input that triggers the issue",
"attack_scenario": "step by step how an attacker would exploit this",
"mitigations_present": ["mitigation1", "mitigation2"],
"confidence": 0.0-1.0,
"reasoning": "your full chain of reasoning about exploitability"
}
"""
|
|
| _CVSS_TO_TIER = { |
| (9.0, 10.0): "P1", |
| (7.0, 8.9): "P2", |
| (4.0, 6.9): "P3", |
| (0.0, 3.9): "P4", |
| } |
|
|
|
|
| def _estimate_bounty_tier(cvss: float) -> str: |
| for (lo, hi), tier in _CVSS_TO_TIER.items(): |
| if lo <= cvss <= hi: |
| return tier |
| return "P4" |
|
|
|
|
def _fallback_analysis(exploit_class: str, proof_of_concept: str, reasoning: str) -> dict:
    """Build a zero-confidence placeholder shaped like a successful model reply."""
    return {
        "exploit_class": exploit_class,
        "control_flow_impact": "unknown",
        "data_impact": "unknown",
        "auth_bypass_possible": False,
        "remote_exploitable": False,
        "exploit_complexity": "HIGH",
        "estimated_cvss": 0.0,
        "bounty_tier": "P4",
        "proof_of_concept": proof_of_concept,
        "attack_scenario": "",
        "mitigations_present": [],
        "confidence": 0.0,
        "reasoning": reasoning,
    }


def _decode_model_reply(content) -> dict:
    """Decode the model's message content into a dict.

    Raises:
        ValueError: if the content is not text, contains no decodable JSON
            object, or decodes to something other than a JSON object.
    """
    if not isinstance(content, str):
        raise ValueError(f"model returned non-text content: {type(content).__name__}")
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        # Some models wrap the object in a code fence or surrounding prose
        # even when a JSON response is requested; retry on the outermost
        # brace-delimited span before giving up.
        start, end = content.find("{"), content.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(content[start:end + 1])
    if not isinstance(parsed, dict):
        # Callers use .get() on the result, so a list/str/number is unusable.
        raise ValueError(f"model returned JSON {type(parsed).__name__}, expected an object")
    return parsed


def _call_exploit_reasoner(prompt: str) -> dict:
    """Send ``prompt`` to the configured OpenRouter model and return its analysis.

    Args:
        prompt: User message describing the crash or vulnerability.

    Returns:
        The decoded JSON object from the model. When no API key is configured,
        or the request or decoding fails, a placeholder dict with the same
        keys, ``confidence`` 0.0 and the failure reason in ``reasoning`` is
        returned instead; this function does not raise.
    """
    if not OPENROUTER_API_KEY:
        return _fallback_analysis(
            "unknown",
            "OPENROUTER_API_KEY not set — manual analysis required",
            "API key not available",
        )

    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://rhodawk.ai",
        "X-Title": "Rhodawk Exploit Reasoner",
    }
    payload = {
        "model": EXPLOIT_MODEL,
        "messages": [
            {"role": "system", "content": _EXPLOIT_SYSTEM},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.1,
        "max_tokens": 2000,
        "response_format": {"type": "json_object"},
    }
    try:
        resp = requests.post(
            f"{OPENROUTER_BASE}/chat/completions",
            headers=headers, json=payload, timeout=120,
        )
        resp.raise_for_status()
        content = resp.json()["choices"][0]["message"]["content"]
        return _decode_model_reply(content)
    except Exception as e:
        # Deliberately broad: network errors, HTTP errors, unexpected response
        # shapes and undecodable replies are all reported as a failed analysis
        # rather than aborting the caller's pipeline.
        return _fallback_analysis("analysis_failed", f"Analysis failed: {e}", str(e))
|
|
|
|
def _coerce_float(value, default: float, lo: float, hi: float) -> float:
    """Convert ``value`` to a float clamped to [lo, hi]; ``default`` if it is not numeric."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    if number != number:  # NaN compares unequal to itself
        return default
    return min(max(number, lo), hi)


def _coerce_text(value, default: str = "") -> str:
    """Return ``value`` as a string, using ``default`` when it is None."""
    if value is None:
        return default
    return value if isinstance(value, str) else str(value)


def _coerce_bool(value) -> bool:
    """Interpret ``value`` as a flag; the strings "true"/"yes"/"1" count as True."""
    if isinstance(value, str):
        # bool("false") would be True, so textual flags are matched explicitly.
        return value.strip().lower() in {"true", "yes", "1"}
    return bool(value)


def _coerce_str_list(value) -> list:
    """Return ``value`` as a list of strings; anything unusable becomes []."""
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]
    if isinstance(value, str) and value:
        return [value]
    return []


def reason_exploitability(
    crash_input: str,
    crash_output: str,
    file_path: str,
    vuln_type: str,
    source_context: str = "",
) -> dict:
    """
    Main entry point for exploit primitive reasoning.

    Builds a prompt from the crash details, sends it to the exploit reasoner
    and normalises the reply, so malformed model output (missing keys, nulls,
    wrong types, out-of-range scores) degrades to defaults instead of raising.

    Args:
        crash_input: Input that triggers the issue (first 1000 chars are sent).
        crash_output: Crash output / stack trace (first 2000 chars are sent).
        file_path: Path of the affected file, included in the prompt and the ID.
        vuln_type: Short label of the suspected vulnerability type.
        source_context: Optional source excerpt (first 1500 chars are sent).

    Returns:
        A dict with every ExploitAnalysis field plus ``disclosure_ready``
        (confidence >= 0.7 and CVSS >= 4.0) and ``requires_human_review``
        (always True).
    """
    # Function-scope import: asdict is not among the names imported at file level.
    from dataclasses import asdict

    # The timestamp is part of the hash, so repeated analyses of the same
    # crash receive distinct IDs.
    vuln_id = hashlib.sha256(f"{file_path}{crash_input}{time.time()}".encode()).hexdigest()[:12]

    prompt = (
        f"VULNERABILITY TYPE: {vuln_type}\n"
        f"FILE: {file_path}\n\n"
        f"CRASH INPUT / TRIGGER:\n```\n{crash_input[:1000]}\n```\n\n"
        f"CRASH OUTPUT / STACK TRACE:\n```\n{crash_output[:2000]}\n```\n\n"
    )
    if source_context:
        prompt += f"SOURCE CONTEXT:\n```\n{source_context[:1500]}\n```\n\n"

    prompt += "Analyze this vulnerability's exploitability in full detail."

    print(f"[EXPLOIT] Reasoning about {vuln_type} in {file_path} (ID: {vuln_id})")
    raw = _call_exploit_reasoner(prompt)
    if not isinstance(raw, dict):
        # Defensive: every field below is read with .get().
        raw = {}

    cvss = _coerce_float(raw.get("estimated_cvss"), 0.0, 0.0, 10.0)
    # Prefer the model's own tier; derive one from the score only when absent.
    tier = raw.get("bounty_tier") or _estimate_bounty_tier(cvss)

    analysis = ExploitAnalysis(
        vulnerability_id=vuln_id,
        exploit_class=_coerce_text(raw.get("exploit_class"), "unknown"),
        control_flow_impact=_coerce_text(raw.get("control_flow_impact"), "none"),
        data_impact=_coerce_text(raw.get("data_impact"), "none"),
        auth_bypass_possible=_coerce_bool(raw.get("auth_bypass_possible", False)),
        remote_exploitable=_coerce_bool(raw.get("remote_exploitable", False)),
        exploit_complexity=_coerce_text(raw.get("exploit_complexity"), "HIGH"),
        estimated_cvss=round(cvss, 1),
        bounty_tier=tier,
        proof_of_concept=_coerce_text(raw.get("proof_of_concept")),
        attack_scenario=_coerce_text(raw.get("attack_scenario")),
        mitigations_present=_coerce_str_list(raw.get("mitigations_present")),
        confidence=_coerce_float(raw.get("confidence"), 0.0, 0.0, 1.0),
        reasoning=_coerce_text(raw.get("reasoning"))[:1000],
    )

    print(f"[EXPLOIT] {vuln_id}: CVSS={analysis.estimated_cvss}, Tier={analysis.bounty_tier}, "
          f"Confidence={analysis.confidence}")

    # asdict() emits the fields in declaration order, which matches the key
    # order of the previous hand-written dict.
    result = asdict(analysis)
    result["disclosure_ready"] = analysis.confidence >= 0.7 and analysis.estimated_cvss >= 4.0
    result["requires_human_review"] = True
    return result
|
|
|
|
def generate_cve_draft(
    finding_title: str,
    finding_description: str,
    exploit_analysis: dict,
    affected_repo: str,
    affected_versions: str = "unspecified",
) -> dict:
    """
    Generate a CVE draft report ready for human review and submission.

    The draft is modelled on the CVE JSON 5.0 record layout.
    NOTE(review): it is not validated against the official schema here and
    carries custom fields (e.g. ``x_rhodawk_metadata``) — confirm before any
    submission.

    Args:
        finding_title: Title used for the CNA container.
        finding_description: English description of the finding.
        exploit_analysis: Dict as produced by ``reason_exploitability``; the
            keys ``estimated_cvss``, ``bounty_tier``, ``exploit_class`` and
            ``reasoning`` are read. Missing, null or non-numeric values fall
            back to defaults.
        affected_repo: ``owner/name`` path appended to ``https://github.com/``.
        affected_versions: Version string marked as affected.

    Returns:
        A dict with the draft under ``cve_draft`` plus ``severity``, ``cvss``,
        ``bounty_tier``, ``ready_for_human_review`` (True) and ``auto_submit``
        (False).
    """
    # The score comes from model output, so tolerate null / text / NaN and
    # keep it inside the 0.0-10.0 range of a CVSS base score.
    try:
        cvss = float(exploit_analysis.get("estimated_cvss") or 0.0)
    except (TypeError, ValueError):
        cvss = 0.0
    if cvss != cvss:  # NaN compares unequal to itself
        cvss = 0.0
    cvss = min(max(cvss, 0.0), 10.0)

    tier = exploit_analysis.get("bounty_tier") or "P4"
    exploit_class = str(exploit_analysis.get("exploit_class") or "unknown")

    cwe_map = {
        "memory_corruption": "CWE-119",
        "injection": "CWE-74",
        "logic": "CWE-840",
        "crypto": "CWE-310",
        "race": "CWE-362",
        "info_disclosure": "CWE-200",
        "denial_of_service": "CWE-400",
    }
    cwe = cwe_map.get(exploit_class, "CWE-UNKNOWN")

    # Severity label by score threshold, checked from the highest downwards.
    severity = "LOW"
    for threshold, label in ((9.0, "CRITICAL"), (7.0, "HIGH"), (4.0, "MEDIUM")):
        if cvss >= threshold:
            severity = label
            break

    # The stamp carries a "Z" (UTC) suffix, so it must be formatted from
    # gmtime(); strftime() without a time tuple would use local time.
    date_reserved = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime())

    draft = {
        "dataType": "CVE_RECORD",
        "dataVersion": "5.0",
        "cveMetadata": {
            "state": "DRAFT",
            "assignerOrgId": "rhodawk-ai",
            "dateReserved": date_reserved,
        },
        "containers": {
            "cna": {
                "title": finding_title,
                "descriptions": [
                    {
                        "lang": "en",
                        "value": finding_description,
                    }
                ],
                "affected": [
                    {
                        "repo": f"https://github.com/{affected_repo}",
                        "versions": [{"version": affected_versions, "status": "affected"}],
                    }
                ],
                "problemTypes": [
                    {"descriptions": [{"type": "CWE", "cweId": cwe, "lang": "en"}]}
                ],
                "metrics": [
                    {
                        "cvssV3_1": {
                            "version": "3.1",
                            "baseScore": cvss,
                            "baseSeverity": severity,
                        }
                    }
                ],
                "references": [
                    {"url": f"https://github.com/{affected_repo}", "name": "Repository"}
                ],
                "x_rhodawk_metadata": {
                    "bounty_tier": tier,
                    "exploit_class": exploit_class,
                    "disclosure_status": "PENDING_HUMAN_APPROVAL",
                    "generated_by": "Rhodawk AI Security Research Engine",
                    "requires_human_verification": True,
                    # Null or non-string reasoning must not break the slice.
                    "analyst_notes": str(exploit_analysis.get("reasoning") or "")[:500],
                },
            }
        },
    }
    return {
        "cve_draft": draft,
        "severity": severity,
        "cvss": cvss,
        "bounty_tier": tier,
        "ready_for_human_review": True,
        "auto_submit": False,
    }
|
|