File size: 5,374 Bytes
b8630cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import time
import os
import sys
import json
import gradio as gr
from utils.preprocessor import TextPreprocessor
from agents.agent1_external import ExternalAnalysisAgent
from agents.agent2_content import ContentAnalysisAgent
from agents.agent3_synthesizer import SynthesizerAgent
from agents.agent4_prompt import PromptInjectionAgent

class ThreatDetectionSystem:
    def __init__(self):
        print("Initializing Threat Detection System...")
        self.preprocessor = TextPreprocessor()
        self.agent1 = ExternalAnalysisAgent()
        self.agent2 = ContentAnalysisAgent()
        self.agent3 = SynthesizerAgent()
        self.agent4 = PromptInjectionAgent()
        print("System initialized!")
    
    def analyze(self, user_input):
        """Main analysis pipeline"""
        start_time = time.time()
        
        # Step 1: Preprocess
        preprocessed = self.preprocessor.preprocess(user_input)
        
        # Step 2: Run agents
        agent1_results = self.agent1.analyze(preprocessed)
        agent2_results = self.agent2.analyze(preprocessed)
        agent4_results = self.agent4.analyze(user_input)
        
        # Step 3: Synthesize results
        final_result = self.agent3.synthesize(agent1_results, agent2_results, agent4_results)
        final_result['processing_time'] = time.time() - start_time
        
        return final_result

# Initialize the system globally for HF
system = ThreatDetectionSystem()

# --- Gradio UI Logic ---
def ui_analyze(text):
    if not text or not text.strip():
        return "Please enter some text", {}, {}
    
    result = system.analyze(text)
    
    # Prettify the report for display
    risk_color = "🔴" if result['risk_level'] == "HIGH" else "🟠" if result['risk_level'] == "MEDIUM" else "🟡" if result['risk_level'] == "LOW" else "🟢"
    report = f"{risk_color} {result['risk_level']} RISK DETECTED\n"
    report += f"Confidence: {result['confidence']:.1%}\n"
    report += f"Type: {', '.join(result['threat_types'])}\n\n"
    report += "Forensic Reasons:\n" + "\n".join([f"- {r}" for r in result['explanation']['reasons']])
    
    return report, result['detailed_results'], result['explanation']['actions']

# --- Next.js Backend Compatibility API ---
# This endpoint is what the Vercel frontend calls
def api_analyze(text):
    try:
        if not text or not text.strip():
            return {"error": "No input provided"}
            
        result = system.analyze(text)
        
        # Map to the schema expected by the Next.js frontend
        risk_map = {"MINIMAL": "Safe", "LOW": "Low", "MEDIUM": "Medium", "HIGH": "High"}
        risk_level = risk_map.get(result["risk_level"], "Medium")

        return {
            "riskLevel": risk_level,
            "threatType": ", ".join(result["threat_types"]),
            "confidenceScore": round(result["confidence"] * 100, 1),
            "riskScore": round(result["risk_score"], 4),
            "explanation": " ".join(result["explanation"]["reasons"]),
            "indicators": result["explanation"]["reasons"],
            "recommendations": result["explanation"]["actions"],
            "detailedScores": {
                "phishingProb": round(result["detailed_results"]["agent2"].get("phishing_probability", 0), 3),
                "spamProb": round(result["detailed_results"]["agent2"].get("spam_probability", 0), 3),
                "urlRisk": round(result["detailed_results"]["agent1"].get("url_risk", 0), 3),
                "sentimentLabel": result["detailed_results"]["agent2"].get("sentiment_label", "UNKNOWN"),
                "sentimentScore": round(result["detailed_results"]["agent2"].get("sentiment_score", 0), 3),
                "promptInjectionScore": round(result["detailed_results"]["agent4"].get("confidence", 0), 3),
                "promptInjectionDetected": result["detailed_results"]["agent4"].get("prompt_injection_detected", False),
            },
        }
    except Exception as e:
        return {"error": str(e)}

# --- Theme and Layout ---
with gr.Blocks(theme="soft", title="🛡️ AegisAI Security") as demo:
    gr.Markdown("# 🛡️ AegisAI: Advanced Phishing & Fraud Detector")
    gr.Markdown("Drop an email body or URL here to get a full forensic breakdown.")
    
    with gr.Row():
        with gr.Column(scale=2):
            input_box = gr.Textbox(label="Message Content", lines=8, placeholder="Paste email content...")
            with gr.Row():
                clear_btn = gr.Button("Clear")
                submit_btn = gr.Button("Analyze Threat", variant="primary")
        
        with gr.Column(scale=3):
            out_report = gr.Textbox(label="Analysis Report", lines=10, interactive=False)
            out_actions = gr.JSON(label="Recommended Actions")
            out_scores = gr.JSON(label="Agent Confidence Scores")

    # Connect UI
    submit_btn.click(fn=ui_analyze, inputs=input_box, outputs=[out_report, out_scores, out_actions])
    clear_btn.click(lambda: ["", "", {}, {}], outputs=[input_box, out_report, out_scores, out_actions])

    # HIDDEN API ENDPOINT FOR VERCEL
    # Note: Hugging Face exposes this as an endpoint /run/predict or via api_name
    api_endpoint = gr.Button("API", visible=False)
    api_endpoint.click(fn=api_analyze, inputs=input_box, outputs=out_scores, api_name="analyze")

if __name__ == "__main__":
    demo.launch()