File size: 3,786 Bytes
7b4f5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
Security Agent — detects OWASP vulnerabilities, hardcoded secrets,
unsafe eval, SQL injection, and more using pattern matching + LLM.
"""

import asyncio
import re
from typing import AsyncGenerator

# CWE mapping for common patterns
CWE_MAP = {
    "sql_injection": "CWE-89",
    "hardcoded_secret": "CWE-798",
    "eval_usage": "CWE-95",
    "pickle_loads": "CWE-502",
    "md5_password": "CWE-328",
    "path_traversal": "CWE-22",
    "missing_csrf": "CWE-352",
}

PATTERNS = [
    {
        "id": "SEC-001",
        "name": "SQL Injection",
        "pattern": r'(query|sql)\s*=\s*[f`"\'].*\{.*\}|SELECT.*\+.*req|execute\(.*\+',
        "severity": "critical",
        "cwe": "CWE-89",
        "suggestion": "Use parameterized queries or an ORM to prevent SQL injection.",
        "fixAvailable": True,
    },
    {
        "id": "SEC-002",
        "name": "Hardcoded Secret",
        "pattern": r'(api_key|secret|password|token|API_KEY)\s*=\s*["\'][a-zA-Z0-9_\-]{12,}["\']',
        "severity": "high",
        "cwe": "CWE-798",
        "suggestion": "Move secrets to environment variables or a secrets manager.",
        "fixAvailable": True,
    },
    {
        "id": "SEC-003",
        "name": "Unsafe eval()",
        "pattern": r'\beval\s*\(',
        "severity": "high",
        "cwe": "CWE-95",
        "suggestion": "Replace eval() with a safe expression parser.",
        "fixAvailable": True,
    },
    {
        "id": "SEC-004",
        "name": "Insecure Deserialization (pickle)",
        "pattern": r'pickle\.loads?\s*\(',
        "severity": "critical",
        "cwe": "CWE-502",
        "suggestion": "Use safetensors or JSON instead of pickle for untrusted data.",
        "fixAvailable": True,
    },
    {
        "id": "SEC-005",
        "name": "Weak Password Hashing (MD5)",
        "pattern": r"hashlib\.md5|createHash\('md5'\)|md5\(",
        "severity": "high",
        "cwe": "CWE-328",
        "suggestion": "Use bcrypt, scrypt, or Argon2 for password hashing.",
        "fixAvailable": True,
    },
]


class SecurityAgent:
    async def analyze(self, code: str, language: str) -> AsyncGenerator[tuple[str, dict], None]:
        lines = code.split("\n")
        total = len(lines)
        found = 0

        for i, pattern_def in enumerate(PATTERNS):
            await asyncio.sleep(0.5)

            # Progress update
            pct = int((i / len(PATTERNS)) * 100)
            yield "progress", {
                "agent": "security",
                "percent": pct,
                "filesScanned": i + 1,
                "message": f"Scanning for {pattern_def['name']}...",
            }

            # Pattern scan
            for line_num, line in enumerate(lines, 1):
                if re.search(pattern_def["pattern"], line, re.IGNORECASE):
                    found += 1
                    yield "finding", {
                        "agent": "security",
                        "id": pattern_def["id"],
                        "title": pattern_def["name"],
                        "severity": pattern_def["severity"],
                        "cwe": pattern_def["cwe"],
                        "description": f"Detected {pattern_def['name']} pattern at line {line_num}.",
                        "file": "uploaded_code.py",
                        "line": line_num,
                        "code": line.strip(),
                        "suggestion": pattern_def["suggestion"],
                        "fixAvailable": pattern_def["fixAvailable"],
                    }
                    break  # One finding per pattern

        yield "progress", {
            "agent": "security",
            "percent": 100,
            "filesScanned": len(PATTERNS),
            "message": f"Security scan complete — {found} issues found",
        }