# scrapeRL/backend/app/plugins/python_sandbox.py
# Last commit by NeerajCodz (54ec9cb): fix: resolve scraper functionality and plugin issues
"""Sandboxed Python execution helpers for scrape plugins."""
from __future__ import annotations
import ast
import json
import os
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Top-level packages that sandboxed code may import. `_validate_code` compares
# the first dotted component of every imported module against this set.
ALLOWED_IMPORTS = {
    # Standard library
    "datetime",
    "json",
    "math",
    "re",
    "statistics",
    # Third-party data tooling
    "bs4",
    "numpy",
    "pandas",
}
# Builtins that sandboxed code may not call by bare name.
# "locals" is intentionally absent so that analysis code can introspect its
# own local variables.
BLOCKED_CALLS = {
    # Dynamic code execution and importing
    "__import__",
    "compile",
    "eval",
    "exec",
    # I/O and interactive hooks
    "breakpoint",
    "input",
    "open",
    # Namespace and attribute reflection
    "delattr",
    "getattr",
    "globals",
    "setattr",
    "vars",
}
# Identifiers that may not appear as bare names anywhere in sandboxed code.
BLOCKED_NAMES = {
    "os",
    "pathlib",
    "shutil",
    "socket",
    "subprocess",
    "sys",
}
# Attribute names that sandboxed code may not call. Matching is done on the
# attribute name alone, whatever object it is looked up on, so ordinary
# methods that share a name (for example `str.replace`) are rejected too.
BLOCKED_ATTRS = {
    # Process creation
    "fork",
    "popen",
    "spawn",
    "system",
    # Environment access
    "environ",
    "putenv",
    # Filesystem deletion
    "remove",
    "rmdir",
    "rmtree",
    "unlink",
    # Filesystem permissions and ownership
    "chmod",
    "chown",
    # Filesystem enumeration
    "listdir",
    "walk",
    # Filesystem creation and moves
    "makedirs",
    "mkdir",
    "rename",
    "replace",
    "symlink",
}
# Built-in analysis snippet, written in the same form as user-submitted
# sandbox code: it reads `payload` and assigns `result`. Third-party imports
# are deferred until the data that needs them is actually present, so an empty
# payload is summarised without touching pandas, numpy or bs4.
DEFAULT_ANALYSIS_CODE = """
rows = payload.get("dataset_rows") or []
result = {
    "row_count": len(rows),
    "columns": sorted(list(rows[0].keys())) if rows else [],
    "summary": {},
    "source_links": payload.get("source_links") or [],
}
if rows:
    import pandas as pd
    import numpy as np
    df = pd.DataFrame(rows)
    if "gold_price_usd" in df.columns:
        series = pd.to_numeric(df["gold_price_usd"], errors="coerce").dropna()
        if len(series) > 0:
            result["summary"] = {
                "min_price": float(series.min()),
                "max_price": float(series.max()),
                "mean_price": float(series.mean()),
                "std_price": float(series.std(ddof=0)),
                "median_price": float(np.median(series.to_numpy())),
            }
html_samples = payload.get("html_samples") or {}
if html_samples:
    from bs4 import BeautifulSoup
    html_link_counts = {}
    for source, html in html_samples.items():
        soup = BeautifulSoup(html or "", "html.parser")
        html_link_counts[source] = len(soup.find_all("a"))
    result["html_link_counts"] = html_link_counts
"""
class UnsafePythonCodeError(ValueError):
    """Signal that submitted Python code was rejected by the sandbox validator.

    This is a ``ValueError`` subclass, so handlers written for ``ValueError``
    catch it as well.
    """
@dataclass
class SandboxExecutionResult:
    """Outcome of a single sandboxed Python plugin run.

    Attributes:
        success: Whether the run completed and produced usable output.
        output: Parsed JSON output of the run, or ``None`` when there is none.
        error: Short description of what went wrong, or ``None``.
        stdout: Text captured from the run's standard output.
        stderr: Text captured from the run's standard error.
        timeout: Whether the run was stopped for exceeding its time limit.
    """

    success: bool
    output: dict[str, Any] | None = None
    error: str | None = None
    stdout: str = ""
    stderr: str = ""
    timeout: bool = False
def _validate_code(code: str) -> None:
    """Validate user code against sandbox safety constraints.

    The source is parsed (never executed) and every AST node is checked
    against the module-level allow/block lists. This is a static, name-based
    filter: it rejects known-dangerous spellings, it does not prove safety.

    Args:
        code: Python source text submitted for sandboxed execution.

    Raises:
        UnsafePythonCodeError: If the source cannot be parsed, or if it uses
            a disallowed import, bare name, call, or attribute.
    """
    try:
        tree = ast.parse(code, mode="exec")
    except (SyntaxError, ValueError, RecursionError) as exc:
        # ValueError covers sources the parser refuses outright (e.g. NUL
        # bytes on older interpreters); RecursionError covers pathologically
        # nested input. Both must surface as a sandbox rejection.
        raise UnsafePythonCodeError(f"Invalid Python syntax: {exc}") from exc

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                root = alias.name.split(".")[0]
                if root not in ALLOWED_IMPORTS:
                    raise UnsafePythonCodeError(f"Import not allowed: {alias.name}")

        elif isinstance(node, ast.ImportFrom):
            if node.level:
                raise UnsafePythonCodeError("Relative imports are not allowed in sandbox code")
            module = node.module or ""
            root = module.split(".")[0]
            if root not in ALLOWED_IMPORTS:
                raise UnsafePythonCodeError(f"Import not allowed: {module}")
            # An allowed package may re-export a blocked module
            # (`from pkg.sub import os as o`); the alias would otherwise hide
            # it from the bare-name check below.
            for alias in node.names:
                if alias.name in BLOCKED_NAMES or alias.name.startswith("__"):
                    raise UnsafePythonCodeError(f"Import not allowed: {module}.{alias.name}")

        elif isinstance(node, ast.Name):
            # Dunder globals such as `__builtins__` expose every blocked
            # builtin as a plain attribute, so they are rejected as names.
            if node.id in BLOCKED_NAMES or node.id.startswith("__"):
                raise UnsafePythonCodeError(f"Blocked name used: {node.id}")

        elif isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name) and node.func.id in BLOCKED_CALLS:
                raise UnsafePythonCodeError(f"Blocked call used: {node.func.id}")
            if isinstance(node.func, ast.Attribute):
                if node.func.attr.startswith("__") or node.func.attr in BLOCKED_ATTRS:
                    raise UnsafePythonCodeError(f"Blocked attribute call: {node.func.attr}")

        elif isinstance(node, ast.Attribute):
            if node.attr.startswith("__"):
                raise UnsafePythonCodeError("Dunder attribute access is not allowed")
            # Blocked attributes are refused even when not called directly:
            # `x.environ` is never called, and `f = x.system; f(...)` would
            # dodge the call check. Blocked module names reached as
            # attributes (`pkg.os`) are refused for the same reason.
            if node.attr in BLOCKED_ATTRS or node.attr in BLOCKED_NAMES:
                raise UnsafePythonCodeError(f"Blocked attribute access: {node.attr}")
def _build_runner_script(user_code: str) -> str:
    """Wrap user code in a deterministic runner script.

    The generated script loads ``input.json`` from its working directory into
    ``payload``, pre-binds ``np``, ``pd`` and ``BeautifulSoup`` (each set to
    ``None`` when the package cannot be imported), runs ``user_code`` at module
    level, and finally prints ``result`` as one JSON line.

    Args:
        user_code: Source text embedded verbatim; it must assign ``result``.

    Returns:
        The complete source of the runner script.
    """
    # Everything the prologue binds is visible to the user code, so it must
    # not import `pathlib`: a leaked `Path` would sidestep BLOCKED_NAMES. The
    # input-file handle is deleted before user code runs for the same reason.
    # The epilogue re-imports json under a private alias so that a user
    # variable named `json` cannot break result serialization.
    return f"""import json
try:
    import numpy as np  # noqa: F401
except Exception:
    np = None  # noqa: N816
try:
    import pandas as pd  # noqa: F401
except Exception:
    pd = None
try:
    from bs4 import BeautifulSoup  # noqa: F401
except Exception:
    BeautifulSoup = None
with open("input.json", encoding="utf-8") as _sandbox_input:
    payload = json.load(_sandbox_input)
del _sandbox_input
result = None
{user_code}
if result is None:
    raise ValueError("Sandbox code must assign a JSON-serializable value to `result`.")
import json as _sandbox_json
print(_sandbox_json.dumps(result, default=str))
"""
def _safe_session_tag(session_id: str) -> str:
    """Reduce a session id to characters that are safe inside a directory name."""
    return "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in str(session_id))


def _decode_stream(data: str | bytes | None) -> str:
    """Return captured process output as stripped text, whatever type it arrived as."""
    if not data:
        return ""
    if isinstance(data, bytes):
        data = data.decode("utf-8", errors="replace")
    return data.strip()


def execute_python_sandbox(
    code: str,
    payload: dict[str, Any],
    *,
    session_id: str,
    timeout_seconds: int = 25,
) -> SandboxExecutionResult:
    """Execute validated Python code in an isolated temporary workspace.

    The code is statically validated, wrapped in a runner script, and run by
    a fresh interpreter (``-I`` isolated mode) whose working directory is a
    throw-away temp directory holding ``input.json`` and ``runner.py``. The
    last line of the child's stdout is parsed as the JSON result.

    Args:
        code: User-supplied Python source; it must assign ``result``.
        payload: Data exposed to the code as ``payload``; values that are not
            JSON-serializable are stringified.
        session_id: Identifier embedded in the workspace directory name.
        timeout_seconds: Wall-clock limit for the child process.

    Returns:
        A ``SandboxExecutionResult``. On success ``output`` is the parsed JSON
        result; a non-dict result is wrapped as ``{"result": value}``. Failure,
        empty output, invalid JSON and timeout are reported through
        ``success=False`` and ``error`` rather than raised.

    Raises:
        UnsafePythonCodeError: If ``code`` fails validation (raised before any
            workspace is created).
    """
    _validate_code(code)

    # The session id comes from the caller; strip path separators and other
    # special characters so it cannot break or redirect the mkdtemp prefix.
    prefix = f"scraperl-sandbox-{_safe_session_tag(session_id)}-"
    workspace = Path(tempfile.mkdtemp(prefix=prefix))
    try:
        input_path = workspace / "input.json"
        script_path = workspace / "runner.py"
        input_path.write_text(json.dumps(payload, default=str), encoding="utf-8")
        script_path.write_text(_build_runner_script(code), encoding="utf-8")

        # `-I` already ignores PYTHON* variables and the user site directory;
        # the explicit settings below are belt and braces.
        # NOTE(review): the child still inherits the rest of the parent's
        # environment, including any secrets it holds — consider passing an
        # explicit allow-list instead.
        env = os.environ.copy()
        env["PYTHONNOUSERSITE"] = "1"
        env.pop("PYTHONPATH", None)

        try:
            process = subprocess.run(
                [sys.executable, "-I", str(script_path)],
                cwd=workspace,
                capture_output=True,
                text=True,
                timeout=timeout_seconds,
                env=env,
                check=False,
            )
        except subprocess.TimeoutExpired as exc:
            # Partial output on TimeoutExpired may be bytes even with
            # text=True, so it is normalised to str before being returned.
            return SandboxExecutionResult(
                success=False,
                error="Sandbox execution timed out",
                stdout=_decode_stream(exc.stdout),
                stderr=_decode_stream(exc.stderr),
                timeout=True,
            )

        stdout = _decode_stream(process.stdout)
        stderr = _decode_stream(process.stderr)

        if process.returncode != 0:
            return SandboxExecutionResult(
                success=False,
                error=f"Sandbox execution failed (exit {process.returncode})",
                stdout=stdout,
                stderr=stderr,
            )
        if not stdout:
            return SandboxExecutionResult(
                success=False,
                error="Sandbox execution returned empty stdout",
                stdout=stdout,
                stderr=stderr,
            )

        # User code may print freely; only the final line (written by the
        # runner epilogue) carries the JSON result.
        try:
            output = json.loads(stdout.splitlines()[-1])
        except json.JSONDecodeError as exc:
            return SandboxExecutionResult(
                success=False,
                error=f"Sandbox output was not valid JSON: {exc}",
                stdout=stdout,
                stderr=stderr,
            )

        if not isinstance(output, dict):
            output = {"result": output}
        return SandboxExecutionResult(
            success=True,
            output=output,
            stdout=stdout,
            stderr=stderr,
        )
    finally:
        # Always discard the workspace, including on timeout or error.
        shutil.rmtree(workspace, ignore_errors=True)