|
|
|
|
|
"""CyberForge Agent Intelligence Module""" |
|
|
|
|
|
import json |
|
|
import time |
|
|
import numpy as np |
|
|
from pathlib import Path |
|
|
from dataclasses import dataclass, asdict |
|
|
from typing import Dict, List, Any, Optional |
|
|
|
|
|
@dataclass
class AgentDecision:
    """Result record produced by an agent analysis.

    A plain data container; ``to_dict`` converts it to a dictionary.
    """

    # Action label chosen for the analyzed target.
    action: str
    # Numeric confidence attached to the decision.
    confidence: float
    # Human-readable explanation of how the decision was reached.
    reasoning: str
    # String descriptions of the supporting evidence.
    evidence: List[str]
    # Risk label associated with the decision.
    risk_level: str
    # Suggested next steps.
    recommended_follow_up: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the decision's fields as a plain dictionary.

        Returns:
            A dictionary keyed by field name, built with
            ``dataclasses.asdict``.
        """
        return asdict(self)
|
|
|
|
|
class DecisionEngine:
    """Turn a list of threat indicators into a numeric score and a risk label."""

    # Multiplier applied to an indicator's confidence, keyed by severity label.
    SEVERITY_WEIGHTS = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.3, "info": 0.1}

    # Fallbacks used when an indicator omits a field or uses an unknown severity.
    _DEFAULT_CONFIDENCE = 0.5
    _DEFAULT_WEIGHT = 0.3

    def calculate_threat_score(self, indicators: List[Dict]) -> tuple:
        """Average the weighted confidence of ``indicators`` and classify it.

        Each indicator contributes ``confidence * severity_weight``. A missing
        or ``None`` confidence counts as 0.5; a missing or unrecognised
        severity is weighted 0.3. Severity labels are matched
        case-insensitively and ignoring surrounding whitespace.

        Args:
            indicators: Dictionaries that may carry ``"confidence"`` and
                ``"severity"`` keys.

        Returns:
            A ``(score, risk)`` tuple. ``risk`` is ``"critical"`` for
            ``score >= 0.8``, ``"high"`` for ``>= 0.6``, ``"medium"`` for
            ``>= 0.4`` and ``"low"`` otherwise. An empty or ``None`` input
            yields ``(0.0, "low")``.
        """
        if not indicators:
            return 0.0, "low"

        total = 0.0
        for indicator in indicators:
            confidence = indicator.get("confidence")
            if confidence is None:
                # An explicit None would otherwise raise on multiplication.
                confidence = self._DEFAULT_CONFIDENCE
            # Normalise so "HIGH" or " high " are not silently down-weighted.
            severity = str(indicator.get("severity", "low")).strip().lower()
            total += confidence * self.SEVERITY_WEIGHTS.get(severity, self._DEFAULT_WEIGHT)

        # indicators is non-empty here, so the division is safe.
        score = total / len(indicators)

        if score >= 0.8:
            risk = "critical"
        elif score >= 0.6:
            risk = "high"
        elif score >= 0.4:
            risk = "medium"
        else:
            risk = "low"
        return score, risk
|
|
|
|
|
class CyberForgeAgent:
    """Agent that scores a security report and returns a decision dictionary."""

    # (minimum score, action) pairs, highest first; the first match wins and
    # anything below the last threshold maps to "allow".
    _ACTION_THRESHOLDS = ((0.8, "block"), (0.6, "alert"), (0.4, "monitor"))

    def __init__(self):
        """Create the agent with its own ``DecisionEngine``."""
        self.engine = DecisionEngine()

    def analyze(self, url: str, data: Dict) -> Dict:
        """Analyze ``data`` and return the resulting decision as a dictionary.

        Args:
            url: Accepted as part of the interface; not read by this method.
            data: Mapping that may contain a ``"security_report"`` entry.

        Returns:
            The ``AgentDecision`` for this input, converted with ``to_dict``.
            Its ``confidence`` field carries the threat score and its
            ``evidence`` field the string form of at most the first three
            indicators.
        """
        indicators = self._extract_indicators(data)
        score, risk = self.engine.calculate_threat_score(indicators)
        action = next(
            (label for floor, label in self._ACTION_THRESHOLDS if score >= floor),
            "allow",
        )

        decision = AgentDecision(
            action=action,
            confidence=score,
            reasoning=f"Threat score: {score:.2f}. {len(indicators)} indicators found.",
            evidence=[str(i) for i in indicators[:3]],
            risk_level=risk,
            recommended_follow_up=["Continue monitoring"],
        )
        return decision.to_dict()

    def _extract_indicators(self, data: Dict) -> List[Dict]:
        """Build threat indicators from the ``"security_report"`` entry of ``data``.

        Args:
            data: Mapping that may contain ``"security_report"``. ``None`` or
                a missing/``None`` report is treated as an empty report.

        Returns:
            A list of indicator dictionaries with ``"type"``, ``"severity"``
            and ``"confidence"`` keys; empty when nothing is flagged.
        """
        # ``or {}`` covers both a None payload and a key explicitly set to None.
        report = (data or {}).get("security_report") or {}

        indicators = []
        # A missing "is_https" key defaults to True, so it is not flagged.
        if not report.get("is_https", True):
            indicators.append({"type": "insecure", "severity": "medium", "confidence": 0.9})
        if report.get("mixed_content"):
            indicators.append({"type": "mixed_content", "severity": "medium", "confidence": 0.85})
        return indicators
|
|
|