Spaces:
Running
Running
| """ | |
| Security Agent — detects OWASP vulnerabilities, hardcoded secrets, | |
| unsafe eval, SQL injection, and more using pattern matching + LLM. | |
| """ | |
| import asyncio | |
| import re | |
| from typing import AsyncGenerator | |
| # CWE mapping for common patterns | |
| CWE_MAP = { | |
| "sql_injection": "CWE-89", | |
| "hardcoded_secret": "CWE-798", | |
| "eval_usage": "CWE-95", | |
| "pickle_loads": "CWE-502", | |
| "md5_password": "CWE-328", | |
| "path_traversal": "CWE-22", | |
| "missing_csrf": "CWE-352", | |
| } | |
| PATTERNS = [ | |
| { | |
| "id": "SEC-001", | |
| "name": "SQL Injection", | |
| "pattern": r'(query|sql)\s*=\s*[f`"\'].*\{.*\}|SELECT.*\+.*req|execute\(.*\+', | |
| "severity": "critical", | |
| "cwe": "CWE-89", | |
| "suggestion": "Use parameterized queries or an ORM to prevent SQL injection.", | |
| "fixAvailable": True, | |
| }, | |
| { | |
| "id": "SEC-002", | |
| "name": "Hardcoded Secret", | |
| "pattern": r'(api_key|secret|password|token|API_KEY)\s*=\s*["\'][a-zA-Z0-9_\-]{12,}["\']', | |
| "severity": "high", | |
| "cwe": "CWE-798", | |
| "suggestion": "Move secrets to environment variables or a secrets manager.", | |
| "fixAvailable": True, | |
| }, | |
| { | |
| "id": "SEC-003", | |
| "name": "Unsafe eval()", | |
| "pattern": r'\beval\s*\(', | |
| "severity": "high", | |
| "cwe": "CWE-95", | |
| "suggestion": "Replace eval() with a safe expression parser.", | |
| "fixAvailable": True, | |
| }, | |
| { | |
| "id": "SEC-004", | |
| "name": "Insecure Deserialization (pickle)", | |
| "pattern": r'pickle\.loads?\s*\(', | |
| "severity": "critical", | |
| "cwe": "CWE-502", | |
| "suggestion": "Use safetensors or JSON instead of pickle for untrusted data.", | |
| "fixAvailable": True, | |
| }, | |
| { | |
| "id": "SEC-005", | |
| "name": "Weak Password Hashing (MD5)", | |
| "pattern": r"hashlib\.md5|createHash\('md5'\)|md5\(", | |
| "severity": "high", | |
| "cwe": "CWE-328", | |
| "suggestion": "Use bcrypt, scrypt, or Argon2 for password hashing.", | |
| "fixAvailable": True, | |
| }, | |
| ] | |
| class SecurityAgent: | |
| async def analyze(self, code: str, language: str) -> AsyncGenerator[tuple[str, dict], None]: | |
| lines = code.split("\n") | |
| total = len(lines) | |
| found = 0 | |
| for i, pattern_def in enumerate(PATTERNS): | |
| await asyncio.sleep(0.5) | |
| # Progress update | |
| pct = int((i / len(PATTERNS)) * 100) | |
| yield "progress", { | |
| "agent": "security", | |
| "percent": pct, | |
| "filesScanned": i + 1, | |
| "message": f"Scanning for {pattern_def['name']}...", | |
| } | |
| # Pattern scan | |
| for line_num, line in enumerate(lines, 1): | |
| if re.search(pattern_def["pattern"], line, re.IGNORECASE): | |
| found += 1 | |
| yield "finding", { | |
| "agent": "security", | |
| "id": pattern_def["id"], | |
| "title": pattern_def["name"], | |
| "severity": pattern_def["severity"], | |
| "cwe": pattern_def["cwe"], | |
| "description": f"Detected {pattern_def['name']} pattern at line {line_num}.", | |
| "file": "uploaded_code.py", | |
| "line": line_num, | |
| "code": line.strip(), | |
| "suggestion": pattern_def["suggestion"], | |
| "fixAvailable": pattern_def["fixAvailable"], | |
| } | |
| break # One finding per pattern | |
| yield "progress", { | |
| "agent": "security", | |
| "percent": 100, | |
| "filesScanned": len(PATTERNS), | |
| "message": f"Security scan complete — {found} issues found", | |
| } | |