Spaces:
Sleeping
Sleeping
| """ | |
| ShadowWatch v2 - Open Source Threat Intelligence Platform | |
| 100% Free - No API Keys Required | |
| Uses public threat feeds and local analysis: | |
| - abuse.ch (URLhaus, ThreatFox, FeodoTracker, MalwareBazaar) | |
| - Spamhaus DROP lists | |
| - Emerging Threats blocklists | |
| - Direct DNS/WHOIS lookups | |
| - IOC extraction and analysis | |
| By Cogensec | ARGUS Platform | |
| """ | |
| import gradio as gr | |
| import requests | |
| import socket | |
| import hashlib | |
| import re | |
| import json | |
| import csv | |
| import io | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Tuple, List, Dict | |
| from functools import lru_cache | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| # ============================================================================ | |
| # THREAT FEED MANAGER - Downloads and caches public feeds | |
| # ============================================================================ | |
class ThreatFeedManager:
    """Download, cache and parse open-source threat intelligence feeds.

    Raw feed bodies are cached in memory, keyed by feed name, for
    ``cache_duration``.  When a refresh fails, the last body that was
    downloaded successfully (if any) is served instead.  The ``get_*``
    helpers re-parse the cached text on every call.
    """

    # Feed URLs (all free, no auth required)
    FEEDS = {
        "urlhaus": "https://urlhaus.abuse.ch/downloads/csv_recent/",
        "threatfox_iocs": "https://threatfox.abuse.ch/export/json/recent/",
        "feodo_ipblocklist": "https://feodotracker.abuse.ch/downloads/ipblocklist.csv",
        "malwarebazaar_recent": "https://bazaar.abuse.ch/export/csv/recent/",
        "spamhaus_drop": "https://www.spamhaus.org/drop/drop.txt",
        "spamhaus_edrop": "https://www.spamhaus.org/drop/edrop.txt",
        "emergingthreats_compromised": "https://rules.emergingthreats.net/blockrules/compromised-ips.txt",
        "openphish": "https://openphish.com/feed.txt",
        "hibp_breaches": "https://haveibeenpwned.com/api/v3/breaches",
    }

    def __init__(self):
        self.cache = {}  # feed name -> raw response text
        self.cache_time = {}  # feed name -> datetime of the last successful fetch
        self.cache_duration = timedelta(hours=1)  # Refresh every hour

    def _is_cache_valid(self, feed_name: str) -> bool:
        """Return True if *feed_name* was fetched less than ``cache_duration`` ago."""
        fetched_at = self.cache_time.get(feed_name)
        if fetched_at is None:
            return False
        return datetime.now() - fetched_at < self.cache_duration

    def fetch_feed(self, feed_name: str) -> Optional[str]:
        """Return the raw text of a feed, using the cache while it is fresh.

        Args:
            feed_name: A key of ``FEEDS``.

        Returns:
            The feed body, a stale cached body if the download fails, or
            ``None`` when the feed is unknown or was never fetched.
        """
        if self._is_cache_valid(feed_name):
            return self.cache.get(feed_name)
        url = self.FEEDS.get(feed_name)
        if not url:
            return None
        try:
            headers = {"User-Agent": "ShadowWatch-ARGUS/1.0"}
            resp = requests.get(url, headers=headers, timeout=30)
            if resp.status_code == 200:
                self.cache[feed_name] = resp.text
                self.cache_time[feed_name] = datetime.now()
                return resp.text
            print(f"Error fetching {feed_name}: HTTP {resp.status_code}")
        except Exception as e:
            # Best-effort boundary: a broken feed must not take the app down.
            print(f"Error fetching {feed_name}: {e}")
        return self.cache.get(feed_name)  # Return stale cache if fetch fails

    def get_urlhaus_urls(self) -> List[Dict]:
        """Get recent malicious URLs from URLhaus.

        Returns:
            One dict per CSV row with the keys ``id``, ``dateadded``, ``url``,
            ``url_status``, ``threat``, ``tags`` and ``host``.
        """
        from urllib.parse import urlsplit

        data = self.fetch_feed("urlhaus")
        if not data:
            return []
        urls = []
        for row in csv.reader(io.StringIO(data)):
            # Same acceptance rule as before: skip comments and short rows.
            if not row or row[0].startswith('#') or len(row) < 8:
                continue
            # NOTE(review): column 7 of csv_recent appears to be the
            # urlhaus_link, not the host, so the host is taken from the URL
            # itself, which is correct whatever the column layout is.
            try:
                host = urlsplit(row[2]).hostname or ""
            except ValueError:
                host = ""
            urls.append({
                "id": row[0],
                "dateadded": row[1],
                "url": row[2],
                "url_status": row[3],
                "threat": row[5],
                "tags": row[6],
                "host": host,
            })
        return urls

    def get_threatfox_iocs(self) -> List[Dict]:
        """Get recent IOCs from ThreatFox.

        Accepts both the API-style envelope (``query_status`` / ``data``) and
        a mapping of id -> list of entries.  In the latter case every entry
        gets an ``ioc`` key (copied from ``ioc_value`` when absent) so callers
        can read one field name.
        """
        data = self.fetch_feed("threatfox_iocs")
        if not data:
            return []
        try:
            parsed = json.loads(data)
        except ValueError:
            return []
        if not isinstance(parsed, dict):
            return []
        if "query_status" in parsed:
            return parsed.get("data", []) if parsed.get("query_status") == "ok" else []
        # NOTE(review): the /export/json/recent/ download appears to be keyed
        # by IOC id with entries using "ioc_value" -- confirm against the feed.
        iocs = []
        for entries in parsed.values():
            if not isinstance(entries, list):
                continue
            for entry in entries:
                if isinstance(entry, dict):
                    normalised = dict(entry)
                    normalised.setdefault("ioc", entry.get("ioc_value", ""))
                    iocs.append(normalised)
        return iocs

    def get_feodo_ips(self) -> List[Dict]:
        """Get botnet C2 IPs from FeodoTracker.

        Rows whose second column is not a valid IP address (for example an
        uncommented CSV header row) are skipped.
        """
        import ipaddress

        data = self.fetch_feed("feodo_ipblocklist")
        if not data:
            return []
        ips = []
        for row in csv.reader(io.StringIO(data)):
            if not row or row[0].startswith('#') or len(row) < 5:
                continue
            ip = row[1].strip()
            try:
                ipaddress.ip_address(ip)
            except ValueError:
                continue
            ips.append({
                "first_seen": row[0],
                "ip": ip,
                "port": row[2],
                "status": row[3],
                # NOTE(review): six-column rows appear to carry last_online
                # in column 4 and the malware family in column 5 -- confirm.
                "malware": row[5] if len(row) > 5 else row[4],
            })
        return ips

    def get_spamhaus_ips(self) -> set:
        """Get bad IP ranges from Spamhaus DROP and EDROP as a set of strings."""
        ips = set()
        for feed in ("spamhaus_drop", "spamhaus_edrop"):
            data = self.fetch_feed(feed)
            if not data:
                continue
            for line in data.split('\n'):
                line = line.strip()
                if not line or line.startswith(';'):
                    continue
                # Extract IP/CIDR before any semicolon
                ip_part = line.split(';')[0].strip()
                if ip_part:
                    ips.add(ip_part)
        return ips

    def get_emergingthreats_ips(self) -> set:
        """Get compromised IPs from Emerging Threats as a set of strings."""
        data = self.fetch_feed("emergingthreats_compromised")
        if not data:
            return set()
        stripped = (line.strip() for line in data.split('\n'))
        return {line for line in stripped if line and not line.startswith('#')}

    def get_openphish_urls(self) -> set:
        """Get phishing URLs from OpenPhish as a set of strings."""
        data = self.fetch_feed("openphish")
        if not data:
            return set()
        return {line.strip() for line in data.split('\n') if line.strip()}

    def get_hibp_breaches(self) -> List[Dict]:
        """Get the breach list from the HIBP feed URL.

        Returns an empty list when the body is not JSON or is not a JSON
        array, so callers can always treat the result as a list.
        """
        data = self.fetch_feed("hibp_breaches")
        if not data:
            return []
        try:
            parsed = json.loads(data)
        except ValueError:
            return []
        return parsed if isinstance(parsed, list) else []
# Module-level feed manager instance; the reputation and statistics helpers
# in this file read their feed data through it.
feed_manager = ThreatFeedManager()
| # ============================================================================ | |
| # LOCAL ANALYSIS TOOLS | |
| # ============================================================================ | |
def dns_lookup(domain: str) -> Dict:
    """Resolve *domain* with the system resolver (no external APIs).

    Args:
        domain: Host name (or IP literal) to resolve.

    Returns:
        A dict with the keys ``domain``, ``a_records``, ``aaaa_records``,
        ``mx_records``, ``ns_records`` and ``error``.  ``mx_records`` and
        ``ns_records`` are always empty: nothing in this function queries
        them.  ``error`` is set only when a lookup failed and no address
        at all was found; otherwise it is ``None``.
    """
    results = {
        "domain": domain,
        "a_records": [],
        "aaaa_records": [],
        "mx_records": [],
        "ns_records": [],
        "error": None
    }
    # Resolver failures (gaierror/herror are OSError subclasses), IDNA
    # encoding failures (UnicodeError is a ValueError) and wrong argument
    # types are all treated as "could not resolve" rather than crashing.
    lookup_errors = (OSError, ValueError, TypeError)
    failures = []

    # A records (IPv4); dict.fromkeys de-duplicates while keeping order.
    try:
        addresses = socket.gethostbyname_ex(domain)[2]
        results["a_records"] = list(dict.fromkeys(addresses))
    except lookup_errors as e:
        failures.append(e)

    # Try to get additional info via getaddrinfo
    try:
        info = socket.getaddrinfo(domain, None)
    except lookup_errors as e:
        failures.append(e)
    else:
        for item in info:
            ip = item[4][0]
            if ':' in ip and ip not in results["aaaa_records"]:
                results["aaaa_records"].append(ip)
            elif '.' in ip and ip not in results["a_records"]:
                results["a_records"].append(ip)

    if failures and not results["a_records"] and not results["aaaa_records"]:
        results["error"] = f"DNS lookup failed: {failures[0]}"
    return results
def extract_iocs(text: str) -> Dict:
    """Extract Indicators of Compromise from text.

    Args:
        text: Free-form text (logs, reports, emails, ...).

    Returns:
        A dict mapping each IOC type (``ipv4``, ``ipv6``, ``domain``, ``url``,
        ``email``, ``md5``, ``sha1``, ``sha256``, ``bitcoin``, ``cve``) to a
        list of unique matches in order of first appearance, capped at 100
        entries per type.
    """
    patterns = {
        "ipv4": r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
        "ipv6": r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b',
        "domain": r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b',
        "url": r'https?://[^\s<>"{}|\\^`\[\]]+',
        # The TLD class used to be [A-Z|a-z], which also accepted a literal "|".
        "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        "md5": r'\b[a-fA-F0-9]{32}\b',
        "sha1": r'\b[a-fA-F0-9]{40}\b',
        "sha256": r'\b[a-fA-F0-9]{64}\b',
        "bitcoin": r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b',
        "cve": r'CVE-\d{4}-\d{4,7}',
    }
    # The bitcoin character class deliberately omits 0, O, I and l; matching
    # it case-insensitively would let O, I and l back in.
    case_sensitive = {"bitcoin"}
    image_suffixes = ('.png', '.jpg', '.gif')

    results = {}
    for ioc_type, pattern in patterns.items():
        flags = 0 if ioc_type in case_sensitive else re.IGNORECASE
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # 100-item cap below always keeps the same items.
        matches = list(dict.fromkeys(re.findall(pattern, text, flags)))
        # Filter out common false positives for domains
        if ioc_type == "domain":
            matches = [m for m in matches
                       if not m.lower().endswith(image_suffixes) and len(m) > 4]
        results[ioc_type] = matches[:100]  # Limit to 100 per type
    return results
def calculate_hashes(text: str) -> Dict:
    """Return hex digests of *text* (encoded as UTF-8) for several algorithms.

    The result maps ``md5``, ``sha1``, ``sha256`` and ``sha512`` to the
    corresponding lowercase hexadecimal digest string.
    """
    payload = text.encode('utf-8')
    digests = {}
    for algorithm in ("md5", "sha1", "sha256", "sha512"):
        constructor = getattr(hashlib, algorithm)
        digests[algorithm] = constructor(payload).hexdigest()
    return digests
def check_ip_reputation(ip: str) -> Dict:
    """Check an IP address against the locally cached blocklists.

    Consults FeodoTracker, Spamhaus DROP/EDROP, Emerging Threats and
    ThreatFox through the module-level ``feed_manager``.

    Args:
        ip: The IP address to check, as a string.

    Returns:
        A dict with ``ip``, ``is_malicious``, ``sources`` (names of the feeds
        that matched), ``risk_score`` (sum of per-feed weights, capped at
        100) and ``details`` (one dict per matching feed).
    """
    import ipaddress

    results = {
        "ip": ip,
        "is_malicious": False,
        "sources": [],
        "risk_score": 0,
        "details": []
    }
    candidate = ip.strip()
    try:
        ip_obj = ipaddress.ip_address(candidate)
    except ValueError:
        # Not a parseable address: range membership cannot be evaluated,
        # but the exact string comparisons below still run.
        ip_obj = None

    # Check Feodo botnet IPs
    for entry in feed_manager.get_feodo_ips():
        if entry.get("ip") == candidate:
            results["is_malicious"] = True
            results["sources"].append("FeodoTracker")
            results["risk_score"] += 40
            results["details"].append({
                "source": "FeodoTracker",
                "malware": entry.get("malware", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown"),
                "status": entry.get("status", "Unknown")
            })
            break

    # Check Spamhaus DROP.  Real network membership replaces the former
    # "first three octets are equal" comparison, which ignored the prefix
    # length (missing e.g. /16 ranges and over-matching ranges longer than /24).
    if ip_obj is not None:
        for ip_range in feed_manager.get_spamhaus_ips():
            try:
                # A bare address parses as a single-host network, which
                # also covers feed entries without a "/prefix".
                network = ipaddress.ip_network(ip_range, strict=False)
            except ValueError:
                continue
            if network.version == ip_obj.version and ip_obj in network:
                results["is_malicious"] = True
                results["sources"].append("Spamhaus DROP")
                results["risk_score"] += 35
                results["details"].append({
                    "source": "Spamhaus DROP",
                    "matched_range": ip_range
                })
                break

    # Check Emerging Threats
    if candidate in feed_manager.get_emergingthreats_ips():
        results["is_malicious"] = True
        results["sources"].append("Emerging Threats")
        results["risk_score"] += 30
        results["details"].append({
            "source": "Emerging Threats",
            "category": "Compromised IP"
        })

    # Check ThreatFox IOCs
    for ioc in feed_manager.get_threatfox_iocs():
        if ioc.get("ioc_type") != "ip:port":
            continue
        value = ioc.get("ioc") or ioc.get("ioc_value") or ""
        # Compare the host part exactly; a substring test would let
        # "1.2.3.4" match "11.2.3.45:80".
        if value.rsplit(":", 1)[0] == candidate:
            results["is_malicious"] = True
            results["sources"].append("ThreatFox")
            results["risk_score"] += 45
            results["details"].append({
                "source": "ThreatFox",
                "malware": ioc.get("malware_printable", "Unknown"),
                "threat_type": ioc.get("threat_type", "Unknown"),
                "confidence": ioc.get("confidence_level", 0)
            })
            break

    results["risk_score"] = min(results["risk_score"], 100)
    return results
def _url_hostname(url: str) -> str:
    """Return the lower-cased host of *url* without port or userinfo, or ""."""
    from urllib.parse import urlsplit

    try:
        return urlsplit(url).hostname or ""
    except ValueError:
        return ""


def check_url_reputation(url: str) -> Dict:
    """Check a URL against the locally cached threat feeds.

    Consults URLhaus, OpenPhish and ThreatFox through the module-level
    ``feed_manager``.  Comparisons are case-insensitive.

    Args:
        url: The URL to check.

    Returns:
        A dict with ``url``, ``is_malicious``, ``sources``, ``risk_score``
        (sum of per-feed weights, capped at 100) and ``details``.
    """
    results = {
        "url": url,
        "is_malicious": False,
        "sources": [],
        "risk_score": 0,
        "details": []
    }
    # Normalize URL
    url_lower = url.lower().strip()
    # Host only (no port/userinfo/query) so it can be compared with feed hosts.
    domain = _url_hostname(url_lower)

    # Check URLhaus: exact URL match, or same host as a listed URL.
    for entry in feed_manager.get_urlhaus_urls():
        entry_url = (entry.get("url") or "").lower()
        entry_hosts = {(entry.get("host") or "").lower(), _url_hostname(entry_url)}
        # An empty domain must never match an empty feed host.
        if url_lower == entry_url or (domain and domain in entry_hosts):
            results["is_malicious"] = True
            results["sources"].append("URLhaus")
            results["risk_score"] += 50
            results["details"].append({
                "source": "URLhaus",
                "threat": entry.get("threat", "Unknown"),
                "tags": entry.get("tags", ""),
                "status": entry.get("url_status", "Unknown"),
                "date_added": entry.get("dateadded", "Unknown")
            })
            break

    # Check OpenPhish.  Feed entries are lower-cased too; comparing a
    # lower-cased input with mixed-case feed entries could never match.
    phishing_urls = {p.lower() for p in feed_manager.get_openphish_urls()}
    if url_lower in phishing_urls or any(url_lower.startswith(p) for p in phishing_urls):
        results["is_malicious"] = True
        results["sources"].append("OpenPhish")
        results["risk_score"] += 45
        results["details"].append({
            "source": "OpenPhish",
            "category": "Phishing"
        })

    # Check ThreatFox for URL IOCs.  A match is the listed URL itself, a URL
    # that extends it, or a URL on the same host.  The former substring test
    # flagged any fragment of a listed URL (even a bare "http://").
    for ioc in feed_manager.get_threatfox_iocs():
        if ioc.get("ioc_type") != "url":
            continue
        ioc_url = (ioc.get("ioc") or ioc.get("ioc_value") or "").lower()
        if not ioc_url:
            continue
        same_host = bool(domain) and domain == _url_hostname(ioc_url)
        if url_lower.startswith(ioc_url) or same_host:
            results["is_malicious"] = True
            results["sources"].append("ThreatFox")
            results["risk_score"] += 50
            results["details"].append({
                "source": "ThreatFox",
                "malware": ioc.get("malware_printable", "Unknown"),
                "threat_type": ioc.get("threat_type", "Unknown")
            })
            break

    results["risk_score"] = min(results["risk_score"], 100)
    return results
def check_domain_reputation(domain: str) -> Dict:
    """Check a domain against threat feeds and perform DNS analysis.

    The domain is resolved with ``dns_lookup``; every resolved IPv4 address
    is passed to ``check_ip_reputation`` and half of its risk score is added
    when it is malicious.  The domain itself is then compared with URLhaus
    hosts and ThreatFox domain IOCs.

    Args:
        domain: The domain name to check.

    Returns:
        A dict with ``domain``, ``is_malicious``, ``sources``, ``risk_score``
        (capped at 100), ``dns`` (the ``dns_lookup`` result) and ``details``.
    """
    from urllib.parse import urlsplit

    results = {
        "domain": domain,
        "is_malicious": False,
        "sources": [],
        "risk_score": 0,
        "dns": {},
        "details": []
    }
    # Surrounding whitespace, letter case and a trailing root dot are
    # irrelevant when comparing against feed entries.
    domain_clean = domain.strip()
    domain_lower = domain_clean.lower().rstrip(".")

    # DNS lookup
    results["dns"] = dns_lookup(domain_clean)

    # Check if resolved IPs are malicious
    for ip in results["dns"].get("a_records", []):
        ip_rep = check_ip_reputation(ip)
        if ip_rep["is_malicious"]:
            results["is_malicious"] = True
            results["risk_score"] += ip_rep["risk_score"] // 2
            # Record a source as well, so a verdict that rests only on the
            # resolved IPs does not come back with an empty source list.
            if "DNS Resolution" not in results["sources"]:
                results["sources"].append("DNS Resolution")
            results["details"].append({
                "source": "DNS Resolution",
                "detail": f"Resolves to malicious IP: {ip}",
                "ip_sources": ip_rep["sources"]
            })

    def listed_hosts(entry: Dict) -> set:
        """Hosts a URLhaus entry refers to: its host field and its URL's host."""
        hosts = {(entry.get("host") or "").lower()}
        try:
            hosts.add(urlsplit(entry.get("url") or "").hostname or "")
        except ValueError:
            pass
        hosts.discard("")
        return hosts

    # Check URLhaus for domain
    if domain_lower:
        for entry in feed_manager.get_urlhaus_urls():
            if domain_lower in listed_hosts(entry):
                results["is_malicious"] = True
                results["sources"].append("URLhaus")
                results["risk_score"] += 40
                results["details"].append({
                    "source": "URLhaus",
                    "threat": entry.get("threat", "Unknown"),
                    "tags": entry.get("tags", "")
                })
                break

    # Check ThreatFox for domain IOCs
    if domain_lower:
        for ioc in feed_manager.get_threatfox_iocs():
            if ioc.get("ioc_type") != "domain":
                continue
            ioc_value = (ioc.get("ioc") or ioc.get("ioc_value") or "").lower()
            if domain_lower == ioc_value:
                results["is_malicious"] = True
                results["sources"].append("ThreatFox")
                results["risk_score"] += 45
                results["details"].append({
                    "source": "ThreatFox",
                    "malware": ioc.get("malware_printable", "Unknown"),
                    "threat_type": ioc.get("threat_type", "Unknown")
                })
                break

    results["risk_score"] = min(results["risk_score"], 100)
    return results
| # ============================================================================ | |
| # VISUALIZATION HELPERS | |
| # ============================================================================ | |
def create_risk_gauge(score: int, title: str = "Risk Score") -> go.Figure:
    """Build a 0-100 gauge figure for *score*, coloured by severity band.

    The bar is red from 75, orange from 50, amber from 25 and green below.
    """
    # Highest threshold first: the first floor the score reaches wins.
    severity_bands = ((75, "#ef4444"), (50, "#f97316"), (25, "#fbbf24"))
    bar_color = "#22c55e"
    for floor, shade in severity_bands:
        if score >= floor:
            bar_color = shade
            break

    background_steps = [
        {'range': [0, 25], 'color': 'rgba(34, 197, 94, 0.15)'},
        {'range': [25, 50], 'color': 'rgba(251, 191, 36, 0.15)'},
        {'range': [50, 75], 'color': 'rgba(249, 115, 22, 0.15)'},
        {'range': [75, 100], 'color': 'rgba(239, 68, 68, 0.15)'},
    ]
    gauge_spec = {
        'axis': {'range': [0, 100], 'tickcolor': "#1e3a5f"},
        'bar': {'color': bar_color},
        'bgcolor': "#111827",
        'borderwidth': 2,
        'bordercolor': "#1e3a5f",
        'steps': background_steps,
    }
    indicator = go.Indicator(
        mode="gauge+number",
        value=score,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': title, 'font': {'size': 18, 'color': '#e2e8f0'}},
        number={'font': {'size': 36, 'color': '#e2e8f0'}},
        gauge=gauge_spec,
    )

    fig = go.Figure(indicator)
    fig.update_layout(
        paper_bgcolor='#0a0f1a',
        plot_bgcolor='#0a0f1a',
        height=280,
        margin=dict(l=20, r=20, t=50, b=20),
    )
    return fig
def create_ioc_chart(iocs: Dict) -> go.Figure:
    """Build a bar chart with one bar per IOC type that has matches.

    When no type has any match, a figure carrying only a
    "No IOCs extracted" annotation is returned instead.
    """
    # Keep only the non-empty IOC types, as (label, count) pairs.
    populated = [(name.upper(), len(found)) for name, found in iocs.items() if found]

    if not populated:
        placeholder = go.Figure()
        placeholder.add_annotation(
            text="No IOCs extracted", x=0.5, y=0.5, showarrow=False,
            font=dict(size=16, color="#64748b"), xref="paper", yref="paper",
        )
        placeholder.update_layout(paper_bgcolor='#0a0f1a', plot_bgcolor='#0a0f1a', height=280)
        return placeholder

    types = [label for label, _ in populated]
    counts = [total for _, total in populated]
    colors = []
    for total in counts:
        colors.append('#00ffd5' if total > 0 else '#64748b')

    bars = go.Bar(
        x=types,
        y=counts,
        marker_color=colors,
        text=counts,
        textposition='outside',
    )
    fig = go.Figure(data=[bars])
    fig.update_layout(
        title={'text': 'π Extracted IOCs', 'font': {'color': '#00ffd5'}},
        paper_bgcolor='#0a0f1a',
        plot_bgcolor='#111827',
        font={'color': '#e2e8f0'},
        height=280,
        xaxis={'gridcolor': '#1e3a5f'},
        yaxis={'gridcolor': '#1e3a5f'},
        margin=dict(l=40, r=20, t=50, b=40),
    )
    return fig
def create_threat_sources_chart(sources: List[str]) -> go.Figure:
    """Build a donut chart counting how often each threat source appears.

    An empty *sources* list yields a figure carrying only a
    "No threats detected" annotation.
    """
    from collections import Counter

    if len(sources) == 0:
        placeholder = go.Figure()
        placeholder.add_annotation(
            text="β No threats detected", x=0.5, y=0.5, showarrow=False,
            font=dict(size=18, color="#22c55e"), xref="paper", yref="paper",
        )
        placeholder.update_layout(paper_bgcolor='#0a0f1a', plot_bgcolor='#0a0f1a', height=280)
        return placeholder

    tally = Counter(sources)
    labels = list(tally)
    values = [tally[label] for label in labels]
    palette = ['#ef4444', '#f97316', '#fbbf24', '#00ffd5', '#8b5cf6']

    donut = go.Pie(
        labels=labels,
        values=values,
        hole=0.5,
        marker_colors=palette[:len(tally)],
        textinfo='label+value',
        textfont={'color': '#e2e8f0'},
    )
    fig = go.Figure(data=[donut])
    fig.update_layout(
        title={'text': 'β οΈ Threat Sources', 'font': {'color': '#ef4444'}},
        paper_bgcolor='#0a0f1a',
        plot_bgcolor='#0a0f1a',
        font={'color': '#e2e8f0'},
        height=280,
        showlegend=False,
        margin=dict(l=20, r=20, t=50, b=20),
    )
    return fig
| # ============================================================================ | |
| # MCP TOOLS | |
| # ============================================================================ | |
def scan_indicator(indicator: str, indicator_type: str = "auto") -> Tuple[go.Figure, go.Figure, str]:
    """Scan an IP address, domain, or URL against open-source threat feeds.

    Checks the indicator against URLhaus, ThreatFox, FeodoTracker,
    Spamhaus DROP, Emerging Threats, and OpenPhish databases.

    Every value that originates from the caller or from a feed is
    HTML-escaped before it is placed in the report.

    Args:
        indicator: The IP, domain, or URL to scan
        indicator_type: "auto", "ip", "domain", or "url"

    Returns:
        Risk gauge, threat sources chart, and detailed HTML report
    """
    # Local import: only ``escape`` is needed, and the module-level import
    # block is left untouched.
    from html import escape

    # Strip first so whitespace-only input is rejected as empty.
    indicator = indicator.strip() if indicator else ""
    if not indicator:
        empty_fig = go.Figure()
        empty_fig.update_layout(paper_bgcolor='#0a0f1a', height=280)
        return empty_fig, empty_fig, "<p style='color: #ef4444;'>Please enter an indicator to scan</p>"

    # Auto-detect type
    if indicator_type == "auto":
        if re.match(r'^https?://', indicator):
            indicator_type = "url"
        elif re.match(r'^(\d{1,3}\.){3}\d{1,3}$', indicator):
            indicator_type = "ip"
        else:
            indicator_type = "domain"

    # Perform check based on type
    if indicator_type == "ip":
        result = check_ip_reputation(indicator)
    elif indicator_type == "url":
        result = check_url_reputation(indicator)
    else:
        result = check_domain_reputation(indicator)

    # Create visualizations
    gauge_fig = create_risk_gauge(result["risk_score"], "Threat Score")
    sources_fig = create_threat_sources_chart(result["sources"])

    # Build HTML report
    malicious = result["is_malicious"]
    status_color = "#ef4444" if malicious else "#22c55e"
    status_text = "β οΈ MALICIOUS" if malicious else "β CLEAN"
    parts = [f"""
    <div style="font-family: 'JetBrains Mono', monospace; color: #e2e8f0;">
    <h3 style="color: #00ffd5; border-bottom: 1px solid #1e3a5f; padding-bottom: 10px;">
    π Threat Intelligence Report
    </h3>
    <div style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 16px; margin: 20px 0;">
    <div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
    <div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Indicator</div>
    <div style="font-size: 14px; color: #00ffd5; word-break: break-all;">{escape(indicator)}</div>
    </div>
    <div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
    <div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Type</div>
    <div style="font-size: 18px; color: #e2e8f0;">{escape(indicator_type.upper())}</div>
    </div>
    <div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
    <div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Status</div>
    <div style="font-size: 18px; color: {status_color};">{status_text}</div>
    </div>
    </div>
    """]

    # DNS info for domains
    if indicator_type == "domain" and "dns" in result:
        dns = result["dns"]
        a_records = ", ".join(dns.get("a_records", [])) or "None"
        parts.append(f"""
    <h4 style="color: #fbbf24; margin-top: 24px;">π DNS Resolution</h4>
    <div style="background: #111827; padding: 16px; border-radius: 8px; margin-top: 12px;">
    <p><span style="color: #64748b;">A Records:</span> <span style="color: #00ffd5;">{escape(a_records)}</span></p>
    </div>
    """)

    # Threat details
    if result["details"]:
        parts.append("""
    <h4 style="color: #ef4444; margin-top: 24px;">π¨ Threat Details</h4>
    <table style="width: 100%; border-collapse: collapse; margin-top: 12px;">
    <tr style="background: #1e3a5f;">
    <th style="padding: 12px; text-align: left; color: #00ffd5;">Source</th>
    <th style="padding: 12px; text-align: left; color: #00ffd5;">Details</th>
    </tr>
    """)
        for detail in result["details"]:
            source = detail.get("source", "Unknown")
            info_parts = [f"{k}: {v}" for k, v in detail.items() if k != "source" and v]
            info = " | ".join(info_parts)
            # Feed-supplied text (tags, malware names, ...) is untrusted.
            parts.append(f"""
    <tr style="border-bottom: 1px solid #1e3a5f;">
    <td style="padding: 12px; color: #f97316;">{escape(str(source))}</td>
    <td style="padding: 12px; color: #94a3b8; font-size: 12px;">{escape(info)}</td>
    </tr>
    """)
        parts.append("</table>")

    # Recommendations
    accent = '#ef4444' if malicious else '#22c55e'
    heading = 'π« Recommended Actions' if malicious else 'β Assessment'
    parts.append(f"""
    <div style="margin-top: 24px; padding: 16px; background: #111827; border-radius: 8px; border-left: 4px solid {accent};">
    <h4 style="color: {accent}; margin: 0 0 12px 0;">
    {heading}
    </h4>
    <ul style="color: #94a3b8; margin: 0; padding-left: 20px;">
    """)
    if malicious:
        parts.append("""
    <li>Block this indicator in your firewall/proxy</li>
    <li>Search logs for any connections to this indicator</li>
    <li>If found, isolate affected systems</li>
    <li>Report to your security team</li>
    """)
    else:
        parts.append("""
    <li>No threats detected in current threat feeds</li>
    <li>Continue monitoring for changes</li>
    <li>Note: Absence of threat data doesn't guarantee safety</li>
    """)
    parts.append("""
    </ul>
    </div>
    <p style="color: #64748b; font-size: 11px; margin-top: 16px;">
    Data sources: URLhaus, ThreatFox, FeodoTracker, Spamhaus DROP, Emerging Threats, OpenPhish
    </p>
    </div>
    """)
    return gauge_fig, sources_fig, "".join(parts)
def extract_and_analyze_iocs(text: str) -> Tuple[go.Figure, str, str]:
    """Extract and analyze Indicators of Compromise from text.

    Extracts IPs, domains, URLs, email addresses, file hashes,
    Bitcoin addresses, and CVEs from the provided text, then
    checks extracted indicators against threat feeds.  Only the first
    10 IPs, 10 domains and 5 URLs are checked.

    Every extracted value and feed-supplied source name is HTML-escaped
    before it is placed in the report; the JSON output is left unescaped.

    Args:
        text: Text containing potential IOCs (logs, reports, emails, etc.)

    Returns:
        IOC distribution chart, extracted IOCs as JSON, and analysis report HTML
    """
    # Local import: only ``escape`` is needed, and the module-level import
    # block is left untouched.
    from html import escape

    if not text or len(text.strip()) < 5:
        empty_fig = go.Figure()
        empty_fig.update_layout(paper_bgcolor='#0a0f1a', height=280)
        return empty_fig, "{}", "<p style='color: #ef4444;'>Please enter text to analyze</p>"

    # Extract IOCs
    iocs = extract_iocs(text)
    # Create visualization
    ioc_chart = create_ioc_chart(iocs)
    # Count totals
    total_iocs = sum(len(v) for v in iocs.values())

    # Check some IOCs against threat feeds (limit to avoid overload):
    # (label, IOC key, checker, how many to check)
    checks = (
        ("IP", "ipv4", check_ip_reputation, 10),
        ("Domain", "domain", check_domain_reputation, 10),
        ("URL", "url", check_url_reputation, 5),
    )
    malicious_found = []
    for label, key, checker, limit in checks:
        for value in iocs.get(key, [])[:limit]:
            result = checker(value)
            if result["is_malicious"]:
                malicious_found.append({"type": label, "value": value, "sources": result["sources"]})

    # Build HTML report
    malicious_color = '#ef4444' if malicious_found else '#22c55e'
    parts = [f"""
    <div style="font-family: 'JetBrains Mono', monospace; color: #e2e8f0;">
    <h3 style="color: #00ffd5; border-bottom: 1px solid #1e3a5f; padding-bottom: 10px;">
    π IOC Extraction Report
    </h3>
    <div style="display: grid; grid-template-columns: repeat(4, 1fr); gap: 16px; margin: 20px 0;">
    <div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
    <div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Total IOCs</div>
    <div style="font-size: 28px; color: #00ffd5;">{total_iocs}</div>
    </div>
    <div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
    <div style="color: #64748b; font-size: 11px; text-transform: uppercase;">IPs Found</div>
    <div style="font-size: 28px; color: #e2e8f0;">{len(iocs.get('ipv4', []))}</div>
    </div>
    <div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
    <div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Domains</div>
    <div style="font-size: 28px; color: #e2e8f0;">{len(iocs.get('domain', []))}</div>
    </div>
    <div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
    <div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Malicious</div>
    <div style="font-size: 28px; color: {malicious_color};">{len(malicious_found)}</div>
    </div>
    </div>
    """]

    # Show malicious indicators
    if malicious_found:
        parts.append("""
    <h4 style="color: #ef4444; margin-top: 24px;">π¨ Malicious Indicators Detected</h4>
    <table style="width: 100%; border-collapse: collapse; margin-top: 12px;">
    <tr style="background: #1e3a5f;">
    <th style="padding: 12px; text-align: left; color: #00ffd5;">Type</th>
    <th style="padding: 12px; text-align: left; color: #00ffd5;">Indicator</th>
    <th style="padding: 12px; text-align: left; color: #00ffd5;">Sources</th>
    </tr>
    """)
        for item in malicious_found:
            sources_text = ', '.join(str(s) for s in item['sources'])
            parts.append(f"""
    <tr style="border-bottom: 1px solid #1e3a5f;">
    <td style="padding: 12px; color: #f97316;">{escape(item['type'])}</td>
    <td style="padding: 12px; color: #ef4444; font-size: 12px; word-break: break-all;">{escape(item['value'])}</td>
    <td style="padding: 12px; color: #94a3b8;">{escape(sources_text)}</td>
    </tr>
    """)
        parts.append("</table>")

    # Show extracted IOCs summary
    parts.append("<h4 style='color: #fbbf24; margin-top: 24px;'>π Extracted Indicators</h4>")
    for ioc_type, items in iocs.items():
        if not items:
            continue
        # Values come straight from the submitted text, so escape them.
        preview = ', '.join(escape(str(value)) for value in items[:5])
        ellipsis = '...' if len(items) > 5 else ''
        parts.append(f"""
    <div style="background: #111827; padding: 12px 16px; border-radius: 6px; margin-top: 8px; border-left: 3px solid #00ffd5;">
    <span style="color: #00ffd5; font-weight: 600;">{escape(ioc_type.upper())}</span>
    <span style="color: #64748b; margin-left: 8px;">({len(items)} found)</span>
    <div style="color: #94a3b8; font-size: 12px; margin-top: 8px; word-break: break-all;">
    {preview}{ellipsis}
    </div>
    </div>
    """)

    parts.append("""
    <div style="margin-top: 24px; padding: 16px; background: #111827; border-radius: 8px; border-left: 4px solid #00ffd5;">
    <h4 style="color: #00ffd5; margin: 0 0 12px 0;">π‘ Next Steps</h4>
    <ul style="color: #94a3b8; margin: 0; padding-left: 20px;">
    <li>Export IOCs for import into your SIEM/SOAR</li>
    <li>Block malicious indicators at network perimeter</li>
    <li>Search historical logs for these indicators</li>
    <li>Share findings with threat intel sharing groups</li>
    </ul>
    </div>
    </div>
    """)

    # Return IOCs as formatted JSON
    iocs_json = json.dumps(iocs, indent=2)
    return ioc_chart, iocs_json, "".join(parts)
def get_threat_feed_stats() -> Tuple[go.Figure, str]:
    """Summarize how many indicators each threat feed currently provides.

    Every feed getter on the module-level ``feed_manager`` is called once,
    in a fixed order, and the length of each result is used as that feed's
    indicator count.

    Returns:
        A bar chart of per-feed indicator counts and an HTML status report
    """
    # Dict literals evaluate top to bottom, so the feeds are queried in
    # exactly this order and the chart/table rows follow the same order.
    stats = {
        "URLhaus URLs": len(feed_manager.get_urlhaus_urls()),
        "ThreatFox IOCs": len(feed_manager.get_threatfox_iocs()),
        "Feodo Botnet IPs": len(feed_manager.get_feodo_ips()),
        "Spamhaus DROP Ranges": len(feed_manager.get_spamhaus_ips()),
        "Emerging Threats IPs": len(feed_manager.get_emergingthreats_ips()),
        "OpenPhish URLs": len(feed_manager.get_openphish_urls()),
        "Known Breaches": len(feed_manager.get_hibp_breaches()),
    }
    feed_labels = list(stats.keys())
    feed_counts = list(stats.values())

    # One bar per feed; the palette has one colour for each of the seven feeds.
    bar_colors = ['#00ffd5', '#22c55e', '#fbbf24', '#f97316', '#ef4444', '#8b5cf6', '#06b6d4']
    bars = go.Bar(
        x=feed_labels,
        y=feed_counts,
        marker_color=bar_colors,
        text=feed_counts,
        textposition='outside',
    )
    fig = go.Figure(data=[bars])
    fig.update_layout(
        title=dict(text='π Loaded Threat Intelligence', font=dict(color='#00ffd5')),
        paper_bgcolor='#0a0f1a',
        plot_bgcolor='#111827',
        font=dict(color='#e2e8f0'),
        height=350,
        xaxis=dict(tickangle=45, gridcolor='#1e3a5f'),
        yaxis=dict(gridcolor='#1e3a5f'),
        margin={'l': 40, 'r': 20, 't': 60, 'b': 120},
    )

    total_indicators = sum(feed_counts)

    def _feed_row(feed_name, count):
        # Render one table row; a feed with zero indicators is flagged as empty.
        if count > 0:
            status, status_color = "β Loaded", "#22c55e"
        else:
            status, status_color = "β οΈ Empty", "#f97316"
        return f"""
<tr style="border-bottom: 1px solid #1e3a5f;">
<td style="padding: 12px; color: #e2e8f0;">{feed_name}</td>
<td style="padding: 12px; color: #00ffd5;">{count:,}</td>
<td style="padding: 12px; color: {status_color};">{status}</td>
</tr>
"""

    # Report opening: heading, grand total, and the table header row.
    report_head = f"""
<div style="font-family: 'JetBrains Mono', monospace; color: #e2e8f0;">
<h3 style="color: #00ffd5; border-bottom: 1px solid #1e3a5f; padding-bottom: 10px;">
π‘ Threat Feed Status
</h3>
<div style="background: #111827; padding: 20px; border-radius: 8px; margin: 20px 0; text-align: center;">
<div style="color: #64748b; font-size: 12px; text-transform: uppercase;">Total Indicators Loaded</div>
<div style="font-size: 48px; color: #00ffd5; font-weight: bold;">{total_indicators:,}</div>
</div>
<h4 style="color: #fbbf24; margin-top: 24px;">π Feed Details</h4>
<table style="width: 100%; border-collapse: collapse; margin-top: 12px;">
<tr style="background: #1e3a5f;">
<th style="padding: 12px; text-align: left; color: #00ffd5;">Feed</th>
<th style="padding: 12px; text-align: left; color: #00ffd5;">Indicators</th>
<th style="padding: 12px; text-align: left; color: #00ffd5;">Status</th>
</tr>
"""

    report_rows = "".join(
        _feed_row(feed_name, count) for feed_name, count in stats.items()
    )

    # Report closing: ends the table and appends the static feed descriptions.
    report_tail = """
</table>
<div style="margin-top: 24px; padding: 16px; background: #111827; border-radius: 8px; border-left: 4px solid #00ffd5;">
<h4 style="color: #00ffd5; margin: 0 0 12px 0;">βΉοΈ About These Feeds</h4>
<ul style="color: #94a3b8; margin: 0; padding-left: 20px; font-size: 13px;">
<li><b>URLhaus:</b> Malicious URLs used for malware distribution (abuse.ch)</li>
<li><b>ThreatFox:</b> IOCs from malware samples and campaigns (abuse.ch)</li>
<li><b>FeodoTracker:</b> Botnet C2 servers (Emotet, Dridex, etc.)</li>
<li><b>Spamhaus DROP:</b> IPs controlled by spammers/criminals</li>
<li><b>Emerging Threats:</b> Compromised IPs from ProofPoint</li>
<li><b>OpenPhish:</b> Active phishing URLs</li>
<li><b>HIBP Breaches:</b> Public breach database metadata</li>
</ul>
<p style="color: #64748b; font-size: 11px; margin-top: 12px;">
Feeds refresh automatically every hour. All data is free and requires no API keys.
</p>
</div>
</div>
"""

    return fig, report_head + report_rows + report_tail
| # ============================================================================ | |
| # CUSTOM CSS | |
| # ============================================================================ | |
# Stylesheet handed to gr.Blocks(css=custom_css) when the UI is built.
# It imports the JetBrains Mono and Inter web fonts, declares the --sw-*
# palette variables, and overrides the Gradio container, dark-theme
# variables, headings, tab buttons, text inputs and primary buttons.
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&family=Inter:wght@400;500;600&display=swap');
:root {
--sw-bg-dark: #0a0f1a;
--sw-bg-card: #111827;
--sw-border: #1e3a5f;
--sw-cyan: #00ffd5;
}
.gradio-container {
background: var(--sw-bg-dark) !important;
font-family: 'Inter', sans-serif !important;
}
.dark {
--background-fill-primary: var(--sw-bg-dark) !important;
--background-fill-secondary: var(--sw-bg-card) !important;
--border-color-primary: var(--sw-border) !important;
}
h1, h2, h3, h4 { font-family: 'JetBrains Mono', monospace !important; }
.tab-nav button {
font-family: 'JetBrains Mono', monospace !important;
font-size: 12px !important;
letter-spacing: 1px !important;
text-transform: uppercase !important;
}
.tab-nav button.selected {
background: var(--sw-cyan) !important;
color: var(--sw-bg-dark) !important;
}
input, textarea {
font-family: 'JetBrains Mono', monospace !important;
background: var(--sw-bg-dark) !important;
border-color: var(--sw-border) !important;
}
input:focus, textarea:focus { border-color: var(--sw-cyan) !important; }
.primary { background: linear-gradient(135deg, #00b396 0%, #007a6a 100%) !important; }
"""
| # ============================================================================ | |
| # GRADIO INTERFACE | |
| # ============================================================================ | |
# Assemble the whole ShadowWatch UI as a single gr.Blocks app named `demo`:
# a static header, three tabs (indicator scan, IOC extraction, feed
# statistics) and a static footer. Components render in creation order, so
# the statement order below is the page layout.
# NOTE(review): nesting was reconstructed from a flattened copy of this file;
# confirm that scan_report / ioc_report are meant to sit below their rows and
# that demo.load and the footer belong at the Blocks level.
with gr.Blocks(
    title="ShadowWatch | Open Source Threat Intel",
    theme=gr.themes.Base(
        primary_hue="cyan",
        secondary_hue="slate",
        neutral_hue="slate",
    ).set(
        body_background_fill="#0a0f1a",
        block_background_fill="#111827",
        block_border_width="1px",
        block_border_color="#1e3a5f",
        button_primary_background_fill="#00b396",
    ),
    css=custom_css
) as demo:
    # Header
    gr.HTML("""
<div style="background: linear-gradient(90deg, #111827 0%, #0d1520 100%); border: 1px solid #1e3a5f; border-radius: 8px; padding: 20px 24px; margin-bottom: 20px; display: flex; justify-content: space-between; align-items: center;">
<div style="display: flex; align-items: center; gap: 16px;">
<div style="font-size: 40px;">π‘οΈ</div>
<div>
<div style="font-family: 'JetBrains Mono', monospace; font-size: 28px; font-weight: 700; color: white; letter-spacing: 3px;">SHADOWWATCH</div>
<div style="font-size: 12px; color: #64748b; letter-spacing: 2px; text-transform: uppercase;">Open Source Threat Intelligence β’ No API Keys Required</div>
</div>
</div>
<div style="display: flex; gap: 20px; align-items: center; font-family: 'JetBrains Mono', monospace; font-size: 11px;">
<div style="color: #22c55e;">β 100% FREE</div>
<div style="color: #00ffd5;">β MCP ENABLED</div>
</div>
</div>
""")
    with gr.Tabs():
        # Indicator Scan Tab
        with gr.TabItem("π― SCAN INDICATOR"):
            gr.Markdown("**Check an IP, domain, or URL against open-source threat feeds.**")
            # Input row: indicator text, type selector (default "auto") and the scan button.
            with gr.Row():
                indicator_input = gr.Textbox(
                    label="Indicator",
                    placeholder="Enter IP, domain, or URL...",
                    scale=3
                )
                indicator_type = gr.Radio(
                    choices=["auto", "ip", "domain", "url"],
                    value="auto",
                    label="Type",
                    scale=1
                )
                scan_btn = gr.Button("π SCAN", variant="primary", scale=1)
            # Output area: two plots side by side, then the HTML report.
            with gr.Row():
                risk_gauge = gr.Plot(label="Threat Score")
                sources_chart = gr.Plot(label="Detection Sources")
            scan_report = gr.HTML(label="Analysis Report")
            # scan_indicator is defined elsewhere in this file; its three
            # return values fill the gauge, the sources chart and the report.
            scan_btn.click(
                fn=scan_indicator,
                inputs=[indicator_input, indicator_type],
                outputs=[risk_gauge, sources_chart, scan_report]
            )
        # IOC Extractor Tab
        with gr.TabItem("π IOC EXTRACTOR"):
            gr.Markdown("**Extract and analyze Indicators of Compromise from text, logs, or reports.**")
            text_input = gr.Textbox(
                label="Paste Text to Analyze",
                placeholder="Paste logs, reports, emails, or any text containing potential IOCs...",
                lines=8
            )
            extract_btn = gr.Button("π¬ EXTRACT & ANALYZE", variant="primary")
            with gr.Row():
                ioc_chart = gr.Plot(label="IOC Distribution")
                ioc_json = gr.Code(label="Extracted IOCs (JSON)", language="json")
            ioc_report = gr.HTML(label="Analysis Report")
            # extract_and_analyze_iocs returns (chart, JSON text, HTML), matching
            # the three outputs in that order.
            extract_btn.click(
                fn=extract_and_analyze_iocs,
                inputs=[text_input],
                outputs=[ioc_chart, ioc_json, ioc_report]
            )
        # Threat Feeds Tab
        with gr.TabItem("π‘ THREAT FEEDS"):
            gr.Markdown("**View loaded threat intelligence from open-source feeds.**")
            refresh_btn = gr.Button("π REFRESH FEEDS", variant="primary")
            feed_chart = gr.Plot(label="Feed Statistics")
            feed_report = gr.HTML(label="Feed Details")
            # The button re-runs the same function that populates this tab on page load.
            refresh_btn.click(
                fn=get_threat_feed_stats,
                inputs=[],
                outputs=[feed_chart, feed_report]
            )
    # Load on page load
    demo.load(fn=get_threat_feed_stats, outputs=[feed_chart, feed_report])
    # Footer
    gr.HTML("""
<div style="background: #111827; border: 1px solid #1e3a5f; border-radius: 8px; padding: 16px 24px; margin-top: 20px; font-family: 'JetBrains Mono', monospace; font-size: 11px; color: #64748b;">
<div style="display: flex; justify-content: space-between; align-items: center;">
<div>
<span>π MCP:</span>
<span style="color: #00ffd5; margin-left: 8px;">https://crypticallyrequie-shadowwatchv2.hf.space/gradio_api/mcp/sse</span>
</div>
<div>
Built by <span style="color: #00ffd5;">Cogensec</span> | ARGUS Platform
</div>
</div>
<div style="margin-top: 12px; padding-top: 12px; border-top: 1px solid #1e3a5f;">
Data Sources: abuse.ch (URLhaus, ThreatFox, FeodoTracker, MalwareBazaar) β’ Spamhaus β’ Emerging Threats β’ OpenPhish β’ HIBP
</div>
</div>
""")
# Script entry point: launch the Blocks app only when this file is executed
# directly. mcp_server=True is passed to launch(); the page footer advertises
# the corresponding MCP SSE endpoint.
if __name__ == "__main__":
    demo.launch(mcp_server=True)