File size: 11,166 Bytes
c8e832f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
"""Deterministic static-review helpers for arbitrary Python code.



Unlike the benchmark grader, this module does not compare against hidden rubric

items. Instead, it performs direct AST-based review on arbitrary snippets so it

can be used for manual testing, examples, and future dataset generation.

"""

from __future__ import annotations

import ast
from typing import List, Optional

try:
    from models import DirectReviewResponse, ReviewFinding
except ModuleNotFoundError:  # pragma: no cover
    from ..models import DirectReviewResponse, ReviewFinding


class _StaticAnalyzer(ast.NodeVisitor):
    """AST visitor that emits structured review findings.



    The visitor intentionally focuses on a small set of high-signal patterns so

    the direct-review endpoint stays predictable and easy to understand.

    """

    def __init__(self) -> None:
        self.issues: List[ReviewFinding] = []

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:  # noqa: N802
        """Flag mutable default arguments in function definitions."""

        for default in list(node.args.defaults):
            if isinstance(default, (ast.List, ast.Dict, ast.Set)):
                self.issues.append(
                    ReviewFinding(
                        title="Mutable default argument",
                        line=getattr(default, "lineno", node.lineno),
                        category="bug",
                        severity="warning",
                        rationale=(
                            "Mutable defaults persist across calls and can leak state "
                            "between unrelated requests."
                        ),
                        recommendation="Use None as the default and create the object inside the function.",
                        rule_id="mutable-default-list",
                    )
                )
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:  # noqa: N802
        """Inspect function calls for obviously unsafe or noisy patterns."""

        func_name = self._call_name(node)
        if func_name in {"eval", "exec"}:
            self.issues.append(
                ReviewFinding(
                    title=f"Avoid {func_name} on untrusted input",
                    line=node.lineno,
                    category="security",
                    severity="critical",
                    rationale=(
                        f"{func_name} executes arbitrary code and is unsafe on "
                        "user-controlled input."
                    ),
                    recommendation="Use a safe parser or a whitelist-based evaluator.",
                    rule_id="avoid-eval" if func_name == "eval" else "avoid-exec",
                )
            )
        if func_name.endswith("check_output") or func_name.endswith("run"):
            for keyword in node.keywords:
                # `shell=True` is only a problem when the command comes from a
                # shell-parsed string, but this heuristic is high value for
                # review and intentionally conservative.
                if keyword.arg == "shell" and isinstance(keyword.value, ast.Constant) and keyword.value.value is True:
                    self.issues.append(
                        ReviewFinding(
                            title="shell=True with dynamic input",
                            line=node.lineno,
                            category="security",
                            severity="critical",
                            rationale=(
                                "shell=True executes through the shell and can allow "
                                "command injection when the command string is interpolated."
                            ),
                            recommendation="Pass a list of arguments and keep shell=False.",
                            rule_id="shell-true-command-injection",
                        )
                    )
        if func_name == "print":
            self.issues.append(
                ReviewFinding(
                    title="Print statement in application logic",
                    line=node.lineno,
                    category="style",
                    severity="info",
                    rationale="Production services should prefer structured logging over print statements.",
                    recommendation="Use the logging module or return the value to the caller.",
                    rule_id="print-statement",
                )
            )
        self.generic_visit(node)

    def visit_ExceptHandler(self, node: ast.ExceptHandler) -> None:  # noqa: N802
        """Flag bare exception handlers that hide failures."""

        if node.type is None:
            self.issues.append(
                ReviewFinding(
                    title="Bare except",
                    line=node.lineno,
                    category="maintainability",
                    severity="warning",
                    rationale="Bare except catches KeyboardInterrupt and other system-level exceptions.",
                    recommendation="Catch a specific exception and record the failure.",
                    rule_id="bare-except",
                )
            )
        self.generic_visit(node)

    def visit_For(self, node: ast.For) -> None:  # noqa: N802
        """Look for list-membership checks nested in loops."""

        for child in ast.walk(node):
            if isinstance(child, ast.Compare) and any(
                isinstance(operator, (ast.In, ast.NotIn)) for operator in child.ops
            ):
                if isinstance(child.comparators[0], ast.Name):
                    self.issues.append(
                        ReviewFinding(
                            title="Potential quadratic membership check inside loop",
                            line=child.lineno,
                            category="performance",
                            severity="warning",
                            rationale=(
                                "Repeated membership checks against a list inside a loop "
                                "can degrade to quadratic runtime."
                            ),
                            recommendation="Use a set or dict for O(1) membership checks.",
                            rule_id="quadratic-membership-check",
                        )
                    )
                    break
        self.generic_visit(node)

    @staticmethod
    def _call_name(node: ast.Call) -> str:
        """Extract a dotted function name such as `subprocess.run`."""

        func = node.func
        if isinstance(func, ast.Name):
            return func.id
        if isinstance(func, ast.Attribute):
            prefix = _StaticAnalyzer._attribute_prefix(func.value)
            return f"{prefix}.{func.attr}" if prefix else func.attr
        return ""

    @staticmethod
    def _attribute_prefix(node: ast.AST) -> str:
        """Reconstruct the left-hand side of an attribute chain."""

        if isinstance(node, ast.Name):
            return node.id
        if isinstance(node, ast.Attribute):
            prefix = _StaticAnalyzer._attribute_prefix(node.value)
            return f"{prefix}.{node.attr}" if prefix else node.attr
        return ""


def analyze_python_code(code: str) -> List[ReviewFinding]:
    """Analyze arbitrary Python code and return structured findings."""

    if not code.strip():
        return [
            ReviewFinding(
                title="No code provided",
                category="bug",
                severity="warning",
                rationale="The reviewer cannot inspect an empty submission.",
                recommendation="Provide Python source code.",
                rule_id="empty-input",
            )
        ]

    # Syntax errors are turned into findings rather than exceptions so API
    # consumers always get a valid response shape.
    try:
        tree = ast.parse(code)
    except SyntaxError as exc:
        return [
            ReviewFinding(
                title="Syntax error",
                line=exc.lineno,
                category="bug",
                severity="critical",
                rationale=exc.msg,
                recommendation="Fix the syntax error before running static review.",
                rule_id="syntax-error",
            )
        ]

    analyzer = _StaticAnalyzer()
    analyzer.visit(tree)
    return _deduplicate(analyzer.issues)


def build_direct_review_response(

    code: str, context: Optional[str] = None

) -> DirectReviewResponse:
    """Build the public direct-review response for the `/review` route."""

    issues = analyze_python_code(code)
    weighted_penalty = 0.0
    # The direct-review score is intentionally simple: more severe issues lower
    # the score more aggressively.
    for issue in issues:
        if issue.severity == "critical":
            weighted_penalty += 0.3
        elif issue.severity == "warning":
            weighted_penalty += 0.15
        else:
            weighted_penalty += 0.05

    score = max(0.0, min(1.0, 1.0 - weighted_penalty))
    summary = _build_summary(issues, context)
    improved_code = _suggest_improved_code(code, issues)
    return DirectReviewResponse(
        issues=issues,
        summary=summary,
        score=score,
        improved_code=improved_code,
    )


def _build_summary(issues: List[ReviewFinding], context: Optional[str]) -> str:
    """Create a concise human-readable summary for the direct-review response."""

    if not issues:
        base = "No obvious issues were detected by the deterministic reviewer."
    else:
        critical = sum(1 for issue in issues if issue.severity == "critical")
        warnings = sum(1 for issue in issues if issue.severity == "warning")
        infos = sum(1 for issue in issues if issue.severity == "info")
        base = (
            f"Detected {len(issues)} issue(s): {critical} critical, "
            f"{warnings} warning, {infos} info."
        )
    if context:
        return f"{base} Context: {context}"
    return base


def _suggest_improved_code(code: str, issues: List[ReviewFinding]) -> Optional[str]:
    """Append high-level fix directions to the submitted code."""

    if not issues:
        return None
    suggestions = [issue.recommendation for issue in issues if issue.recommendation]
    comment = " | ".join(dict.fromkeys(suggestions))
    return f"{code.rstrip()}\n\n# Suggested review directions: {comment}"


def _deduplicate(findings: List[ReviewFinding]) -> List[ReviewFinding]:
    """Drop duplicate findings that refer to the same rule and line."""

    seen = set()
    unique: List[ReviewFinding] = []
    for finding in findings:
        key = (finding.rule_id, finding.line, finding.category)
        if key in seen:
            continue
        seen.add(key)
        unique.append(finding)
    return unique