"""
ShadowWatch v2 - Open Source Threat Intelligence Platform
100% Free - No API Keys Required

Uses public threat feeds and local analysis:
- abuse.ch (URLhaus, ThreatFox, FeodoTracker, MalwareBazaar)
- Spamhaus DROP lists
- Emerging Threats blocklists
- Direct DNS/WHOIS lookups
- IOC extraction and analysis

By Cogensec | ARGUS Platform
"""

import csv
import hashlib
import io
import ipaddress
import json
import logging
import re
import socket
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlsplit

import gradio as gr
import plotly.express as px
import plotly.graph_objects as go
import requests

logger = logging.getLogger(__name__)


# ============================================================================
# SMALL PARSING HELPERS
# ============================================================================

def _parse_ip(value: str):
    """Return an ``ipaddress`` address object for *value*, or None if invalid."""
    try:
        return ipaddress.ip_address(value.strip())
    except (ValueError, AttributeError):
        return None


def _hostname_of(url: str) -> str:
    """Return the lower-cased hostname of *url*, or "" when it has none.

    Port and userinfo are dropped, so "http://Example.com:8080/x" yields
    "example.com". A URL without a scheme has no hostname and yields "".
    """
    try:
        return urlsplit(url.strip()).hostname or ""
    except ValueError:  # e.g. malformed IPv6 literal
        return ""


def _ip_in_range(ip: str, address, ip_range: str) -> bool:
    """Tell whether *ip* falls inside a blocklist entry.

    Args:
        ip: The indicator exactly as supplied by the caller.
        address: ``_parse_ip(ip)`` (None when *ip* is not a valid address).
        ip_range: A blocklist entry, either a single address or CIDR notation.
    """
    if "/" not in ip_range:
        return ip == ip_range
    if address is None:
        return False
    try:
        network = ipaddress.ip_network(ip_range, strict=False)
    except ValueError:
        return False  # malformed feed line; ignore it
    return address in network


# ============================================================================
# THREAT FEED MANAGER - Downloads and caches public feeds
# ============================================================================

class ThreatFeedManager:
    """Manages open-source threat intelligence feeds.

    The raw body of each feed is kept in memory for ``cache_duration``;
    the ``get_*`` methods parse that body on every call.
    """

    # Feed URLs (all free, no auth required)
    FEEDS = {
        "urlhaus": "https://urlhaus.abuse.ch/downloads/csv_recent/",
        "threatfox_iocs": "https://threatfox.abuse.ch/export/json/recent/",
        "feodo_ipblocklist": "https://feodotracker.abuse.ch/downloads/ipblocklist.csv",
        "malwarebazaar_recent": "https://bazaar.abuse.ch/export/csv/recent/",
        "spamhaus_drop": "https://www.spamhaus.org/drop/drop.txt",
        "spamhaus_edrop": "https://www.spamhaus.org/drop/edrop.txt",
        "emergingthreats_compromised": "https://rules.emergingthreats.net/blockrules/compromised-ips.txt",
        "openphish": "https://openphish.com/feed.txt",
        "hibp_breaches": "https://haveibeenpwned.com/api/v3/breaches",
    }

    def __init__(self):
        self.cache = {}        # feed name -> raw response text
        self.cache_time = {}   # feed name -> datetime of last successful fetch
        self.cache_duration = timedelta(hours=1)  # Refresh every hour

    def _is_cache_valid(self, feed_name: str) -> bool:
        """Return True when *feed_name* was fetched within ``cache_duration``."""
        fetched_at = self.cache_time.get(feed_name)
        if fetched_at is None:
            return False
        return datetime.now() - fetched_at < self.cache_duration

    def fetch_feed(self, feed_name: str) -> Optional[str]:
        """Fetch a threat feed, using cache if available.

        Returns:
            The feed body; a stale cached body when the request fails or
            returns a non-200 status; None for an unknown feed name or when
            nothing has ever been fetched successfully.
        """
        if self._is_cache_valid(feed_name):
            return self.cache.get(feed_name)

        url = self.FEEDS.get(feed_name)
        if not url:
            return None

        try:
            resp = requests.get(
                url,
                headers={"User-Agent": "ShadowWatch-ARGUS/1.0"},
                timeout=30,
            )
        except requests.RequestException as exc:
            logger.warning("Error fetching %s: %s", feed_name, exc)
        else:
            if resp.status_code == 200:
                self.cache[feed_name] = resp.text
                self.cache_time[feed_name] = datetime.now()
                return resp.text
            logger.warning("Feed %s returned HTTP %s", feed_name, resp.status_code)

        return self.cache.get(feed_name)  # Return stale cache if fetch fails

    @staticmethod
    def _csv_rows(data: str):
        """Yield the non-empty, non-comment rows of a CSV feed body."""
        for row in csv.reader(io.StringIO(data)):
            if row and not row[0].startswith('#'):
                yield row

    def get_urlhaus_urls(self) -> List[Dict]:
        """Get recent malicious URLs from URLhaus.

        Column 7 of the export is the URLhaus reference link, not the
        hosting server, so ``host`` is derived from the malicious URL itself
        and the link is exposed separately as ``urlhaus_link``.
        """
        data = self.fetch_feed("urlhaus")
        if not data:
            return []

        urls = []
        for row in self._csv_rows(data):
            if len(row) < 8:
                continue
            urls.append({
                "id": row[0],
                "dateadded": row[1],
                "url": row[2],
                "url_status": row[3],
                "threat": row[5],
                "tags": row[6],
                "host": _hostname_of(row[2]),
                "urlhaus_link": row[7],
            })
        return urls

    def get_threatfox_iocs(self) -> List[Dict]:
        """Get recent IOCs from ThreatFox.

        Accepts both the API response shape
        (``{"query_status": "ok", "data": [...]}``) and the bulk-export
        shape (an object keyed by IOC id whose values are lists of
        records). Every returned record carries an ``ioc`` key; export
        records that only have ``ioc_value`` get it copied over.
        NOTE(review): export field names follow the abuse.ch documentation;
        confirm against a live sample.
        """
        data = self.fetch_feed("threatfox_iocs")
        if not data:
            return []
        try:
            parsed = json.loads(data)
        except ValueError:
            return []
        if not isinstance(parsed, dict):
            return []

        if "query_status" in parsed:
            entries = parsed.get("data") if parsed["query_status"] == "ok" else []
            if not isinstance(entries, list):
                entries = []
        else:
            entries = [
                record
                for group in parsed.values()
                if isinstance(group, list)
                for record in group
            ]

        iocs = []
        for record in entries:
            if not isinstance(record, dict):
                continue
            if "ioc" not in record and "ioc_value" in record:
                record = {**record, "ioc": record["ioc_value"]}
            iocs.append(record)
        return iocs

    def get_feodo_ips(self) -> List[Dict]:
        """Get botnet C2 IPs from FeodoTracker.

        Rows whose second column is not an IP address (such as the quoted
        header row) are skipped. With six or more columns the layout is
        taken to be first_seen, ip, port, status, last_online, malware;
        shorter rows keep the five-column mapping.
        NOTE(review): six-column layout per the abuse.ch blocklist page;
        confirm against a live sample.
        """
        data = self.fetch_feed("feodo_ipblocklist")
        if not data:
            return []

        ips = []
        for row in self._csv_rows(data):
            if len(row) < 5 or _parse_ip(row[1]) is None:
                continue
            entry = {
                "first_seen": row[0],
                "ip": row[1].strip(),
                "port": row[2],
                "status": row[3],
                "malware": row[4],
            }
            if len(row) >= 6:
                entry["last_online"] = row[4]
                entry["malware"] = row[5]
            ips.append(entry)
        return ips

    def get_spamhaus_ips(self) -> set:
        """Get bad IP ranges from Spamhaus DROP and EDROP."""
        ranges = set()
        for feed in ("spamhaus_drop", "spamhaus_edrop"):
            data = self.fetch_feed(feed)
            if not data:
                continue
            for line in data.splitlines():
                line = line.strip()
                if not line or line.startswith(';'):
                    continue
                # Extract IP/CIDR before any semicolon
                ip_part = line.split(';')[0].strip()
                if ip_part:
                    ranges.add(ip_part)
        return ranges

    def get_emergingthreats_ips(self) -> set:
        """Get compromised IPs from Emerging Threats."""
        data = self.fetch_feed("emergingthreats_compromised")
        if not data:
            return set()
        stripped = (line.strip() for line in data.splitlines())
        return {line for line in stripped if line and not line.startswith('#')}

    def get_openphish_urls(self) -> set:
        """Get phishing URLs from OpenPhish."""
        data = self.fetch_feed("openphish")
        if not data:
            return set()
        return {line.strip() for line in data.splitlines() if line.strip()}

    def get_hibp_breaches(self) -> List[Dict]:
        """Get public breach list from HIBP (no API key needed for breach list)."""
        data = self.fetch_feed("hibp_breaches")
        if not data:
            return []
        try:
            parsed = json.loads(data)
        except ValueError:
            return []
        # An error payload would be an object, not the expected list.
        return parsed if isinstance(parsed, list) else []


# Initialize feed manager
feed_manager = ThreatFeedManager()


# ============================================================================
# LOCAL ANALYSIS TOOLS
# ============================================================================

def dns_lookup(domain: str) -> Dict:
    """Perform DNS lookup without external APIs.

    Uses the system resolver via ``socket``. ``mx_records`` and
    ``ns_records`` are always empty because ``socket`` cannot query them.

    Returns:
        Dict with ``domain``, ``a_records``, ``aaaa_records``,
        ``mx_records``, ``ns_records`` and ``error``. ``error`` is set only
        when a lookup failed and no address was resolved at all.
    """
    results = {
        "domain": domain,
        "a_records": [],
        "aaaa_records": [],
        "mx_records": [],
        "ns_records": [],
        "error": None
    }
    failures = []

    # A records (IPv4)
    try:
        ipv4 = socket.gethostbyname_ex(domain)[2]
    except (OSError, ValueError) as exc:  # gaierror/herror, bad IDNA label
        failures.append(exc)
    else:
        results["a_records"] = list(dict.fromkeys(ipv4))

    # getaddrinfo may add IPv6 addresses and extra IPv4 ones
    try:
        info = socket.getaddrinfo(domain, None)
    except (OSError, ValueError) as exc:
        failures.append(exc)
    else:
        for item in info:
            ip = item[4][0]
            if ':' in ip:
                if ip not in results["aaaa_records"]:
                    results["aaaa_records"].append(ip)
            elif '.' in ip and ip not in results["a_records"]:
                results["a_records"].append(ip)

    if failures and not results["a_records"] and not results["aaaa_records"]:
        results["error"] = f"DNS lookup failed: {failures[-1]}"

    return results


# Compiled once at import. Bitcoin addresses are base58 and case-sensitive,
# so that pattern must not be case-folded (folding would re-admit the
# excluded look-alike characters).
_IOC_PATTERNS = {
    "ipv4": re.compile(
        r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
        re.IGNORECASE),
    "ipv6": re.compile(r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b', re.IGNORECASE),
    "domain": re.compile(
        r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b',
        re.IGNORECASE),
    "url": re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+', re.IGNORECASE),
    "email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', re.IGNORECASE),
    "md5": re.compile(r'\b[a-fA-F0-9]{32}\b', re.IGNORECASE),
    "sha1": re.compile(r'\b[a-fA-F0-9]{40}\b', re.IGNORECASE),
    "sha256": re.compile(r'\b[a-fA-F0-9]{64}\b', re.IGNORECASE),
    "bitcoin": re.compile(r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b'),
    "cve": re.compile(r'CVE-\d{4}-\d{4,7}', re.IGNORECASE),
}

_IMAGE_SUFFIXES = ('.png', '.jpg', '.gif')


def extract_iocs(text: str) -> Dict:
    """Extract Indicators of Compromise from text.

    Returns:
        Dict mapping each IOC type (ipv4, ipv6, domain, url, email, md5,
        sha1, sha256, bitcoin, cve) to at most 100 unique matches, in order
        of first appearance.
    """
    results = {}
    for ioc_type, pattern in _IOC_PATTERNS.items():
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # 100-item cut below is deterministic.
        matches = list(dict.fromkeys(pattern.findall(text)))

        # Filter out common false positives for domains
        if ioc_type == "domain":
            matches = [
                m for m in matches
                if len(m) > 4 and not m.lower().endswith(_IMAGE_SUFFIXES)
            ]

        results[ioc_type] = matches[:100]  # Limit to 100 per type

    return results


def calculate_hashes(text: str) -> Dict:
    """Calculate MD5, SHA-1, SHA-256 and SHA-512 hex digests of *text* (UTF-8)."""
    data = text.encode('utf-8')
    return {
        name: hashlib.new(name, data).hexdigest()
        for name in ("md5", "sha1", "sha256", "sha512")
    }


def check_ip_reputation(ip: str) -> Dict:
    """Check IP against local blocklists.

    Consults FeodoTracker, Spamhaus DROP/EDROP, Emerging Threats and
    ThreatFox. CIDR ranges are matched with real network containment and
    ThreatFox ``ip:port`` entries are matched on the exact host part.

    Returns:
        Dict with ``ip``, ``is_malicious``, ``sources``, ``risk_score``
        (capped at 100) and ``details``.
    """
    results = {
        "ip": ip,
        "is_malicious": False,
        "sources": [],
        "risk_score": 0,
        "details": []
    }
    address = _parse_ip(ip)

    def flag(source: str, points: int, **detail) -> None:
        results["is_malicious"] = True
        results["sources"].append(source)
        results["risk_score"] += points
        results["details"].append({"source": source, **detail})

    # Check Feodo botnet IPs
    for entry in feed_manager.get_feodo_ips():
        if entry.get("ip") == ip:
            flag(
                "FeodoTracker", 40,
                malware=entry.get("malware", "Unknown"),
                first_seen=entry.get("first_seen", "Unknown"),
                status=entry.get("status", "Unknown"),
            )
            break

    # Check Spamhaus DROP
    for ip_range in feed_manager.get_spamhaus_ips():
        if _ip_in_range(ip, address, ip_range):
            flag("Spamhaus DROP", 35, matched_range=ip_range)
            break

    # Check Emerging Threats
    if ip in feed_manager.get_emergingthreats_ips():
        flag("Emerging Threats", 30, category="Compromised IP")

    # Check ThreatFox IOCs
    for ioc in feed_manager.get_threatfox_iocs():
        if ioc.get("ioc_type") != "ip:port":
            continue
        # "1.2.3.4:443" -> "1.2.3.4"; brackets stripped for "[::1]:443"
        ioc_host = str(ioc.get("ioc") or "").rsplit(":", 1)[0].strip("[]")
        if ioc_host and ioc_host == ip:
            flag(
                "ThreatFox", 45,
                malware=ioc.get("malware_printable", "Unknown"),
                threat_type=ioc.get("threat_type", "Unknown"),
                confidence=ioc.get("confidence_level", 0),
            )
            break

    results["risk_score"] = min(results["risk_score"], 100)
    return results


def check_url_reputation(url: str) -> Dict:
    """Check URL against local threat feeds.

    Consults URLhaus (exact URL, or same hostname as a listed URL),
    OpenPhish (exact or prefix match) and ThreatFox URL IOCs (substring
    match). Comparisons are case-insensitive. An empty URL is reported
    clean without consulting any feed.

    Returns:
        Dict with ``url``, ``is_malicious``, ``sources``, ``risk_score``
        (capped at 100) and ``details``.
    """
    results = {
        "url": url,
        "is_malicious": False,
        "sources": [],
        "risk_score": 0,
        "details": []
    }

    # Normalize URL
    url_lower = url.lower().strip()
    if not url_lower:
        # "" is a substring of every IOC, so it would be flagged otherwise.
        return results

    # Extract hostname from URL (no port, no userinfo)
    domain = _hostname_of(url_lower)

    # Check URLhaus
    for entry in feed_manager.get_urlhaus_urls():
        exact = url_lower == entry.get("url", "").lower()
        same_host = bool(domain) and domain == entry.get("host", "").lower()
        if exact or same_host:
            results["is_malicious"] = True
            results["sources"].append("URLhaus")
            results["risk_score"] += 50
            results["details"].append({
                "source": "URLhaus",
                "threat": entry.get("threat", "Unknown"),
                "tags": entry.get("tags", ""),
                "status": entry.get("url_status", "Unknown"),
                "date_added": entry.get("dateadded", "Unknown"),
                "match": "exact URL" if exact else "host",
            })
            break

    # Check OpenPhish (lower-cased to match the normalized URL)
    phishing_urls = {p.lower() for p in feed_manager.get_openphish_urls()}
    if url_lower in phishing_urls or any(url_lower.startswith(p) for p in phishing_urls):
        results["is_malicious"] = True
        results["sources"].append("OpenPhish")
        results["risk_score"] += 45
        results["details"].append({
            "source": "OpenPhish",
            "category": "Phishing"
        })

    # Check ThreatFox for URL IOCs
    for ioc in feed_manager.get_threatfox_iocs():
        if ioc.get("ioc_type") == "url" and url_lower in str(ioc.get("ioc") or "").lower():
            results["is_malicious"] = True
            results["sources"].append("ThreatFox")
            results["risk_score"] += 50
            results["details"].append({
                "source": "ThreatFox",
                "malware": ioc.get("malware_printable", "Unknown"),
                "threat_type": ioc.get("threat_type", "Unknown")
            })
            break

    results["risk_score"] = min(results["risk_score"], 100)
    return results


def check_domain_reputation(domain: str) -> Dict:
    """Check domain against threat feeds and perform DNS analysis.

    Resolves the domain, checks each resolved IPv4 address with
    ``check_ip_reputation`` (half of that score is added), then looks the
    domain up in URLhaus hosts and ThreatFox domain IOCs. Feeds that
    flagged a resolved IP are also listed in ``sources``, so a malicious
    verdict never comes with an empty source list.

    Returns:
        Dict with ``domain``, ``is_malicious``, ``sources``,
        ``risk_score`` (capped at 100), ``dns`` and ``details``.
    """
    results = {
        "domain": domain,
        "is_malicious": False,
        "sources": [],
        "risk_score": 0,
        "dns": {},
        "details": []
    }
    domain_lower = domain.lower()

    # DNS lookup
    results["dns"] = dns_lookup(domain)

    # Check if resolved IPs are malicious
    for ip in results["dns"].get("a_records", []):
        ip_rep = check_ip_reputation(ip)
        if not ip_rep["is_malicious"]:
            continue
        results["is_malicious"] = True
        results["risk_score"] += ip_rep["risk_score"] // 2
        results["details"].append({
            "source": "DNS Resolution",
            "detail": f"Resolves to malicious IP: {ip}",
            "ip_sources": ip_rep["sources"]
        })
        for source in ip_rep["sources"]:
            if source not in results["sources"]:
                results["sources"].append(source)

    # Check URLhaus for domain
    for entry in feed_manager.get_urlhaus_urls():
        if domain_lower and domain_lower == entry.get("host", "").lower():
            results["is_malicious"] = True
            results["sources"].append("URLhaus")
            results["risk_score"] += 40
            results["details"].append({
                "source": "URLhaus",
                "threat": entry.get("threat", "Unknown"),
                "tags": entry.get("tags", "")
            })
            break

    # Check ThreatFox for domain IOCs
    for ioc in feed_manager.get_threatfox_iocs():
        if ioc.get("ioc_type") == "domain" and domain_lower == str(ioc.get("ioc") or "").lower():
            results["is_malicious"] = True
            results["sources"].append("ThreatFox")
            results["risk_score"] += 45
            results["details"].append({
                "source": "ThreatFox",
                "malware": ioc.get("malware_printable", "Unknown"),
                "threat_type": ioc.get("threat_type", "Unknown")
            })
            break

    results["risk_score"] = min(results["risk_score"], 100)
    return results


# ============================================================================
# VISUALIZATION HELPERS
# ============================================================================

def create_risk_gauge(score: int, title: str = "Risk Score") -> go.Figure:
    """Create a 0-100 risk gauge whose bar colour reflects the score band."""
    if score >= 75:
        color = "#ef4444"
    elif score >= 50:
        color = "#f97316"
    elif score >= 25:
        color = "#fbbf24"
    else:
        color = "#22c55e"

    fig = go.Figure(go.Indicator(
        mode="gauge+number",
        value=score,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': title, 'font': {'size': 18, 'color': '#e2e8f0'}},
        number={'font': {'size': 36, 'color': '#e2e8f0'}},
        gauge={
            'axis': {'range': [0, 100], 'tickcolor': "#1e3a5f"},
            'bar': {'color': color},
            'bgcolor': "#111827",
            'borderwidth': 2,
            'bordercolor': "#1e3a5f",
            'steps': [
                {'range': [0, 25], 'color': 'rgba(34, 197, 94, 0.15)'},
                {'range': [25, 50], 'color': 'rgba(251, 191, 36, 0.15)'},
                {'range': [50, 75], 'color': 'rgba(249, 115, 22, 0.15)'},
                {'range': [75, 100], 'color': 'rgba(239, 68, 68, 0.15)'}
            ],
        }
    ))

    fig.update_layout(
        paper_bgcolor='#0a0f1a',
        plot_bgcolor='#0a0f1a',
        height=280,
        margin=dict(l=20, r=20, t=50, b=20)
    )
    return fig


def create_ioc_chart(iocs: Dict) -> go.Figure:
    """Create a bar chart of extracted IOC counts; empty types are omitted."""
    types = []
    counts = []
    for ioc_type, items in iocs.items():
        if items:
            types.append(ioc_type.upper())
            counts.append(len(items))

    if not types:
        fig = go.Figure()
        fig.add_annotation(text="No IOCs extracted", x=0.5, y=0.5, showarrow=False,
                           font=dict(size=16, color="#64748b"), xref="paper", yref="paper")
        fig.update_layout(paper_bgcolor='#0a0f1a', plot_bgcolor='#0a0f1a', height=280)
        return fig

    colors = ['#00ffd5' if c > 0 else '#64748b' for c in counts]

    fig = go.Figure(data=[go.Bar(
        x=types,
        y=counts,
        marker_color=colors,
        text=counts,
        textposition='outside'
    )])

    fig.update_layout(
        title={'text': '📊 Extracted IOCs', 'font': {'color': '#00ffd5'}},
        paper_bgcolor='#0a0f1a',
        plot_bgcolor='#111827',
        font={'color': '#e2e8f0'},
        height=280,
        xaxis={'gridcolor': '#1e3a5f'},
        yaxis={'gridcolor': '#1e3a5f'},
        margin=dict(l=40, r=20, t=50, b=40)
    )
    return fig


def create_threat_sources_chart(sources: List[str]) -> go.Figure:
    """Create a donut chart of threat sources, or a "no threats" placeholder."""
    if not sources:
        fig = go.Figure()
        fig.add_annotation(text="✓ No threats detected", x=0.5, y=0.5, showarrow=False,
                           font=dict(size=18, color="#22c55e"), xref="paper", yref="paper")
        fig.update_layout(paper_bgcolor='#0a0f1a', plot_bgcolor='#0a0f1a', height=280)
        return fig

    source_counts = Counter(sources)
    colors = ['#ef4444', '#f97316', '#fbbf24', '#00ffd5', '#8b5cf6']

    fig = go.Figure(data=[go.Pie(
        labels=list(source_counts.keys()),
        values=list(source_counts.values()),
        hole=0.5,
        marker_colors=colors[:len(source_counts)],
        textinfo='label+value',
        textfont={'color': '#e2e8f0'}
    )])

    fig.update_layout(
        # Warning sign + emoji variation selector, escaped so the title
        # survives a wrong-codec re-save of this file.
        title={'text': '\u26a0\ufe0f Threat Sources', 'font': {'color': '#ef4444'}},
        paper_bgcolor='#0a0f1a',
        plot_bgcolor='#0a0f1a',
        font={'color': '#e2e8f0'},
        height=280,
        showlegend=False,
        margin=dict(l=20, r=20, t=50, b=20)
    )
    return fig
============================================================================ # MCP TOOLS # ============================================================================ def scan_indicator(indicator: str, indicator_type: str = "auto") -> Tuple[go.Figure, go.Figure, str]: """Scan an IP address, domain, or URL against open-source threat feeds. Checks the indicator against URLhaus, ThreatFox, FeodoTracker, Spamhaus DROP, Emerging Threats, and OpenPhish databases. Args: indicator: The IP, domain, or URL to scan indicator_type: "auto", "ip", "domain", or "url" Returns: Risk gauge, threat sources chart, and detailed HTML report """ if not indicator: empty_fig = go.Figure() empty_fig.update_layout(paper_bgcolor='#0a0f1a', height=280) return empty_fig, empty_fig, "

Please enter an indicator to scan

" indicator = indicator.strip() # Auto-detect type if indicator_type == "auto": if re.match(r'^https?://', indicator): indicator_type = "url" elif re.match(r'^(\d{1,3}\.){3}\d{1,3}$', indicator): indicator_type = "ip" else: indicator_type = "domain" # Perform check based on type if indicator_type == "ip": result = check_ip_reputation(indicator) elif indicator_type == "url": result = check_url_reputation(indicator) else: result = check_domain_reputation(indicator) # Create visualizations gauge_fig = create_risk_gauge(result["risk_score"], "Threat Score") sources_fig = create_threat_sources_chart(result["sources"]) # Build HTML report status_color = "#ef4444" if result["is_malicious"] else "#22c55e" status_text = "âš ī¸ MALICIOUS" if result["is_malicious"] else "✓ CLEAN" html = f"""

🔍 Threat Intelligence Report

Indicator
{indicator}
Type
{indicator_type.upper()}
Status
{status_text}
""" # DNS info for domains if indicator_type == "domain" and "dns" in result: dns = result["dns"] a_records = ", ".join(dns.get("a_records", [])) or "None" html += f"""

🌐 DNS Resolution

A Records: {a_records}

""" # Threat details if result["details"]: html += """

🚨 Threat Details

""" for detail in result["details"]: source = detail.get("source", "Unknown") info_parts = [f"{k}: {v}" for k, v in detail.items() if k != "source" and v] info = " | ".join(info_parts) html += f""" """ html += "
Source Details
{source} {info}
" # Recommendations html += f"""

{'đŸšĢ Recommended Actions' if result['is_malicious'] else '✓ Assessment'}

Data sources: URLhaus, ThreatFox, FeodoTracker, Spamhaus DROP, Emerging Threats, OpenPhish

""" return gauge_fig, sources_fig, html def extract_and_analyze_iocs(text: str) -> Tuple[go.Figure, str, str]: """Extract and analyze Indicators of Compromise from text. Extracts IPs, domains, URLs, email addresses, file hashes, Bitcoin addresses, and CVEs from the provided text, then checks extracted indicators against threat feeds. Args: text: Text containing potential IOCs (logs, reports, emails, etc.) Returns: IOC distribution chart, extracted IOCs as JSON, and analysis report HTML """ if not text or len(text.strip()) < 5: empty_fig = go.Figure() empty_fig.update_layout(paper_bgcolor='#0a0f1a', height=280) return empty_fig, "{}", "

Please enter text to analyze

" # Extract IOCs iocs = extract_iocs(text) # Create visualization ioc_chart = create_ioc_chart(iocs) # Count totals total_iocs = sum(len(v) for v in iocs.values()) # Check some IOCs against threat feeds (limit to avoid overload) malicious_found = [] # Check IPs (limit to first 10) for ip in iocs.get("ipv4", [])[:10]: result = check_ip_reputation(ip) if result["is_malicious"]: malicious_found.append({"type": "IP", "value": ip, "sources": result["sources"]}) # Check domains (limit to first 10) for domain in iocs.get("domain", [])[:10]: result = check_domain_reputation(domain) if result["is_malicious"]: malicious_found.append({"type": "Domain", "value": domain, "sources": result["sources"]}) # Check URLs (limit to first 5) for url in iocs.get("url", [])[:5]: result = check_url_reputation(url) if result["is_malicious"]: malicious_found.append({"type": "URL", "value": url, "sources": result["sources"]}) # Build HTML report html = f"""

📋 IOC Extraction Report

Total IOCs
{total_iocs}
IPs Found
{len(iocs.get('ipv4', []))}
Domains
{len(iocs.get('domain', []))}
Malicious
{len(malicious_found)}
""" # Show malicious indicators if malicious_found: html += """

🚨 Malicious Indicators Detected

""" for item in malicious_found: html += f""" """ html += "
Type Indicator Sources
{item['type']} {item['value']} {', '.join(item['sources'])}
" # Show extracted IOCs summary html += "

📊 Extracted Indicators

" for ioc_type, items in iocs.items(): if items: preview = items[:5] html += f"""
{ioc_type.upper()} ({len(items)} found)
{', '.join(preview)}{'...' if len(items) > 5 else ''}
""" html += """

💡 Next Steps

""" # Return IOCs as formatted JSON iocs_json = json.dumps(iocs, indent=2) return ioc_chart, iocs_json, html def get_threat_feed_stats() -> Tuple[go.Figure, str]: """Get current statistics from all loaded threat feeds. Shows the number of indicators loaded from each open-source threat intelligence feed. Returns: Feed statistics chart and detailed HTML report """ # Gather stats from all feeds stats = {} # URLhaus urlhaus = feed_manager.get_urlhaus_urls() stats["URLhaus URLs"] = len(urlhaus) # ThreatFox threatfox = feed_manager.get_threatfox_iocs() stats["ThreatFox IOCs"] = len(threatfox) # FeodoTracker feodo = feed_manager.get_feodo_ips() stats["Feodo Botnet IPs"] = len(feodo) # Spamhaus spamhaus = feed_manager.get_spamhaus_ips() stats["Spamhaus DROP Ranges"] = len(spamhaus) # Emerging Threats et = feed_manager.get_emergingthreats_ips() stats["Emerging Threats IPs"] = len(et) # OpenPhish phish = feed_manager.get_openphish_urls() stats["OpenPhish URLs"] = len(phish) # HIBP Breaches hibp = feed_manager.get_hibp_breaches() stats["Known Breaches"] = len(hibp) # Create bar chart fig = go.Figure(data=[go.Bar( x=list(stats.keys()), y=list(stats.values()), marker_color=['#00ffd5', '#22c55e', '#fbbf24', '#f97316', '#ef4444', '#8b5cf6', '#06b6d4'], text=list(stats.values()), textposition='outside' )]) fig.update_layout( title={'text': '📊 Loaded Threat Intelligence', 'font': {'color': '#00ffd5'}}, paper_bgcolor='#0a0f1a', plot_bgcolor='#111827', font={'color': '#e2e8f0'}, height=350, xaxis={'tickangle': 45, 'gridcolor': '#1e3a5f'}, yaxis={'gridcolor': '#1e3a5f'}, margin=dict(l=40, r=20, t=60, b=120) ) total_indicators = sum(stats.values()) # Build HTML html = f"""

📡 Threat Feed Status

Total Indicators Loaded
{total_indicators:,}

📋 Feed Details

""" for feed_name, count in stats.items(): status = "✅ Loaded" if count > 0 else "âš ī¸ Empty" status_color = "#22c55e" if count > 0 else "#f97316" html += f""" """ html += """
Feed Indicators Status
{feed_name} {count:,} {status}

â„šī¸ About These Feeds

Feeds refresh automatically every hour. All data is free and requires no API keys.

""" return fig, html # ============================================================================ # CUSTOM CSS # ============================================================================ custom_css = """ @import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&family=Inter:wght@400;500;600&display=swap'); :root { --sw-bg-dark: #0a0f1a; --sw-bg-card: #111827; --sw-border: #1e3a5f; --sw-cyan: #00ffd5; } .gradio-container { background: var(--sw-bg-dark) !important; font-family: 'Inter', sans-serif !important; } .dark { --background-fill-primary: var(--sw-bg-dark) !important; --background-fill-secondary: var(--sw-bg-card) !important; --border-color-primary: var(--sw-border) !important; } h1, h2, h3, h4 { font-family: 'JetBrains Mono', monospace !important; } .tab-nav button { font-family: 'JetBrains Mono', monospace !important; font-size: 12px !important; letter-spacing: 1px !important; text-transform: uppercase !important; } .tab-nav button.selected { background: var(--sw-cyan) !important; color: var(--sw-bg-dark) !important; } input, textarea { font-family: 'JetBrains Mono', monospace !important; background: var(--sw-bg-dark) !important; border-color: var(--sw-border) !important; } input:focus, textarea:focus { border-color: var(--sw-cyan) !important; } .primary { background: linear-gradient(135deg, #00b396 0%, #007a6a 100%) !important; } """ # ============================================================================ # GRADIO INTERFACE # ============================================================================ with gr.Blocks( title="ShadowWatch | Open Source Threat Intel", theme=gr.themes.Base( primary_hue="cyan", secondary_hue="slate", neutral_hue="slate", ).set( body_background_fill="#0a0f1a", block_background_fill="#111827", block_border_width="1px", block_border_color="#1e3a5f", button_primary_background_fill="#00b396", ), css=custom_css ) as demo: # Header gr.HTML("""
🛡️
SHADOWWATCH
Open Source Threat Intelligence • No API Keys Required
● 100% FREE
● MCP ENABLED
""") with gr.Tabs(): # Indicator Scan Tab with gr.TabItem("đŸŽ¯ SCAN INDICATOR"): gr.Markdown("**Check an IP, domain, or URL against open-source threat feeds.**") with gr.Row(): indicator_input = gr.Textbox( label="Indicator", placeholder="Enter IP, domain, or URL...", scale=3 ) indicator_type = gr.Radio( choices=["auto", "ip", "domain", "url"], value="auto", label="Type", scale=1 ) scan_btn = gr.Button("🔍 SCAN", variant="primary", scale=1) with gr.Row(): risk_gauge = gr.Plot(label="Threat Score") sources_chart = gr.Plot(label="Detection Sources") scan_report = gr.HTML(label="Analysis Report") scan_btn.click( fn=scan_indicator, inputs=[indicator_input, indicator_type], outputs=[risk_gauge, sources_chart, scan_report] ) # IOC Extractor Tab with gr.TabItem("📋 IOC EXTRACTOR"): gr.Markdown("**Extract and analyze Indicators of Compromise from text, logs, or reports.**") text_input = gr.Textbox( label="Paste Text to Analyze", placeholder="Paste logs, reports, emails, or any text containing potential IOCs...", lines=8 ) extract_btn = gr.Button("đŸ”Ŧ EXTRACT & ANALYZE", variant="primary") with gr.Row(): ioc_chart = gr.Plot(label="IOC Distribution") ioc_json = gr.Code(label="Extracted IOCs (JSON)", language="json") ioc_report = gr.HTML(label="Analysis Report") extract_btn.click( fn=extract_and_analyze_iocs, inputs=[text_input], outputs=[ioc_chart, ioc_json, ioc_report] ) # Threat Feeds Tab with gr.TabItem("📡 THREAT FEEDS"): gr.Markdown("**View loaded threat intelligence from open-source feeds.**") refresh_btn = gr.Button("🔄 REFRESH FEEDS", variant="primary") feed_chart = gr.Plot(label="Feed Statistics") feed_report = gr.HTML(label="Feed Details") refresh_btn.click( fn=get_threat_feed_stats, inputs=[], outputs=[feed_chart, feed_report] ) # Load on page load demo.load(fn=get_threat_feed_stats, outputs=[feed_chart, feed_report]) # Footer gr.HTML("""
🔗 MCP: https://crypticallyrequie-shadowwatchv2.hf.space/gradio_api/mcp/sse
Built by Cogensec | ARGUS Platform
Data Sources: abuse.ch (URLhaus, ThreatFox, FeodoTracker, MalwareBazaar) • Spamhaus • Emerging Threats • OpenPhish • HIBP
""") if __name__ == "__main__": demo.launch(mcp_server=True)