Spaces:
Running
Running
File size: 3,786 Bytes
7b4f5dd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | """
Security Agent — detects OWASP vulnerabilities, hardcoded secrets,
unsafe eval, SQL injection, and more using pattern matching + LLM.
"""
import asyncio
import re
from typing import AsyncGenerator
# CWE mapping for common patterns
CWE_MAP = {
"sql_injection": "CWE-89",
"hardcoded_secret": "CWE-798",
"eval_usage": "CWE-95",
"pickle_loads": "CWE-502",
"md5_password": "CWE-328",
"path_traversal": "CWE-22",
"missing_csrf": "CWE-352",
}
PATTERNS = [
{
"id": "SEC-001",
"name": "SQL Injection",
"pattern": r'(query|sql)\s*=\s*[f`"\'].*\{.*\}|SELECT.*\+.*req|execute\(.*\+',
"severity": "critical",
"cwe": "CWE-89",
"suggestion": "Use parameterized queries or an ORM to prevent SQL injection.",
"fixAvailable": True,
},
{
"id": "SEC-002",
"name": "Hardcoded Secret",
"pattern": r'(api_key|secret|password|token|API_KEY)\s*=\s*["\'][a-zA-Z0-9_\-]{12,}["\']',
"severity": "high",
"cwe": "CWE-798",
"suggestion": "Move secrets to environment variables or a secrets manager.",
"fixAvailable": True,
},
{
"id": "SEC-003",
"name": "Unsafe eval()",
"pattern": r'\beval\s*\(',
"severity": "high",
"cwe": "CWE-95",
"suggestion": "Replace eval() with a safe expression parser.",
"fixAvailable": True,
},
{
"id": "SEC-004",
"name": "Insecure Deserialization (pickle)",
"pattern": r'pickle\.loads?\s*\(',
"severity": "critical",
"cwe": "CWE-502",
"suggestion": "Use safetensors or JSON instead of pickle for untrusted data.",
"fixAvailable": True,
},
{
"id": "SEC-005",
"name": "Weak Password Hashing (MD5)",
"pattern": r"hashlib\.md5|createHash\('md5'\)|md5\(",
"severity": "high",
"cwe": "CWE-328",
"suggestion": "Use bcrypt, scrypt, or Argon2 for password hashing.",
"fixAvailable": True,
},
]
class SecurityAgent:
async def analyze(self, code: str, language: str) -> AsyncGenerator[tuple[str, dict], None]:
lines = code.split("\n")
total = len(lines)
found = 0
for i, pattern_def in enumerate(PATTERNS):
await asyncio.sleep(0.5)
# Progress update
pct = int((i / len(PATTERNS)) * 100)
yield "progress", {
"agent": "security",
"percent": pct,
"filesScanned": i + 1,
"message": f"Scanning for {pattern_def['name']}...",
}
# Pattern scan
for line_num, line in enumerate(lines, 1):
if re.search(pattern_def["pattern"], line, re.IGNORECASE):
found += 1
yield "finding", {
"agent": "security",
"id": pattern_def["id"],
"title": pattern_def["name"],
"severity": pattern_def["severity"],
"cwe": pattern_def["cwe"],
"description": f"Detected {pattern_def['name']} pattern at line {line_num}.",
"file": "uploaded_code.py",
"line": line_num,
"code": line.strip(),
"suggestion": pattern_def["suggestion"],
"fixAvailable": pattern_def["fixAvailable"],
}
break # One finding per pattern
yield "progress", {
"agent": "security",
"percent": 100,
"filesScanned": len(PATTERNS),
"message": f"Security scan complete — {found} issues found",
}
|