cyberforge-models / cyberforge_agent.py
Che237's picture
Update CyberForge ML models and deployment artifacts
a17163e verified
"""CyberForge Agent Intelligence Module"""
import json
import time
import numpy as np
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Dict, List, Any, Optional
@dataclass
class AgentDecision:
action: str
confidence: float
reasoning: str
evidence: List[str]
risk_level: str
recommended_follow_up: List[str]
def to_dict(self):
return asdict(self)
class DecisionEngine:
SEVERITY_WEIGHTS = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.3, "info": 0.1}
def calculate_threat_score(self, indicators: List[Dict]) -> tuple:
if not indicators:
return 0.0, "low"
scores = [i.get("confidence", 0.5) * self.SEVERITY_WEIGHTS.get(i.get("severity", "low"), 0.3)
for i in indicators]
score = sum(scores) / len(scores) if scores else 0
risk = "critical" if score >= 0.8 else "high" if score >= 0.6 else "medium" if score >= 0.4 else "low"
return score, risk
class CyberForgeAgent:
def __init__(self):
self.engine = DecisionEngine()
def analyze(self, url: str, data: Dict) -> Dict:
indicators = self._extract_indicators(data)
score, risk = self.engine.calculate_threat_score(indicators)
action = "block" if score >= 0.8 else "alert" if score >= 0.6 else "monitor" if score >= 0.4 else "allow"
return AgentDecision(
action=action,
confidence=score,
reasoning=f"Threat score: {score:.2f}. {len(indicators)} indicators found.",
evidence=[str(i) for i in indicators[:3]],
risk_level=risk,
recommended_follow_up=["Continue monitoring"]
).to_dict()
def _extract_indicators(self, data: Dict) -> List[Dict]:
indicators = []
sec = data.get("security_report", {})
if not sec.get("is_https", True):
indicators.append({"type": "insecure", "severity": "medium", "confidence": 0.9})
if sec.get("mixed_content"):
indicators.append({"type": "mixed_content", "severity": "medium", "confidence": 0.85})
return indicators