YashashviAlva's picture
Initial commit for HF Spaces deploy
7b4f5dd
"""
CodeSentry Backend β€” FastAPI Application
AI Security Copilot for AI-Generated Code
Endpoints:
POST /api/scan β€” Initiate a scan, returns scanId
GET /api/scan/stream/{scanId} β€” SSE stream of agent events
GET /api/health β€” Health check
"""
import asyncio
import json
import logging
import uuid
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from agents.orchestrator import run_scan_pipeline
# Application instance. The title, description and version are handed to
# FastAPI as API metadata, so the description must be clean, correctly
# encoded text.
app = FastAPI(
    title="CodeSentry API",
    description="AI Security Copilot — Backend API",
    version="1.0.0",
)
# CORS for Vite dev server.
# The origin list names three localhost dev origins and also the wildcard "*";
# every method and header is allowed and credentials are enabled.
# NOTE(review): "*" makes the explicit localhost entries redundant, and a
# wildcard origin combined with allow_credentials=True is a combination the
# CORS rules do not permit for credentialed requests — confirm whether the
# wildcard (e.g. for the hosted deployment) or the credentials flag is the
# one actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:5174", "http://localhost:3000", "*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
return {
"status": "online",
"name": "CodeSentry AI Security API",
"version": "1.0.0",
"endpoints": {
"health": "/api/health",
"docs": "/docs",
"scan": "/api/scan"
}
}
# In-memory scan registry: maps a scan id to its record dict. Records are
# written by create_scan and looked up by stream_scan. Nothing in this part of
# the file removes entries or persists them — NOTE(review): confirm whether
# cleanup happens elsewhere.
scans: dict[str, dict] = {}
# Request body accepted by POST /api/scan. Only `type` is required; the
# remaining fields are optional.
class ScanRequest(BaseModel):
    # Scan source selector; the expected values are "github" and "code".
    type: str
    # Presumably the repository URL for "github" scans — confirm in the pipeline.
    url: str | None = None
    # Presumably the inline source text for "code" scans — confirm in the pipeline.
    code: str | None = None
    # Language of the submitted code; falls back to "python" when omitted.
    language: str | None = "python"
@app.get("/api/health")
async def health():
return {"status": "ok", "service": "codesentry-api"}
@app.post("/api/scan")
async def create_scan(request: ScanRequest):
scan_id = f"cs-{uuid.uuid4().hex[:8]}"
scans[scan_id] = {
"id": scan_id,
"request": request.dict(),
"status": "pending",
"events": [],
}
return {"scanId": scan_id, "status": "pending"}
@app.get("/api/scan/stream/{scan_id}")
async def stream_scan(scan_id: str):
if scan_id not in scans:
async def error_stream():
yield f"event: error\ndata: {json.dumps({'message': 'Scan not found'})}\n\n"
return StreamingResponse(error_stream(), media_type="text/event-stream")
scan = scans[scan_id]
request = ScanRequest(**scan["request"])
async def event_stream() -> AsyncGenerator[str, None]:
try:
async for event_type, event_data in run_scan_pipeline(request):
payload = json.dumps(event_data)
yield f"event: {event_type}\ndata: {payload}\n\n"
await asyncio.sleep(0)
except Exception as e:
error_payload = json.dumps({"message": str(e)})
yield f"event: error\ndata: {error_payload}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
},
)