# NOTE: web-viewer residue (file-size banner, commit hashes, line-number gutter) removed from above the module docstring.
"""
Rhodawk AI — Autonomous Fuzzing Engine
=======================================
Generates language-aware fuzzing harnesses using LLM then executes them.
Integrates with AFL++, libFuzzer (via atheris for Python), and Hypothesis.

Pipeline per target:
  1. LLM generates a harness tailored to the target function/API
  2. Harness is written to /tmp and compiled/instrumented
  3. Fuzzer runs for duration_s seconds with coverage feedback
  4. Crashes are triaged: unique crashes extracted, deduped by stack hash
  5. Results returned for exploit_primitives reasoning

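Supported modes:
  - Python   → atheris (libFuzzer bindings for Python), with a Hypothesis
               fallback when atheris is not installed
  - C/C++    → AFL++ subprocess (if installed)
  - JS/TS    → jsfuzz / fast-check property testing
  - Generic  → Hypothesis with AI-generated strategies

Note: the entry point in this module only invokes the atheris and Hypothesis
runners; for non-Python languages a harness is generated and then handed to
the Hypothesis runner.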
"""

from __future__ import annotations

import hashlib
import json
import os
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from typing import Optional

import requests

# OpenRouter credentials; when empty, harness generation skips the LLM call
# and returns the built-in fallback harness instead.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
# Model identifier sent with each chat-completion request
# (overridable through the HERMES_FAST_MODEL environment variable).
FUZZ_MODEL         = os.getenv("HERMES_FAST_MODEL", "deepseek/deepseek-v3:free")
# Base URL of the OpenRouter HTTP API.
OPENROUTER_BASE    = "https://openrouter.ai/api/v1"

# Upper bound (seconds) applied to the duration requested for a campaign.
MAX_FUZZ_DURATION  = int(os.getenv("RHODAWK_MAX_FUZZ_DURATION", "120"))
# Corpus directory setting.
# NOTE(review): not referenced by the code visible here — confirm where it is used.
FUZZ_CORPUS_DIR    = os.getenv("RHODAWK_FUZZ_CORPUS", "/data/fuzz_corpus")


@dataclass
class CrashRecord:
    """One crash (or runner failure) observed during a fuzzing run.

    The ``timestamp`` default is generated in UTC so that the trailing ``Z``
    designator in the ISO-8601 string is truthful regardless of the host's
    local time zone.
    """

    # Short identifier for this crash (the runners derive it from a SHA-256 digest).
    crash_id: str
    # Name of the fuzzed target, or a runner label such as "python_harness".
    target: str
    # Input that triggered the crash (hex text, or a placeholder description).
    crash_input: str
    # Captured fuzzer / test-runner output associated with the crash.
    crash_output: str
    # Key used by the campaign to de-duplicate crash records.
    stack_hash: str
    # segfault | assertion | exception | timeout | oom | setup_error
    crash_type: str
    # Whether the producing runner considers this record a distinct crash.
    is_unique: bool
    # Path of a file that reproduces the crash ("" when there is none).
    reproducer_path: str
    # Creation time as an ISO-8601 UTC string, e.g. "2024-01-31T12:00:00Z".
    timestamp: str = field(
        default_factory=lambda: time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    )


@dataclass
class FuzzResult:
    """Aggregated outcome of one fuzzing campaign against a single target."""

    # Name of the function/API that was fuzzed.
    target: str
    # Language label the campaign was started with (e.g. "python").
    language: str
    # Duration of the run in whole seconds.
    duration_s: int
    # Execution counter reported for the run.
    # NOTE(review): the campaign entry point fills this from the number of
    # crash records, not from a real execution count — confirm intent.
    total_executions: int
    # Crash records remaining after de-duplication.
    unique_crashes: list[CrashRecord]
    # Coverage figure for the run (percent).
    coverage_percent: float
    # Source text of the harness that was executed.
    harness_code: str
    # Optional error description; None when not provided.
    error: Optional[str] = None


def _llm_generate_harness(
    target: str,
    language: str,
    repo_dir: str,
    source_context: str,
) -> str:
    """Ask the configured LLM for a fuzzing harness aimed at ``target``.

    Args:
        target: Name of the function/API to fuzz.
        language: Language label; selects the prompt flavour (Python/atheris,
            JavaScript or TypeScript/jsfuzz, Go native fuzzing, otherwise a
            Hypothesis property test).
        repo_dir: Accepted for interface compatibility; not used here.
        source_context: Source excerpt for the prompt (first 2000 characters
            are sent).

    Returns:
        The harness code extracted from the model reply. The built-in fallback
        harness is returned instead when no API key is configured, when the
        request or reply parsing fails, or when the reply contains no code.
    """
    if not OPENROUTER_API_KEY:
        return _fallback_harness(target, language)

    system_prompt = (
        "You are an expert fuzzing engineer. Generate a minimal, correct fuzzing harness "
        "for the given target. The harness must: "
        "1) Accept raw bytes as input, 2) Parse them into valid arguments, "
        "3) Call the target without crashing on invalid input (catch exceptions), "
        "4) Be as fast as possible (no I/O, no sleep). "
        "Return ONLY the harness code, no explanation."
    )

    # Only the leading part of the context is sent to keep the prompt small.
    snippet = source_context[:2000]

    if language == "python":
        user_prompt = (
            f"TARGET FUNCTION: {target}\n"
            "LANGUAGE: Python (atheris/libFuzzer)\n"
            f"SOURCE CONTEXT:\n```python\n{snippet}\n```\n\n"
            "Generate an atheris fuzzing harness. Import atheris and the target module. "
            "The TestOneInput function must accept bytes. Use FuzzedDataProvider to extract typed values."
        )
    elif language in ("javascript", "typescript"):
        user_prompt = (
            f"TARGET FUNCTION: {target}\n"
            f"LANGUAGE: {language} (jsfuzz)\n"
            f"SOURCE CONTEXT:\n```javascript\n{snippet}\n```\n\n"
            "Generate a jsfuzz harness. Export a default async function that accepts Buffer."
        )
    elif language == "go":
        user_prompt = (
            f"TARGET FUNCTION: {target}\n"
            "LANGUAGE: Go (native fuzzing)\n"
            f"SOURCE CONTEXT:\n```go\n{snippet}\n```\n\n"
            "Generate a Go fuzz test using testing.F and f.Fuzz()."
        )
    else:
        user_prompt = (
            f"TARGET: {target}\n"
            f"LANGUAGE: {language}\n"
            f"SOURCE:\n```\n{snippet}\n```\n\n"
            "Generate a Hypothesis property-based test that explores the target's input space."
        )

    try:
        response = requests.post(
            f"{OPENROUTER_BASE}/chat/completions",
            headers={
                "Authorization": f"Bearer {OPENROUTER_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "model": FUZZ_MODEL,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                "temperature": 0.1,
                "max_tokens": 1500,
            },
            timeout=60,
        )
        response.raise_for_status()
        reply = response.json()["choices"][0]["message"]["content"]
        harness = _extract_code_block(reply)
        if harness:
            return harness
    except Exception:
        # Best effort: any transport or parsing problem degrades to the fallback.
        pass
    return _fallback_harness(target, language)


def _extract_code_block(text: str) -> str:
    """Extract first code block from markdown."""
    import re
    m = re.search(r"```(?:python|javascript|go|typescript|rust)?\n(.*?)```", text, re.DOTALL)
    if m:
        return m.group(1).strip()
    lines = [l for l in text.splitlines() if not l.startswith("```")]
    return "\n".join(lines).strip()


def _fallback_harness(target: str, language: str) -> str:
    """Generic fallback harness when LLM is unavailable."""
    if language == "python":
        # FIX (Build Error): atheris requires Clang + libFuzzer at compile time and
        # fails to build on HuggingFace Spaces.  The fallback harness now uses
        # hypothesis which is available everywhere, matching the Hypothesis
        # fallback already in use when atheris is unavailable at runtime.
        return f"""
import sys
try:
    import atheris
    _ATHERIS_AVAILABLE = True
except ImportError:
    _ATHERIS_AVAILABLE = False

if _ATHERIS_AVAILABLE:
    import sys

    @atheris.instrument_func
    def fuzz_target(data):
        fdp = atheris.FuzzedDataProvider(data)
        try:
            val = fdp.ConsumeUnicodeNoSurrogates(128)
            # TODO: call {target}(val)
        except Exception:
            pass

    atheris.Setup(sys.argv, fuzz_target)
    atheris.Fuzz()
else:
    # Hypothesis-based fallback when atheris/libFuzzer is unavailable
    from hypothesis import given, settings, HealthCheck
    from hypothesis import strategies as st

    @given(st.text(max_size=128))
    @settings(max_examples=500, suppress_health_check=list(HealthCheck))
    def fuzz_target(val):
        try:
            # TODO: call {target}(val)
            pass
        except Exception:
            pass

    fuzz_target()
"""
    return f"# Fallback harness for {target} ({language})\n# Manual harness required\n"


def _get_source_context(repo_dir: str, target: str, language: str) -> str:
    """Extract relevant source code context around the target function."""
    import glob as _glob

    ext_map = {
        "python": ["*.py"], "javascript": ["*.js"], "typescript": ["*.ts"],
        "go": ["*.go"], "rust": ["*.rs"], "java": ["*.java"], "ruby": ["*.rb"],
    }
    extensions = ext_map.get(language, ["*.*"])

    for ext in extensions:
        for fpath in _glob.glob(f"{repo_dir}/**/{ext}", recursive=True):
            if "test" in fpath.lower() or "node_modules" in fpath:
                continue
            try:
                content = open(fpath).read()
                if target.split(".")[-1] in content or target.split("::")[-1] in content:
                    rel = os.path.relpath(fpath, repo_dir)
                    return f"# File: {rel}\n{content[:3000]}"
            except Exception:
                pass
    return f"# Could not find source for {target}"


def _run_python_atheris(harness_code: str, duration_s: int) -> list[CrashRecord]:
    """Run atheris fuzzer on a Python harness."""
    crashes = []
    harness_path = None
    corpus_dir = None

    try:
        fd, harness_path = tempfile.mkstemp(suffix="_fuzz.py")
        with os.fdopen(fd, "w") as f:
            f.write(harness_code)

        corpus_dir = tempfile.mkdtemp(prefix="fuzz_corpus_")
        seed_file = os.path.join(corpus_dir, "seed")
        with open(seed_file, "wb") as f:
            f.write(b"hello world\x00\xff")

        crash_dir = tempfile.mkdtemp(prefix="fuzz_crashes_")

        cmd = [
            "python", harness_path,
            f"-max_total_time={duration_s}",
            f"-artifact_prefix={crash_dir}/",
            corpus_dir,
        ]

        proc = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True, timeout=duration_s + 30,
        )
        try:
            stdout, stderr = proc.communicate(timeout=duration_s + 30)
        except subprocess.TimeoutExpired:
            proc.kill()
            stdout, stderr = proc.communicate()

        combined = stdout + stderr
        for crash_file in os.listdir(crash_dir):
            if crash_file.startswith("crash-") or crash_file.startswith("oom-"):
                crash_path = os.path.join(crash_dir, crash_file)
                try:
                    with open(crash_path, "rb") as f:
                        crash_bytes = f.read()
                    crash_input = crash_bytes.hex()[:500]
                    stack_hash = hashlib.sha256(crash_bytes[:64]).hexdigest()[:16]
                    crashes.append(CrashRecord(
                        crash_id=hashlib.sha256(crash_bytes).hexdigest()[:12],
                        target="python_harness",
                        crash_input=crash_input,
                        crash_output=combined[-1000:],
                        stack_hash=stack_hash,
                        crash_type="exception" if "crash-" in crash_file else "oom",
                        is_unique=True,
                        reproducer_path=crash_path,
                    ))
                except Exception:
                    pass

    except Exception as e:
        crashes.append(CrashRecord(
            crash_id="setup_error",
            target="python_harness",
            crash_input="",
            crash_output=str(e),
            stack_hash="error",
            crash_type="setup_error",
            is_unique=False,
            reproducer_path="",
        ))
    finally:
        if harness_path and os.path.exists(harness_path):
            os.unlink(harness_path)

    return crashes


def _run_hypothesis(repo_dir: str, target: str, harness_code: str, duration_s: int) -> list[CrashRecord]:
    """Run ``harness_code`` under pytest and report a failure as a crash.

    The harness is written to a temporary ``*_hyp_test.py`` file and executed
    with ``pytest -x --tb=short --hypothesis-seed=0 -q`` from ``repo_dir``.
    A crash is recorded when the combined output contains ``FAILED``,
    ``AssertionError`` or ``Falsifying``.

    Args:
        repo_dir: Working directory for the pytest process.
        target: Target name stored on the resulting record.
        harness_code: Source of the test module to run.
        duration_s: Time budget in seconds; the effective limit is
            ``min(duration_s, 60)``, after which the process is killed.

    Returns:
        A list with at most one ``CrashRecord``. When a crash is recorded the
        test file is kept on disk so that ``reproducer_path`` stays valid;
        otherwise the file is removed.
    """
    import sys

    crashes: list[CrashRecord] = []
    fd, test_path = tempfile.mkstemp(suffix="_hyp_test.py")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(harness_code)

        proc = subprocess.Popen(
            # Same interpreter as this process, so pytest/hypothesis resolve
            # from the environment this module runs in.
            [sys.executable or "python", "-m", "pytest", test_path, "-x", "--tb=short",
             "--hypothesis-seed=0", "-q"],
            cwd=repo_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True, errors="replace",
        )
        try:
            stdout, stderr = proc.communicate(timeout=min(duration_s, 60))
        except subprocess.TimeoutExpired:
            proc.kill()
            stdout, stderr = proc.communicate()

        combined = (stdout or "") + (stderr or "")
        if "FAILED" in combined or "AssertionError" in combined or "Falsifying" in combined:
            # Derived from the head of the output, not from a parsed stack trace.
            stack_hash = hashlib.sha256(combined[:200].encode()).hexdigest()[:16]
            crashes.append(CrashRecord(
                crash_id=stack_hash,
                target=target,
                crash_input="see Hypothesis output",
                crash_output=combined[:2000],
                stack_hash=stack_hash,
                crash_type="assertion",
                is_unique=True,
                reproducer_path=test_path,
            ))
    except Exception as exc:
        # Best effort: a runner failure is reported, not raised to the caller.
        print(f"[FUZZ] Hypothesis runner failed for {target}: {exc}")
    finally:
        # Only delete the harness when no record refers to it as a reproducer.
        if not crashes:
            try:
                os.unlink(test_path)
            except OSError:
                pass

    return crashes


def run_fuzzing_campaign(
    repo_dir: str,
    target: str,
    language: str = "python",
    duration_s: int = 60,
) -> dict:
    """Generate a harness for ``target``, run a fuzzer on it and triage results.

    Python harnesses that mention atheris are run through the atheris runner
    when the ``atheris`` package is importable. When it is not, the harness is
    replaced by the built-in fallback harness (which handles a missing atheris
    itself) and run through the Hypothesis runner; merely commenting out
    ``import atheris`` would leave a script that fails with ``NameError``.
    Everything else goes to the Hypothesis runner.

    Args:
        repo_dir: Repository root used for source lookup and as working directory.
        target: Name of the function/API to fuzz.
        language: Language label of the target.
        duration_s: Requested time budget in seconds, capped at ``MAX_FUZZ_DURATION``.

    Returns:
        A dict with keys ``target``, ``language``, ``duration_s`` (elapsed whole
        seconds), ``unique_crashes`` (count), ``harness_code`` (first 500
        characters), ``crashes`` (list of per-crash dicts), ``has_crashes`` and
        ``error`` (runner setup failure text, or ``None``). Setup failures are
        reported through ``error`` only and are not counted as crashes.
    """
    duration_s = min(duration_s, MAX_FUZZ_DURATION)
    print(f"[FUZZ] Starting campaign: {target} ({language}, {duration_s}s)")

    source_context = _get_source_context(repo_dir, target, language)
    harness_code = _llm_generate_harness(target, language, repo_dir, source_context)

    start = time.time()
    if language == "python" and "atheris" in harness_code:
        try:
            import importlib.util
            atheris_available = importlib.util.find_spec("atheris") is not None
        except (ImportError, ValueError):
            atheris_available = False
        if atheris_available:
            records = _run_python_atheris(harness_code, duration_s)
        else:
            # An atheris-only harness cannot run here; use the template that
            # switches to Hypothesis when the atheris import fails.
            harness_code = _fallback_harness(target, language)
            records = _run_hypothesis(repo_dir, target, harness_code, duration_s)
    else:
        records = _run_hypothesis(repo_dir, target, harness_code, duration_s)

    elapsed = time.time() - start

    # Runner setup failures are infrastructure problems, not target crashes.
    setup_errors = [r.crash_output for r in records if r.crash_type == "setup_error"]
    crashes = [r for r in records if r.crash_type != "setup_error"]

    # Keep the first record seen for each stack hash, in encounter order.
    first_by_hash: dict = {}
    for record in crashes:
        first_by_hash.setdefault(record.stack_hash, record)
    unique_crashes = list(first_by_hash.values())

    result = FuzzResult(
        target=target,
        language=language,
        duration_s=int(elapsed),
        # The runners do not report execution counts; this is the number of
        # crash records collected.
        total_executions=len(crashes),
        unique_crashes=unique_crashes,
        coverage_percent=0.0,
        harness_code=harness_code,
        error="; ".join(setup_errors) if setup_errors else None,
    )

    print(f"[FUZZ] Done: {len(unique_crashes)} unique crash(es) in {elapsed:.1f}s")
    if result.error:
        print(f"[FUZZ] Runner error: {result.error}")

    return {
        "target": result.target,
        "language": result.language,
        "duration_s": result.duration_s,
        "unique_crashes": len(result.unique_crashes),
        "harness_code": result.harness_code[:500],
        "crashes": [
            {
                "id": c.crash_id, "type": c.crash_type,
                "input_hex": c.crash_input[:100],
                "output_snippet": c.crash_output[:500],
                "stack_hash": c.stack_hash,
            }
            for c in result.unique_crashes
        ],
        "has_crashes": len(result.unique_crashes) > 0,
        "error": result.error,
    }