File size: 3,288 Bytes
95cbc5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from __future__ import annotations

import re
from typing import Any, Optional

from .models import CommitGuardAction


def _first(tag: str, text: str) -> Optional[str]:
    # Robust case-insensitive search with optional whitespace inside tags
    pattern = rf"<[ \t]*{re.escape(tag)}[ \t]*>(.*?)</[ \t]*{re.escape(tag)}[ \t]*>"
    m = re.search(pattern, text, flags=re.DOTALL | re.IGNORECASE)
    if not m:
        return None
    return m.group(1).strip()


def _parse_bool(v: Optional[str]) -> Optional[bool]:
    if v is None:
        return None
    s = v.strip().lower()
    if s in {"true", "1", "yes"}:
        return True
    if s in {"false", "0", "no"}:
        return False
    return None


def parse_action(raw_action: str) -> CommitGuardAction:
    """
    Parse XML-tag free-text action. Never raises.

    Expected shape:
    <action><action_type>...</action_type><fields>...</fields></action>
    """
    try:
        action_type = (_first("action_type", raw_action) or "").strip().lower()
        if action_type not in {"request_context", "analyze", "verdict"}:
            return CommitGuardAction(
                action_type="analyze",
                raw_action=raw_action,
                parse_error="missing_or_invalid_action_type",
            )

        if action_type == "request_context":
            file_path = _first("file_path", raw_action)
            return CommitGuardAction(
                action_type="request_context",
                file_path=file_path,
                raw_action=raw_action,
            )

        if action_type == "analyze":
            reasoning = _first("reasoning", raw_action)
            return CommitGuardAction(action_type="analyze", reasoning=reasoning, raw_action=raw_action)

        is_vulnerable = _parse_bool(_first("is_vulnerable", raw_action))
        vuln_type = _first("vuln_type", raw_action)
        exploit_sketch = _first("exploit_sketch", raw_action)
        return CommitGuardAction(
            action_type="verdict",
            is_vulnerable=is_vulnerable,
            vuln_type=vuln_type,
            exploit_sketch=exploit_sketch,
            raw_action=raw_action,
        )
    except Exception as e:  # defensive: model output must never crash server
        return CommitGuardAction(
            action_type="analyze",
            raw_action=raw_action,
            parse_error=f"parser_exception:{type(e).__name__}",
        )


def action_from_json(payload: dict[str, Any]) -> CommitGuardAction:
    """
    Convenience for curl/json clients: accept either {action: "<xml>"} or
    direct fields matching CommitGuardAction.
    """
    if isinstance(payload.get("action"), str):
        return parse_action(payload["action"])

    action_type = (payload.get("action_type") or "analyze").strip().lower()
    if action_type not in {"request_context", "analyze", "verdict"}:
        action_type = "analyze"

    return CommitGuardAction(
        action_type=action_type,  # type: ignore[arg-type]
        file_path=payload.get("file_path"),
        reasoning=payload.get("reasoning"),
        is_vulnerable=payload.get("is_vulnerable"),
        vuln_type=payload.get("vuln_type"),
        exploit_sketch=payload.get("exploit_sketch"),
        raw_action=None,
        parse_error=None,
    )