rhodawk-ai-devops-engine / exploit_primitives.py
Architect8999's picture
feat: Hermes Zero-Day Research Engine - 9 new modules + UI tab
5a2c9e0 verified
"""
Rhodawk AI — Exploit Primitive Reasoner
=========================================
Given a crash or vulnerability candidate, reasons about:
1. Exploitability class (overflow, UAF, injection, race, crypto, logic)
2. Control flow impact (can attacker redirect execution?)
3. Data flow impact (can attacker read/write arbitrary memory?)
4. Proof-of-Concept generation (minimal triggerable input)
5. Severity and bounty tier estimate
Uses DeepSeek-R1 (reasoning model) for deep exploit chain analysis.
Never auto-submits — all output goes to the disclosure pipeline for human review.
"""
from __future__ import annotations
import hashlib
import json
import os
import time
import requests
from dataclasses import dataclass, field
from typing import Optional
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
EXPLOIT_MODEL = os.getenv("HERMES_MODEL", "deepseek/deepseek-r1:free")
OPENROUTER_BASE = "https://openrouter.ai/api/v1"
@dataclass
class ExploitAnalysis:
vulnerability_id: str
exploit_class: str # memory_corruption | injection | logic | crypto | race | info_disclosure
control_flow_impact: str # full_control | partial_control | none
data_impact: str # arbitrary_read_write | arbitrary_read | arbitrary_write | none
auth_bypass_possible: bool
remote_exploitable: bool
exploit_complexity: str # LOW | MEDIUM | HIGH
estimated_cvss: float
bounty_tier: str # P1 ($10k+) | P2 ($5k) | P3 ($2k) | P4 (<$1k)
proof_of_concept: str
attack_scenario: str
mitigations_present: list[str]
confidence: float
reasoning: str
_EXPLOIT_SYSTEM = """You are a world-class exploit developer and vulnerability researcher.
Given a crash, vulnerability description, or suspicious code pattern, analyze its exploitability
with extreme precision. Think like an attacker trying to maximize impact.
Consider:
- Can the attacker control the input that triggers this?
- Does this give code execution, information disclosure, or denial of service?
- What mitigations are in place (ASLR, stack canaries, bounds checking, authentication)?
- What is the minimal proof-of-concept that demonstrates the issue?
- What is the realistic attack scenario (network, local, authenticated, unauthenticated)?
Be honest about uncertainty. If you cannot determine exploitability, say so clearly.
A false positive wastes a researcher's time and damages credibility.
Respond in this exact JSON format:
{
"exploit_class": "memory_corruption|injection|logic|crypto|race|info_disclosure|denial_of_service",
"control_flow_impact": "full_control|partial_control|none",
"data_impact": "arbitrary_read_write|arbitrary_read|arbitrary_write|none",
"auth_bypass_possible": true/false,
"remote_exploitable": true/false,
"exploit_complexity": "LOW|MEDIUM|HIGH",
"estimated_cvss": 0.0-10.0,
"bounty_tier": "P1|P2|P3|P4",
"proof_of_concept": "minimal code or input that triggers the issue",
"attack_scenario": "step by step how an attacker would exploit this",
"mitigations_present": ["mitigation1", "mitigation2"],
"confidence": 0.0-1.0,
"reasoning": "your full chain of reasoning about exploitability"
}
"""
_CVSS_TO_TIER = {
(9.0, 10.0): "P1",
(7.0, 8.9): "P2",
(4.0, 6.9): "P3",
(0.0, 3.9): "P4",
}
def _estimate_bounty_tier(cvss: float) -> str:
for (lo, hi), tier in _CVSS_TO_TIER.items():
if lo <= cvss <= hi:
return tier
return "P4"
def _call_exploit_reasoner(prompt: str) -> dict:
if not OPENROUTER_API_KEY:
return {
"exploit_class": "unknown",
"control_flow_impact": "unknown",
"data_impact": "unknown",
"auth_bypass_possible": False,
"remote_exploitable": False,
"exploit_complexity": "HIGH",
"estimated_cvss": 0.0,
"bounty_tier": "P4",
"proof_of_concept": "OPENROUTER_API_KEY not set — manual analysis required",
"attack_scenario": "",
"mitigations_present": [],
"confidence": 0.0,
"reasoning": "API key not available",
}
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
"HTTP-Referer": "https://rhodawk.ai",
"X-Title": "Rhodawk Exploit Reasoner",
}
payload = {
"model": EXPLOIT_MODEL,
"messages": [
{"role": "system", "content": _EXPLOIT_SYSTEM},
{"role": "user", "content": prompt},
],
"temperature": 0.1,
"max_tokens": 2000,
"response_format": {"type": "json_object"},
}
try:
resp = requests.post(
f"{OPENROUTER_BASE}/chat/completions",
headers=headers, json=payload, timeout=120,
)
resp.raise_for_status()
content = resp.json()["choices"][0]["message"]["content"]
return json.loads(content)
except Exception as e:
return {
"exploit_class": "analysis_failed",
"control_flow_impact": "unknown",
"data_impact": "unknown",
"auth_bypass_possible": False,
"remote_exploitable": False,
"exploit_complexity": "HIGH",
"estimated_cvss": 0.0,
"bounty_tier": "P4",
"proof_of_concept": f"Analysis failed: {e}",
"attack_scenario": "",
"mitigations_present": [],
"confidence": 0.0,
"reasoning": str(e),
}
def reason_exploitability(
crash_input: str,
crash_output: str,
file_path: str,
vuln_type: str,
source_context: str = "",
) -> dict:
"""
Main entry point for exploit primitive reasoning.
Returns a detailed exploitability analysis.
"""
vuln_id = hashlib.sha256(f"{file_path}{crash_input}{time.time()}".encode()).hexdigest()[:12]
prompt = (
f"VULNERABILITY TYPE: {vuln_type}\n"
f"FILE: {file_path}\n\n"
f"CRASH INPUT / TRIGGER:\n```\n{crash_input[:1000]}\n```\n\n"
f"CRASH OUTPUT / STACK TRACE:\n```\n{crash_output[:2000]}\n```\n\n"
)
if source_context:
prompt += f"SOURCE CONTEXT:\n```\n{source_context[:1500]}\n```\n\n"
prompt += "Analyze this vulnerability's exploitability in full detail."
print(f"[EXPLOIT] Reasoning about {vuln_type} in {file_path} (ID: {vuln_id})")
raw = _call_exploit_reasoner(prompt)
cvss = float(raw.get("estimated_cvss", 0.0))
tier = raw.get("bounty_tier") or _estimate_bounty_tier(cvss)
analysis = ExploitAnalysis(
vulnerability_id=vuln_id,
exploit_class=raw.get("exploit_class", "unknown"),
control_flow_impact=raw.get("control_flow_impact", "none"),
data_impact=raw.get("data_impact", "none"),
auth_bypass_possible=bool(raw.get("auth_bypass_possible", False)),
remote_exploitable=bool(raw.get("remote_exploitable", False)),
exploit_complexity=raw.get("exploit_complexity", "HIGH"),
estimated_cvss=round(cvss, 1),
bounty_tier=tier,
proof_of_concept=raw.get("proof_of_concept", ""),
attack_scenario=raw.get("attack_scenario", ""),
mitigations_present=raw.get("mitigations_present", []),
confidence=float(raw.get("confidence", 0.0)),
reasoning=raw.get("reasoning", "")[:1000],
)
print(f"[EXPLOIT] {vuln_id}: CVSS={analysis.estimated_cvss}, Tier={analysis.bounty_tier}, "
f"Confidence={analysis.confidence}")
return {
"vulnerability_id": analysis.vulnerability_id,
"exploit_class": analysis.exploit_class,
"control_flow_impact": analysis.control_flow_impact,
"data_impact": analysis.data_impact,
"auth_bypass_possible": analysis.auth_bypass_possible,
"remote_exploitable": analysis.remote_exploitable,
"exploit_complexity": analysis.exploit_complexity,
"estimated_cvss": analysis.estimated_cvss,
"bounty_tier": analysis.bounty_tier,
"proof_of_concept": analysis.proof_of_concept,
"attack_scenario": analysis.attack_scenario,
"mitigations_present": analysis.mitigations_present,
"confidence": analysis.confidence,
"reasoning": analysis.reasoning,
"disclosure_ready": analysis.confidence >= 0.7 and analysis.estimated_cvss >= 4.0,
"requires_human_review": True,
}
def generate_cve_draft(
finding_title: str,
finding_description: str,
exploit_analysis: dict,
affected_repo: str,
affected_versions: str = "unspecified",
) -> dict:
"""
Generate a CVE draft report ready for human review and submission.
Output format follows CVE JSON 5.0 schema.
"""
cvss = exploit_analysis.get("estimated_cvss", 0.0)
tier = exploit_analysis.get("bounty_tier", "P4")
exploit_class = exploit_analysis.get("exploit_class", "unknown")
cwe_map = {
"memory_corruption": "CWE-119",
"injection": "CWE-74",
"logic": "CWE-840",
"crypto": "CWE-310",
"race": "CWE-362",
"info_disclosure": "CWE-200",
"denial_of_service": "CWE-400",
}
cwe = cwe_map.get(exploit_class, "CWE-UNKNOWN")
severity = "CRITICAL" if cvss >= 9.0 else "HIGH" if cvss >= 7.0 else "MEDIUM" if cvss >= 4.0 else "LOW"
draft = {
"dataType": "CVE_RECORD",
"dataVersion": "5.0",
"cveMetadata": {
"state": "DRAFT",
"assignerOrgId": "rhodawk-ai",
"dateReserved": time.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
},
"containers": {
"cna": {
"title": finding_title,
"descriptions": [
{
"lang": "en",
"value": finding_description,
}
],
"affected": [
{
"repo": f"https://github.com/{affected_repo}",
"versions": [{"version": affected_versions, "status": "affected"}],
}
],
"problemTypes": [
{"descriptions": [{"type": "CWE", "cweId": cwe, "lang": "en"}]}
],
"metrics": [
{
"cvssV3_1": {
"version": "3.1",
"baseScore": cvss,
"baseSeverity": severity,
}
}
],
"references": [
{"url": f"https://github.com/{affected_repo}", "name": "Repository"}
],
"x_rhodawk_metadata": {
"bounty_tier": tier,
"exploit_class": exploit_class,
"disclosure_status": "PENDING_HUMAN_APPROVAL",
"generated_by": "Rhodawk AI Security Research Engine",
"requires_human_verification": True,
"analyst_notes": exploit_analysis.get("reasoning", "")[:500],
},
}
},
}
return {
"cve_draft": draft,
"severity": severity,
"cvss": cvss,
"bounty_tier": tier,
"ready_for_human_review": True,
"auto_submit": False,
}