from __future__ import annotations import re from typing import Any, Optional from .models import CommitGuardAction def _first(tag: str, text: str) -> Optional[str]: # Robust case-insensitive search with optional whitespace inside tags pattern = rf"<[ \t]*{re.escape(tag)}[ \t]*>(.*?)" m = re.search(pattern, text, flags=re.DOTALL | re.IGNORECASE) if not m: return None return m.group(1).strip() def _parse_bool(v: Optional[str]) -> Optional[bool]: if v is None: return None s = v.strip().lower() if s in {"true", "1", "yes"}: return True if s in {"false", "0", "no"}: return False return None def parse_action(raw_action: str) -> CommitGuardAction: """ Parse XML-tag free-text action. Never raises. Expected shape: ...... """ try: action_type = (_first("action_type", raw_action) or "").strip().lower() if action_type not in {"request_context", "analyze", "verdict"}: return CommitGuardAction( action_type="analyze", raw_action=raw_action, parse_error="missing_or_invalid_action_type", ) if action_type == "request_context": file_path = _first("file_path", raw_action) return CommitGuardAction( action_type="request_context", file_path=file_path, raw_action=raw_action, ) if action_type == "analyze": reasoning = _first("reasoning", raw_action) return CommitGuardAction(action_type="analyze", reasoning=reasoning, raw_action=raw_action) is_vulnerable = _parse_bool(_first("is_vulnerable", raw_action)) vuln_type = _first("vuln_type", raw_action) exploit_sketch = _first("exploit_sketch", raw_action) return CommitGuardAction( action_type="verdict", is_vulnerable=is_vulnerable, vuln_type=vuln_type, exploit_sketch=exploit_sketch, raw_action=raw_action, ) except Exception as e: # defensive: model output must never crash server return CommitGuardAction( action_type="analyze", raw_action=raw_action, parse_error=f"parser_exception:{type(e).__name__}", ) def action_from_json(payload: dict[str, Any]) -> CommitGuardAction: """ Convenience for curl/json clients: accept either {action: ""} or direct fields matching CommitGuardAction. """ if isinstance(payload.get("action"), str): return parse_action(payload["action"]) action_type = (payload.get("action_type") or "analyze").strip().lower() if action_type not in {"request_context", "analyze", "verdict"}: action_type = "analyze" return CommitGuardAction( action_type=action_type, # type: ignore[arg-type] file_path=payload.get("file_path"), reasoning=payload.get("reasoning"), is_vulnerable=payload.get("is_vulnerable"), vuln_type=payload.get("vuln_type"), exploit_sketch=payload.get("exploit_sketch"), raw_action=None, parse_error=None, )