# Source revisions: b41928d, 8705a00 (file size: 3,948 bytes).
import math
import random
import time
import zlib
from collections import defaultdict
from typing import Optional

from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel

# ASGI application object. The route decorators below register their
# handlers on it, and the __main__ guard at the bottom of the file serves it.
app = FastAPI(
    # Metadata passed to FastAPI for the generated API documentation.
    title="ARF Sandbox API",
    description="Mock endpoint – does NOT use the real Bayesian engine. Simulated responses only.",
    version="1.0.0",
    # Paths of the interactive documentation pages (Swagger UI and ReDoc).
    docs_url="/docs",
    redoc_url="/redoc",
)

# ---------- Rate Limiting Configuration ----------
RATE_LIMIT = 10          # max requests per time window
RATE_LIMIT_WINDOW = 60   # seconds

# In-memory store: {ip: [timestamp, ...]} -- one plain list of float
# timestamps per client key, appended to by rate_limit_middleware.
# The data lives only in this module-level dict, so it is per-process state.
# NOTE(review): keys are never removed, so the dict grows with the number of
# distinct clients seen; a deque per key would also make pruning cheaper.
request_log = defaultdict(list)

def rate_limit_middleware(ip: str) -> None:
    """Enforce the per-client sliding-window request limit.

    Drops the timestamps stored for ``ip`` that have aged out of the window,
    then either records the current request or rejects it.

    Args:
        ip: Key identifying the client in ``request_log``.

    Raises:
        HTTPException: With status 429 when ``ip`` already has ``RATE_LIMIT``
            requests recorded within the last ``RATE_LIMIT_WINDOW`` seconds.
            The response carries a ``Retry-After`` header holding the number
            of whole seconds until the oldest recorded request expires.
    """
    # Use the monotonic clock: only elapsed time matters here, and wall-clock
    # adjustments (NTP steps, manual changes) must not stretch or shrink the
    # window.
    now = time.monotonic()

    # Keep only the timestamps that are still inside the window.
    recent = [ts for ts in request_log[ip] if now - ts < RATE_LIMIT_WINDOW]
    request_log[ip] = recent

    if len(recent) >= RATE_LIMIT:
        # Timestamps are appended in chronological order, so the first entry
        # is the next one to expire. The fallback covers RATE_LIMIT <= 0,
        # where the list can be empty.
        oldest = recent[0] if recent else now
        retry_after = max(1, math.ceil(RATE_LIMIT_WINDOW - (now - oldest)))
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded. Max {RATE_LIMIT} requests per {RATE_LIMIT_WINDOW} seconds.",
            headers={"Retry-After": str(retry_after)},
        )

    recent.append(now)

# ---------- Request/Response Models ----------
# Optional numeric measurements that may accompany an evaluation request.
# Every field defaults to None, so a client may send any subset of them.
# generate_mock_response does not read these values.
# (Plain comments are used instead of a class docstring so the generated
# JSON schema stays unchanged.)
class Metrics(BaseModel):
    latency_ms: Optional[float] = None   # presumably milliseconds, per the name -- TODO confirm
    error_rate: Optional[float] = None   # scale (fraction vs. percent) is not established here
    throughput: Optional[float] = None   # unit is not established here
    cpu_usage: Optional[float] = None    # scale (fraction vs. percent) is not established here

# Request body accepted by POST /v1/evaluate.
# service_name, event_type and severity are required strings; they are the
# only fields generate_mock_response reads. metrics and timestamp are optional
# and default to None.
class EvaluateRequest(BaseModel):
    service_name: str
    event_type: str          # e.g., "latency", "error_rate", "cpu_spike"
    # Not validated against a fixed set: values other than the four listed
    # here fall back to a base risk of 0.5 in generate_mock_response.
    severity: str            # "low", "medium", "high", "critical"
    metrics: Optional[Metrics] = None
    timestamp: Optional[float] = None   # presumably epoch seconds -- TODO confirm

# Response body returned by POST /v1/evaluate.
# All values are produced by generate_mock_response and are simulated.
class EvaluateResponse(BaseModel):
    status: str              # generate_mock_response always sets "success"
    recommendation: str      # "APPROVE", "DENY", "ESCALATE"
    risk_score: float        # clamped to [0.01, 0.99] and rounded to 4 decimals by the mock
    confidence: float        # clamped to [0.5, 0.99] and rounded to 4 decimals by the mock
    justification: str       # human-readable summary of the simulated decision
    policy_violations: list  # element type is not declared; the mock always returns []

# ---------- Mock Logic ----------
def generate_mock_response(request: EvaluateRequest) -> EvaluateResponse:
    """Build a simulated evaluation result for ``request``.

    The result is pseudo-random but reproducible: the same
    ``(service_name, event_type, severity)`` triple yields the same response
    in every process. No real inference is performed and ``request.metrics``
    is not consulted.

    Args:
        request: The evaluation request. Only ``service_name``,
            ``event_type`` and ``severity`` are read.

    Returns:
        An ``EvaluateResponse`` with ``status="success"``, a risk score in
        ``[0.01, 0.99]``, a confidence in ``[0.5, 0.99]``, the derived
        recommendation, and an empty ``policy_violations`` list.
    """
    # The built-in hash() of strings is salted per interpreter process
    # (PYTHONHASHSEED), so it cannot give a seed that is stable across runs
    # or workers. CRC32 over the encoded fields can. The unit-separator
    # character keeps ("ab", "c") distinct from ("a", "bc").
    seed_material = "\x1f".join(
        (request.service_name, request.event_type, request.severity)
    )
    seed = zlib.crc32(seed_material.encode("utf-8"))

    # A private generator leaves the module-level ``random`` state untouched
    # for any other code that relies on it.
    rng = random.Random(seed)

    # Simulate risk score based on severity; unknown severities use 0.5.
    severity_map = {"low": 0.2, "medium": 0.4, "high": 0.7, "critical": 0.9}
    base_risk = severity_map.get(request.severity, 0.5)
    risk = min(0.99, max(0.01, base_risk + rng.uniform(-0.1, 0.1)))

    # Decision logic (mock): low risk approves, high risk denies, and
    # everything in between is escalated.
    if risk < 0.3:
        rec = "APPROVE"
    elif risk > 0.8:
        rec = "DENY"
    else:
        rec = "ESCALATE"

    # Confidence falls as risk rises, with a little jitter, then is clamped.
    confidence = 1.0 - (risk * 0.3) + rng.uniform(-0.05, 0.05)
    confidence = min(0.99, max(0.5, confidence))

    justification = (
        f"Simulated evaluation for {request.service_name}: {request.event_type} severity={request.severity}. "
        f"Risk score {risk:.2f} -> {rec}. (Mock response, not real inference.)"
    )

    return EvaluateResponse(
        status="success",
        recommendation=rec,
        risk_score=round(risk, 4),
        confidence=round(confidence, 4),
        justification=justification,
        policy_violations=[],
    )

# ---------- Endpoints ----------
@app.get("/health", tags=["health"])
async def health():
    # Liveness probe: reports a fixed "ok" status together with the server's
    # current wall-clock time.
    # (Kept as a comment, not a docstring, so the OpenAPI description of this
    # route is unchanged.)
    current_time = time.time()
    payload = {"status": "ok", "timestamp": current_time}
    return payload

@app.post("/v1/evaluate", response_model=EvaluateResponse, tags=["evaluation"])
async def evaluate(request: EvaluateRequest, req: Request):
    # Handles an evaluation request: rate-limits the caller, rejects empty
    # required fields with a 400, then returns the simulated evaluation.
    # (Kept as a comment, not a docstring, so the OpenAPI description of this
    # route is unchanged.)

    # The rate-limit key is the connecting peer's host, or "unknown" when the
    # request carries no client information.
    caller = req.client
    rate_limit_middleware(caller.host if caller else "unknown")

    # Both fields are required by the model, but an empty string would still
    # pass validation, so check for that explicitly.
    required_values = (request.service_name, request.event_type)
    if not all(required_values):
        raise HTTPException(status_code=400, detail="Missing service_name or event_type")

    return generate_mock_response(request)

@app.get("/", include_in_schema=False)
async def root():
    """Return a short landing message pointing clients at the docs."""
    greeting = "ARF Sandbox API. See /docs for interactive documentation. Rate limit: 10 requests per minute per IP."
    return {"message": greeting}

if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the module is run
    # directly, then serves the app on every interface at port 7860.
    from uvicorn import run as serve

    serve(app, port=7860, host="0.0.0.0")