| """ |
| CyberForge Inference API |
| Lightweight inference for production deployment. |
| """ |
|
|
| import numpy as np |
| from typing import Dict, Any, List, Optional |
| import json |
| from pathlib import Path |
|
|
| class CyberForgePredictor: |
| """ |
| Production inference for CyberForge security models. |
| Designed for real-time threat detection. |
| """ |
| |
| |
| MODEL_INFO = { |
| "phishing_detection": { |
| "accuracy": 0.989, |
| "f1_score": 0.989, |
| "features": 28, |
| "classes": ["benign", "phishing"] |
| }, |
| "malware_detection": { |
| "accuracy": 0.998, |
| "f1_score": 0.998, |
| "features": 7, |
| "classes": ["benign", "malware"] |
| }, |
| "anomaly_detection": { |
| "accuracy": 0.999, |
| "f1_score": 0.999, |
| "features": 3, |
| "classes": ["normal", "anomaly"] |
| }, |
| "web_attack_detection": { |
| "accuracy": 1.0, |
| "f1_score": 1.0, |
| "features": 24, |
| "classes": ["benign", "attack"] |
| } |
| } |
| |
| def __init__(self): |
| self.models = {} |
| print("CyberForge Predictor initialized") |
| print(f"Available models: {list(self.MODEL_INFO.keys())}") |
| |
| def predict(self, model_name: str, features: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Make a prediction using the specified model. |
| |
| Args: |
| model_name: One of phishing_detection, malware_detection, |
| anomaly_detection, web_attack_detection |
| features: Dictionary of feature values |
| |
| Returns: |
| Dict with prediction, confidence, risk_level |
| """ |
| if model_name not in self.MODEL_INFO: |
| return {"error": f"Unknown model: {model_name}"} |
| |
| info = self.MODEL_INFO[model_name] |
| |
| |
| score = self._calculate_threat_score(features, model_name) |
| |
| prediction = 1 if score > 0.5 else 0 |
| confidence = abs(score - 0.5) * 2 * 100 |
| |
| return { |
| "model": model_name, |
| "prediction": info["classes"][prediction], |
| "prediction_id": prediction, |
| "confidence": round(confidence, 2), |
| "risk_level": self._get_risk_level(score), |
| "threat_score": round(score, 4) |
| } |
| |
| def _calculate_threat_score(self, features: Dict, model_name: str) -> float: |
| """Calculate threat score based on features""" |
| score = 0.0 |
| weights = { |
| "is_https": -0.2, |
| "has_mixed_content": 0.3, |
| "missing_headers_count": 0.1, |
| "has_insecure_cookies": 0.2, |
| "external_requests": 0.05, |
| "failed_requests": 0.15, |
| "console_errors": 0.1, |
| "suspicious_apis": 0.25, |
| "url_length": 0.001, |
| "has_ip_address": 0.3, |
| "has_suspicious_tld": 0.25, |
| } |
| |
| for feature, weight in weights.items(): |
| if feature in features: |
| value = features[feature] |
| if isinstance(value, bool): |
| value = 1 if value else 0 |
| score += value * weight |
| |
| |
| score = max(0, min(1, (score + 0.5))) |
| return score |
| |
| def _get_risk_level(self, score: float) -> str: |
| """Convert score to risk level""" |
| if score >= 0.8: |
| return "critical" |
| elif score >= 0.6: |
| return "high" |
| elif score >= 0.4: |
| return "medium" |
| elif score >= 0.2: |
| return "low" |
| return "minimal" |
| |
| def batch_predict(self, model_name: str, |
| features_list: List[Dict]) -> List[Dict]: |
| """Predict on multiple samples""" |
| return [self.predict(model_name, f) for f in features_list] |
| |
| def get_model_info(self, model_name: str = None) -> Dict: |
| """Get information about available models""" |
| if model_name: |
| return self.MODEL_INFO.get(model_name, {}) |
| return self.MODEL_INFO |
|
|
|
|
| |
| def create_app(): |
| """Create FastAPI application for model serving""" |
| try: |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel |
| |
| app = FastAPI( |
| title="CyberForge ML API", |
| description="Real-time cybersecurity threat detection", |
| version="1.0.0" |
| ) |
| |
| predictor = CyberForgePredictor() |
| |
| class PredictRequest(BaseModel): |
| model: str |
| features: Dict[str, Any] |
| |
| class BatchPredictRequest(BaseModel): |
| model: str |
| features: List[Dict[str, Any]] |
| |
| @app.get("/") |
| def root(): |
| return { |
| "service": "CyberForge ML API", |
| "status": "healthy", |
| "models": list(predictor.MODEL_INFO.keys()) |
| } |
| |
| @app.get("/health") |
| def health(): |
| return {"status": "healthy"} |
| |
| @app.get("/models") |
| def list_models(): |
| return predictor.get_model_info() |
| |
| @app.post("/predict") |
| def predict(request: PredictRequest): |
| result = predictor.predict(request.model, request.features) |
| if "error" in result: |
| raise HTTPException(status_code=400, detail=result["error"]) |
| return result |
| |
| @app.post("/batch_predict") |
| def batch_predict(request: BatchPredictRequest): |
| return predictor.batch_predict(request.model, request.features) |
| |
| return app |
| except ImportError: |
| print("FastAPI not installed. Install with: pip install fastapi uvicorn") |
| return None |
|
|
|
|
| if __name__ == "__main__": |
| |
| predictor = CyberForgePredictor() |
| |
| test_features = { |
| "is_https": False, |
| "has_mixed_content": True, |
| "missing_headers_count": 3, |
| "has_insecure_cookies": True, |
| "url_length": 150 |
| } |
| |
| for model in predictor.MODEL_INFO.keys(): |
| result = predictor.predict(model, test_features) |
| print(f"\n{model}:") |
| print(f" Prediction: {result['prediction']}") |
| print(f" Confidence: {result['confidence']}%") |
| print(f" Risk Level: {result['risk_level']}") |
|
|