Spaces:
Running
Running
| import time | |
| import os | |
| import sys | |
| import json | |
| import gradio as gr | |
| from utils.preprocessor import TextPreprocessor | |
| from agents.agent1_external import ExternalAnalysisAgent | |
| from agents.agent2_content import ContentAnalysisAgent | |
| from agents.agent3_synthesizer import SynthesizerAgent | |
| from agents.agent4_prompt import PromptInjectionAgent | |
class ThreatDetectionSystem:
    """Pipeline that preprocesses text, runs the analysis agents, and merges their output.

    ``__init__`` creates one ``TextPreprocessor`` and one instance of each
    agent (``ExternalAnalysisAgent``, ``ContentAnalysisAgent``,
    ``SynthesizerAgent``, ``PromptInjectionAgent``) and stores them on the
    instance, so a single object can serve many ``analyze`` calls.
    """

    def __init__(self):
        """Instantiate the preprocessor and the four agents, printing progress."""
        print("Initializing Threat Detection System...")
        self.preprocessor = TextPreprocessor()
        self.agent1 = ExternalAnalysisAgent()
        self.agent2 = ContentAnalysisAgent()
        self.agent3 = SynthesizerAgent()
        self.agent4 = PromptInjectionAgent()
        print("System initialized!")

    def analyze(self, user_input):
        """Main analysis pipeline.

        Args:
            user_input: The raw text to analyze.

        Returns:
            Whatever ``agent3.synthesize`` returns, with an extra
            ``'processing_time'`` entry holding the elapsed seconds (float)
            for the whole pipeline run.
        """
        # perf_counter() is monotonic: unlike time.time(), the measured
        # duration cannot go negative or jump when the wall clock is adjusted.
        started = time.perf_counter()

        # Step 1: Preprocess
        preprocessed = self.preprocessor.preprocess(user_input)

        # Step 2: Run agents
        agent1_results = self.agent1.analyze(preprocessed)
        agent2_results = self.agent2.analyze(preprocessed)
        # agent4 is given the raw input rather than the preprocessed form
        # (presumably so preprocessing cannot hide injection text -- confirm).
        agent4_results = self.agent4.analyze(user_input)

        # Step 3: Synthesize results
        final_result = self.agent3.synthesize(agent1_results, agent2_results, agent4_results)
        final_result['processing_time'] = time.perf_counter() - started
        return final_result
# Initialize the system globally for HF: the pipeline is built once at import
# time and this single instance is shared by the UI and API handlers below.
system = ThreatDetectionSystem()
# --- Gradio UI Logic ---
def ui_analyze(text):
    """Analyze ``text`` and shape the outcome for the three Gradio output widgets.

    Returns a 3-tuple ``(report, detailed_results, actions)``: a plain-text
    report, the ``'detailed_results'`` entry of the analysis, and the
    ``'actions'`` list from its explanation. Missing or whitespace-only input
    short-circuits with a prompt message and two empty dicts.
    """
    if not (text and text.strip()):
        return "Please enter some text", {}, {}

    result = system.analyze(text)

    # Badge per known risk level; any other level falls back to green.
    level = result['risk_level']
    badge = {"HIGH": "🔴", "MEDIUM": "🟠", "LOW": "🟡"}.get(level, "🟢")

    # The trailing empty element keeps the newline after the "Forensic
    # Reasons:" heading even when there are no reasons to list.
    heading = "\n".join([
        f"{badge} {level} RISK DETECTED",
        f"Confidence: {result['confidence']:.1%}",
        f"Type: {', '.join(result['threat_types'])}",
        "",
        "Forensic Reasons:",
        "",
    ])
    bullets = "\n".join(f"- {reason}" for reason in result['explanation']['reasons'])
    report = heading + bullets

    return report, result['detailed_results'], result['explanation']['actions']
# --- Next.js Backend Compatibility API ---
# This endpoint is what the Vercel frontend calls
def api_analyze(text):
    """Analyze ``text`` and return a dict in the schema the Next.js frontend expects.

    Args:
        text: The message content to analyze.

    Returns:
        On success, a dict with ``riskLevel``, ``threatType``,
        ``confidenceScore`` (confidence scaled to 0-100), ``riskScore``,
        ``explanation``, ``indicators``, ``recommendations`` and a nested
        ``detailedScores`` dict. For missing or blank input, or when anything
        in the pipeline raises, a dict with a single ``error`` key. This
        function never raises to its caller.
    """
    import logging

    try:
        if not text or not text.strip():
            return {"error": "No input provided"}

        result = system.analyze(text)

        # Map to the schema expected by the Next.js frontend; unknown levels
        # are reported as "Medium".
        risk_map = {"MINIMAL": "Safe", "LOW": "Low", "MEDIUM": "Medium", "HIGH": "High"}
        risk_level = risk_map.get(result["risk_level"], "Medium")

        reasons = result["explanation"]["reasons"]
        details = result["detailed_results"]
        external = details["agent1"]
        content = details["agent2"]
        injection = details["agent4"]

        return {
            "riskLevel": risk_level,
            "threatType": ", ".join(result["threat_types"]),
            "confidenceScore": round(result["confidence"] * 100, 1),
            "riskScore": round(result["risk_score"], 4),
            "explanation": " ".join(reasons),
            "indicators": reasons,
            "recommendations": result["explanation"]["actions"],
            "detailedScores": {
                "phishingProb": round(content.get("phishing_probability", 0), 3),
                "spamProb": round(content.get("spam_probability", 0), 3),
                "urlRisk": round(external.get("url_risk", 0), 3),
                "sentimentLabel": content.get("sentiment_label", "UNKNOWN"),
                "sentimentScore": round(content.get("sentiment_score", 0), 3),
                "promptInjectionScore": round(injection.get("confidence", 0), 3),
                "promptInjectionDetected": injection.get("prompt_injection_detected", False),
            },
        }
    except Exception as e:
        # API boundary: the caller still receives an error payload instead of
        # an exception, but the traceback is logged so failures are not
        # silently swallowed on the server side.
        logging.getLogger(__name__).exception("api_analyze failed")
        return {"error": str(e)}
# --- Theme and Layout ---
with gr.Blocks(theme="soft", title="🛡️ AegisAI Security") as demo:
    # Page header
    gr.Markdown("# 🛡️ AegisAI: Advanced Phishing & Fraud Detector")
    gr.Markdown("Drop an email body or URL here to get a full forensic breakdown.")

    with gr.Row():
        # Narrower column: message input with its two buttons underneath
        with gr.Column(scale=2):
            input_box = gr.Textbox(
                label="Message Content",
                lines=8,
                placeholder="Paste email content...",
            )
            with gr.Row():
                clear_btn = gr.Button("Clear")
                submit_btn = gr.Button("Analyze Threat", variant="primary")

        # Wider column: read-only report plus the two JSON panels
        with gr.Column(scale=3):
            out_report = gr.Textbox(
                label="Analysis Report",
                lines=10,
                interactive=False,
            )
            out_actions = gr.JSON(label="Recommended Actions")
            out_scores = gr.JSON(label="Agent Confidence Scores")

    # Connect UI: ui_analyze returns (report, detailed results, actions), so
    # the outputs are listed in that order rather than in on-screen order.
    submit_btn.click(
        fn=ui_analyze,
        inputs=input_box,
        outputs=[out_report, out_scores, out_actions],
    )
    # Clear resets the input box and all three outputs to blank values.
    clear_btn.click(
        lambda: ["", "", {}, {}],
        outputs=[input_box, out_report, out_scores, out_actions],
    )

    # HIDDEN API ENDPOINT FOR VERCEL
    # An invisible button whose click event exists only to register
    # api_analyze under api_name "analyze" for programmatic callers.
    api_endpoint = gr.Button("API", visible=False)
    api_endpoint.click(
        fn=api_analyze,
        inputs=input_box,
        outputs=out_scores,
        api_name="analyze",
    )
# Start the Gradio app only when this file is run as a script, not on import.
if __name__ == "__main__":
    demo.launch()