Spaces:
Build error
Build error
File size: 11,166 Bytes
c8e832f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 | """Deterministic static-review helpers for arbitrary Python code.
Unlike the benchmark grader, this module does not compare against hidden rubric
items. Instead, it performs direct AST-based review on arbitrary snippets so it
can be used for manual testing, examples, and future dataset generation.
"""
from __future__ import annotations
import ast
from typing import List, Optional
try:
from models import DirectReviewResponse, ReviewFinding
except ModuleNotFoundError: # pragma: no cover
from ..models import DirectReviewResponse, ReviewFinding
class _StaticAnalyzer(ast.NodeVisitor):
"""AST visitor that emits structured review findings.
The visitor intentionally focuses on a small set of high-signal patterns so
the direct-review endpoint stays predictable and easy to understand.
"""
def __init__(self) -> None:
self.issues: List[ReviewFinding] = []
def visit_FunctionDef(self, node: ast.FunctionDef) -> None: # noqa: N802
"""Flag mutable default arguments in function definitions."""
for default in list(node.args.defaults):
if isinstance(default, (ast.List, ast.Dict, ast.Set)):
self.issues.append(
ReviewFinding(
title="Mutable default argument",
line=getattr(default, "lineno", node.lineno),
category="bug",
severity="warning",
rationale=(
"Mutable defaults persist across calls and can leak state "
"between unrelated requests."
),
recommendation="Use None as the default and create the object inside the function.",
rule_id="mutable-default-list",
)
)
self.generic_visit(node)
def visit_Call(self, node: ast.Call) -> None: # noqa: N802
"""Inspect function calls for obviously unsafe or noisy patterns."""
func_name = self._call_name(node)
if func_name in {"eval", "exec"}:
self.issues.append(
ReviewFinding(
title=f"Avoid {func_name} on untrusted input",
line=node.lineno,
category="security",
severity="critical",
rationale=(
f"{func_name} executes arbitrary code and is unsafe on "
"user-controlled input."
),
recommendation="Use a safe parser or a whitelist-based evaluator.",
rule_id="avoid-eval" if func_name == "eval" else "avoid-exec",
)
)
if func_name.endswith("check_output") or func_name.endswith("run"):
for keyword in node.keywords:
# `shell=True` is only a problem when the command comes from a
# shell-parsed string, but this heuristic is high value for
# review and intentionally conservative.
if keyword.arg == "shell" and isinstance(keyword.value, ast.Constant) and keyword.value.value is True:
self.issues.append(
ReviewFinding(
title="shell=True with dynamic input",
line=node.lineno,
category="security",
severity="critical",
rationale=(
"shell=True executes through the shell and can allow "
"command injection when the command string is interpolated."
),
recommendation="Pass a list of arguments and keep shell=False.",
rule_id="shell-true-command-injection",
)
)
if func_name == "print":
self.issues.append(
ReviewFinding(
title="Print statement in application logic",
line=node.lineno,
category="style",
severity="info",
rationale="Production services should prefer structured logging over print statements.",
recommendation="Use the logging module or return the value to the caller.",
rule_id="print-statement",
)
)
self.generic_visit(node)
def visit_ExceptHandler(self, node: ast.ExceptHandler) -> None: # noqa: N802
"""Flag bare exception handlers that hide failures."""
if node.type is None:
self.issues.append(
ReviewFinding(
title="Bare except",
line=node.lineno,
category="maintainability",
severity="warning",
rationale="Bare except catches KeyboardInterrupt and other system-level exceptions.",
recommendation="Catch a specific exception and record the failure.",
rule_id="bare-except",
)
)
self.generic_visit(node)
def visit_For(self, node: ast.For) -> None: # noqa: N802
"""Look for list-membership checks nested in loops."""
for child in ast.walk(node):
if isinstance(child, ast.Compare) and any(
isinstance(operator, (ast.In, ast.NotIn)) for operator in child.ops
):
if isinstance(child.comparators[0], ast.Name):
self.issues.append(
ReviewFinding(
title="Potential quadratic membership check inside loop",
line=child.lineno,
category="performance",
severity="warning",
rationale=(
"Repeated membership checks against a list inside a loop "
"can degrade to quadratic runtime."
),
recommendation="Use a set or dict for O(1) membership checks.",
rule_id="quadratic-membership-check",
)
)
break
self.generic_visit(node)
@staticmethod
def _call_name(node: ast.Call) -> str:
"""Extract a dotted function name such as `subprocess.run`."""
func = node.func
if isinstance(func, ast.Name):
return func.id
if isinstance(func, ast.Attribute):
prefix = _StaticAnalyzer._attribute_prefix(func.value)
return f"{prefix}.{func.attr}" if prefix else func.attr
return ""
@staticmethod
def _attribute_prefix(node: ast.AST) -> str:
"""Reconstruct the left-hand side of an attribute chain."""
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
prefix = _StaticAnalyzer._attribute_prefix(node.value)
return f"{prefix}.{node.attr}" if prefix else node.attr
return ""
def analyze_python_code(code: str) -> List[ReviewFinding]:
"""Analyze arbitrary Python code and return structured findings."""
if not code.strip():
return [
ReviewFinding(
title="No code provided",
category="bug",
severity="warning",
rationale="The reviewer cannot inspect an empty submission.",
recommendation="Provide Python source code.",
rule_id="empty-input",
)
]
# Syntax errors are turned into findings rather than exceptions so API
# consumers always get a valid response shape.
try:
tree = ast.parse(code)
except SyntaxError as exc:
return [
ReviewFinding(
title="Syntax error",
line=exc.lineno,
category="bug",
severity="critical",
rationale=exc.msg,
recommendation="Fix the syntax error before running static review.",
rule_id="syntax-error",
)
]
analyzer = _StaticAnalyzer()
analyzer.visit(tree)
return _deduplicate(analyzer.issues)
def build_direct_review_response(
code: str, context: Optional[str] = None
) -> DirectReviewResponse:
"""Build the public direct-review response for the `/review` route."""
issues = analyze_python_code(code)
weighted_penalty = 0.0
# The direct-review score is intentionally simple: more severe issues lower
# the score more aggressively.
for issue in issues:
if issue.severity == "critical":
weighted_penalty += 0.3
elif issue.severity == "warning":
weighted_penalty += 0.15
else:
weighted_penalty += 0.05
score = max(0.0, min(1.0, 1.0 - weighted_penalty))
summary = _build_summary(issues, context)
improved_code = _suggest_improved_code(code, issues)
return DirectReviewResponse(
issues=issues,
summary=summary,
score=score,
improved_code=improved_code,
)
def _build_summary(issues: List[ReviewFinding], context: Optional[str]) -> str:
"""Create a concise human-readable summary for the direct-review response."""
if not issues:
base = "No obvious issues were detected by the deterministic reviewer."
else:
critical = sum(1 for issue in issues if issue.severity == "critical")
warnings = sum(1 for issue in issues if issue.severity == "warning")
infos = sum(1 for issue in issues if issue.severity == "info")
base = (
f"Detected {len(issues)} issue(s): {critical} critical, "
f"{warnings} warning, {infos} info."
)
if context:
return f"{base} Context: {context}"
return base
def _suggest_improved_code(code: str, issues: List[ReviewFinding]) -> Optional[str]:
"""Append high-level fix directions to the submitted code."""
if not issues:
return None
suggestions = [issue.recommendation for issue in issues if issue.recommendation]
comment = " | ".join(dict.fromkeys(suggestions))
return f"{code.rstrip()}\n\n# Suggested review directions: {comment}"
def _deduplicate(findings: List[ReviewFinding]) -> List[ReviewFinding]:
"""Drop duplicate findings that refer to the same rule and line."""
seen = set()
unique: List[ReviewFinding] = []
for finding in findings:
key = (finding.rule_id, finding.line, finding.category)
if key in seen:
continue
seen.add(key)
unique.append(finding)
return unique
|