ShadowWatchv2 / app.py
CrypticallyRequie's picture
Update app.py
56921f3 verified
"""
ShadowWatch v2 - Open Source Threat Intelligence Platform
100% Free - No API Keys Required
Uses public threat feeds and local analysis:
- abuse.ch (URLhaus, ThreatFox, FeodoTracker, MalwareBazaar)
- Spamhaus DROP lists
- Emerging Threats blocklists
- Direct DNS/WHOIS lookups
- IOC extraction and analysis
By Cogensec | ARGUS Platform
"""
import gradio as gr
import requests
import socket
import hashlib
import re
import json
import csv
import io
from datetime import datetime, timedelta
from typing import Optional, Tuple, List, Dict
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed
import plotly.graph_objects as go
import plotly.express as px
# ============================================================================
# THREAT FEED MANAGER - Downloads and caches public feeds
# ============================================================================
class ThreatFeedManager:
"""Manages open-source threat intelligence feeds"""
# Feed URLs (all free, no auth required)
FEEDS = {
"urlhaus": "https://urlhaus.abuse.ch/downloads/csv_recent/",
"threatfox_iocs": "https://threatfox.abuse.ch/export/json/recent/",
"feodo_ipblocklist": "https://feodotracker.abuse.ch/downloads/ipblocklist.csv",
"malwarebazaar_recent": "https://bazaar.abuse.ch/export/csv/recent/",
"spamhaus_drop": "https://www.spamhaus.org/drop/drop.txt",
"spamhaus_edrop": "https://www.spamhaus.org/drop/edrop.txt",
"emergingthreats_compromised": "https://rules.emergingthreats.net/blockrules/compromised-ips.txt",
"openphish": "https://openphish.com/feed.txt",
"hibp_breaches": "https://haveibeenpwned.com/api/v3/breaches",
}
def __init__(self):
self.cache = {}
self.cache_time = {}
self.cache_duration = timedelta(hours=1) # Refresh every hour
def _is_cache_valid(self, feed_name: str) -> bool:
if feed_name not in self.cache_time:
return False
return datetime.now() - self.cache_time[feed_name] < self.cache_duration
def fetch_feed(self, feed_name: str) -> Optional[str]:
"""Fetch a threat feed, using cache if available"""
if self._is_cache_valid(feed_name):
return self.cache.get(feed_name)
url = self.FEEDS.get(feed_name)
if not url:
return None
try:
headers = {"User-Agent": "ShadowWatch-ARGUS/1.0"}
resp = requests.get(url, headers=headers, timeout=30)
if resp.status_code == 200:
self.cache[feed_name] = resp.text
self.cache_time[feed_name] = datetime.now()
return resp.text
except Exception as e:
print(f"Error fetching {feed_name}: {e}")
return self.cache.get(feed_name) # Return stale cache if fetch fails
def get_urlhaus_urls(self) -> List[Dict]:
"""Get recent malicious URLs from URLhaus"""
data = self.fetch_feed("urlhaus")
if not data:
return []
urls = []
reader = csv.reader(io.StringIO(data))
for row in reader:
if row and not row[0].startswith('#') and len(row) >= 8:
try:
urls.append({
"id": row[0],
"dateadded": row[1],
"url": row[2],
"url_status": row[3],
"threat": row[5],
"tags": row[6],
"host": row[7] if len(row) > 7 else ""
})
except:
continue
return urls
def get_threatfox_iocs(self) -> List[Dict]:
"""Get recent IOCs from ThreatFox"""
data = self.fetch_feed("threatfox_iocs")
if not data:
return []
try:
parsed = json.loads(data)
return parsed.get("data", []) if parsed.get("query_status") == "ok" else []
except:
return []
def get_feodo_ips(self) -> List[Dict]:
"""Get botnet C2 IPs from FeodoTracker"""
data = self.fetch_feed("feodo_ipblocklist")
if not data:
return []
ips = []
reader = csv.reader(io.StringIO(data))
for row in reader:
if row and not row[0].startswith('#') and len(row) >= 5:
try:
ips.append({
"first_seen": row[0],
"ip": row[1],
"port": row[2],
"status": row[3],
"malware": row[4]
})
except:
continue
return ips
def get_spamhaus_ips(self) -> set:
"""Get bad IP ranges from Spamhaus DROP"""
ips = set()
for feed in ["spamhaus_drop", "spamhaus_edrop"]:
data = self.fetch_feed(feed)
if data:
for line in data.split('\n'):
line = line.strip()
if line and not line.startswith(';'):
# Extract IP/CIDR before any semicolon
ip_part = line.split(';')[0].strip()
if ip_part:
ips.add(ip_part)
return ips
def get_emergingthreats_ips(self) -> set:
"""Get compromised IPs from Emerging Threats"""
data = self.fetch_feed("emergingthreats_compromised")
if not data:
return set()
ips = set()
for line in data.split('\n'):
line = line.strip()
if line and not line.startswith('#'):
ips.add(line)
return ips
def get_openphish_urls(self) -> set:
"""Get phishing URLs from OpenPhish"""
data = self.fetch_feed("openphish")
if not data:
return set()
return set(line.strip() for line in data.split('\n') if line.strip())
def get_hibp_breaches(self) -> List[Dict]:
"""Get public breach list from HIBP (no API key needed for breach list)"""
data = self.fetch_feed("hibp_breaches")
if not data:
return []
try:
return json.loads(data)
except:
return []
# Initialize feed manager
feed_manager = ThreatFeedManager()
# ============================================================================
# LOCAL ANALYSIS TOOLS
# ============================================================================
def dns_lookup(domain: str) -> Dict:
"""Perform DNS lookup without external APIs"""
results = {
"domain": domain,
"a_records": [],
"aaaa_records": [],
"mx_records": [],
"ns_records": [],
"error": None
}
try:
# A records (IPv4)
try:
results["a_records"] = list(set(socket.gethostbyname_ex(domain)[2]))
except:
pass
# Try to get additional info via getaddrinfo
try:
info = socket.getaddrinfo(domain, None)
for item in info:
ip = item[4][0]
if ':' in ip and ip not in results["aaaa_records"]:
results["aaaa_records"].append(ip)
elif '.' in ip and ip not in results["a_records"]:
results["a_records"].append(ip)
except:
pass
except socket.gaierror as e:
results["error"] = f"DNS lookup failed: {str(e)}"
except Exception as e:
results["error"] = str(e)
return results
def extract_iocs(text: str) -> Dict:
"""Extract Indicators of Compromise from text"""
patterns = {
"ipv4": r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
"ipv6": r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b',
"domain": r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b',
"url": r'https?://[^\s<>"{}|\\^`\[\]]+',
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"md5": r'\b[a-fA-F0-9]{32}\b',
"sha1": r'\b[a-fA-F0-9]{40}\b',
"sha256": r'\b[a-fA-F0-9]{64}\b',
"bitcoin": r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b',
"cve": r'CVE-\d{4}-\d{4,7}',
}
results = {}
for ioc_type, pattern in patterns.items():
matches = list(set(re.findall(pattern, text, re.IGNORECASE)))
# Filter out common false positives for domains
if ioc_type == "domain":
matches = [m for m in matches if not m.endswith('.png') and not m.endswith('.jpg')
and not m.endswith('.gif') and len(m) > 4]
results[ioc_type] = matches[:100] # Limit to 100 per type
return results
def calculate_hashes(text: str) -> Dict:
"""Calculate various hashes for given text/data"""
data = text.encode('utf-8')
return {
"md5": hashlib.md5(data).hexdigest(),
"sha1": hashlib.sha1(data).hexdigest(),
"sha256": hashlib.sha256(data).hexdigest(),
"sha512": hashlib.sha512(data).hexdigest(),
}
def check_ip_reputation(ip: str) -> Dict:
"""Check IP against local blocklists"""
results = {
"ip": ip,
"is_malicious": False,
"sources": [],
"risk_score": 0,
"details": []
}
# Check Feodo botnet IPs
feodo_ips = feed_manager.get_feodo_ips()
for entry in feodo_ips:
if entry.get("ip") == ip:
results["is_malicious"] = True
results["sources"].append("FeodoTracker")
results["risk_score"] += 40
results["details"].append({
"source": "FeodoTracker",
"malware": entry.get("malware", "Unknown"),
"first_seen": entry.get("first_seen", "Unknown"),
"status": entry.get("status", "Unknown")
})
break
# Check Spamhaus DROP
spamhaus_ranges = feed_manager.get_spamhaus_ips()
for ip_range in spamhaus_ranges:
if '/' in ip_range:
# CIDR check (simplified - just check if IP starts with network portion)
network = ip_range.split('/')[0]
network_parts = network.split('.')
ip_parts = ip.split('.')
if ip_parts[:len(network_parts)-1] == network_parts[:len(network_parts)-1]:
results["is_malicious"] = True
results["sources"].append("Spamhaus DROP")
results["risk_score"] += 35
results["details"].append({
"source": "Spamhaus DROP",
"matched_range": ip_range
})
break
elif ip == ip_range:
results["is_malicious"] = True
results["sources"].append("Spamhaus DROP")
results["risk_score"] += 35
break
# Check Emerging Threats
et_ips = feed_manager.get_emergingthreats_ips()
if ip in et_ips:
results["is_malicious"] = True
results["sources"].append("Emerging Threats")
results["risk_score"] += 30
results["details"].append({
"source": "Emerging Threats",
"category": "Compromised IP"
})
# Check ThreatFox IOCs
threatfox = feed_manager.get_threatfox_iocs()
for ioc in threatfox:
if ioc.get("ioc_type") == "ip:port" and ip in ioc.get("ioc", ""):
results["is_malicious"] = True
results["sources"].append("ThreatFox")
results["risk_score"] += 45
results["details"].append({
"source": "ThreatFox",
"malware": ioc.get("malware_printable", "Unknown"),
"threat_type": ioc.get("threat_type", "Unknown"),
"confidence": ioc.get("confidence_level", 0)
})
break
results["risk_score"] = min(results["risk_score"], 100)
return results
def check_url_reputation(url: str) -> Dict:
"""Check URL against local threat feeds"""
results = {
"url": url,
"is_malicious": False,
"sources": [],
"risk_score": 0,
"details": []
}
# Normalize URL
url_lower = url.lower().strip()
# Extract domain from URL
domain_match = re.search(r'https?://([^/]+)', url_lower)
domain = domain_match.group(1) if domain_match else ""
# Check URLhaus
urlhaus_urls = feed_manager.get_urlhaus_urls()
for entry in urlhaus_urls:
if url_lower == entry.get("url", "").lower() or domain == entry.get("host", "").lower():
results["is_malicious"] = True
results["sources"].append("URLhaus")
results["risk_score"] += 50
results["details"].append({
"source": "URLhaus",
"threat": entry.get("threat", "Unknown"),
"tags": entry.get("tags", ""),
"status": entry.get("url_status", "Unknown"),
"date_added": entry.get("dateadded", "Unknown")
})
break
# Check OpenPhish
phishing_urls = feed_manager.get_openphish_urls()
if url_lower in phishing_urls or any(url_lower.startswith(p) for p in phishing_urls):
results["is_malicious"] = True
results["sources"].append("OpenPhish")
results["risk_score"] += 45
results["details"].append({
"source": "OpenPhish",
"category": "Phishing"
})
# Check ThreatFox for URL IOCs
threatfox = feed_manager.get_threatfox_iocs()
for ioc in threatfox:
if ioc.get("ioc_type") == "url" and url_lower in ioc.get("ioc", "").lower():
results["is_malicious"] = True
results["sources"].append("ThreatFox")
results["risk_score"] += 50
results["details"].append({
"source": "ThreatFox",
"malware": ioc.get("malware_printable", "Unknown"),
"threat_type": ioc.get("threat_type", "Unknown")
})
break
results["risk_score"] = min(results["risk_score"], 100)
return results
def check_domain_reputation(domain: str) -> Dict:
"""Check domain against threat feeds and perform DNS analysis"""
results = {
"domain": domain,
"is_malicious": False,
"sources": [],
"risk_score": 0,
"dns": {},
"details": []
}
# DNS lookup
results["dns"] = dns_lookup(domain)
# Check if resolved IPs are malicious
for ip in results["dns"].get("a_records", []):
ip_rep = check_ip_reputation(ip)
if ip_rep["is_malicious"]:
results["is_malicious"] = True
results["risk_score"] += ip_rep["risk_score"] // 2
results["details"].append({
"source": "DNS Resolution",
"detail": f"Resolves to malicious IP: {ip}",
"ip_sources": ip_rep["sources"]
})
# Check URLhaus for domain
urlhaus_urls = feed_manager.get_urlhaus_urls()
for entry in urlhaus_urls:
if domain.lower() == entry.get("host", "").lower():
results["is_malicious"] = True
results["sources"].append("URLhaus")
results["risk_score"] += 40
results["details"].append({
"source": "URLhaus",
"threat": entry.get("threat", "Unknown"),
"tags": entry.get("tags", "")
})
break
# Check ThreatFox for domain IOCs
threatfox = feed_manager.get_threatfox_iocs()
for ioc in threatfox:
if ioc.get("ioc_type") == "domain" and domain.lower() == ioc.get("ioc", "").lower():
results["is_malicious"] = True
results["sources"].append("ThreatFox")
results["risk_score"] += 45
results["details"].append({
"source": "ThreatFox",
"malware": ioc.get("malware_printable", "Unknown"),
"threat_type": ioc.get("threat_type", "Unknown")
})
break
results["risk_score"] = min(results["risk_score"], 100)
return results
# ============================================================================
# VISUALIZATION HELPERS
# ============================================================================
def create_risk_gauge(score: int, title: str = "Risk Score") -> go.Figure:
"""Create a risk gauge chart"""
if score >= 75:
color = "#ef4444"
elif score >= 50:
color = "#f97316"
elif score >= 25:
color = "#fbbf24"
else:
color = "#22c55e"
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=score,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': title, 'font': {'size': 18, 'color': '#e2e8f0'}},
number={'font': {'size': 36, 'color': '#e2e8f0'}},
gauge={
'axis': {'range': [0, 100], 'tickcolor': "#1e3a5f"},
'bar': {'color': color},
'bgcolor': "#111827",
'borderwidth': 2,
'bordercolor': "#1e3a5f",
'steps': [
{'range': [0, 25], 'color': 'rgba(34, 197, 94, 0.15)'},
{'range': [25, 50], 'color': 'rgba(251, 191, 36, 0.15)'},
{'range': [50, 75], 'color': 'rgba(249, 115, 22, 0.15)'},
{'range': [75, 100], 'color': 'rgba(239, 68, 68, 0.15)'}
],
}
))
fig.update_layout(
paper_bgcolor='#0a0f1a',
plot_bgcolor='#0a0f1a',
height=280,
margin=dict(l=20, r=20, t=50, b=20)
)
return fig
def create_ioc_chart(iocs: Dict) -> go.Figure:
"""Create a bar chart of extracted IOCs"""
types = []
counts = []
for ioc_type, items in iocs.items():
if items:
types.append(ioc_type.upper())
counts.append(len(items))
if not types:
fig = go.Figure()
fig.add_annotation(text="No IOCs extracted", x=0.5, y=0.5, showarrow=False,
font=dict(size=16, color="#64748b"), xref="paper", yref="paper")
fig.update_layout(paper_bgcolor='#0a0f1a', plot_bgcolor='#0a0f1a', height=280)
return fig
colors = ['#00ffd5' if c > 0 else '#64748b' for c in counts]
fig = go.Figure(data=[go.Bar(
x=types,
y=counts,
marker_color=colors,
text=counts,
textposition='outside'
)])
fig.update_layout(
title={'text': 'πŸ“Š Extracted IOCs', 'font': {'color': '#00ffd5'}},
paper_bgcolor='#0a0f1a',
plot_bgcolor='#111827',
font={'color': '#e2e8f0'},
height=280,
xaxis={'gridcolor': '#1e3a5f'},
yaxis={'gridcolor': '#1e3a5f'},
margin=dict(l=40, r=20, t=50, b=40)
)
return fig
def create_threat_sources_chart(sources: List[str]) -> go.Figure:
"""Create a pie chart of threat sources"""
if not sources:
fig = go.Figure()
fig.add_annotation(text="βœ“ No threats detected", x=0.5, y=0.5, showarrow=False,
font=dict(size=18, color="#22c55e"), xref="paper", yref="paper")
fig.update_layout(paper_bgcolor='#0a0f1a', plot_bgcolor='#0a0f1a', height=280)
return fig
from collections import Counter
source_counts = Counter(sources)
colors = ['#ef4444', '#f97316', '#fbbf24', '#00ffd5', '#8b5cf6']
fig = go.Figure(data=[go.Pie(
labels=list(source_counts.keys()),
values=list(source_counts.values()),
hole=0.5,
marker_colors=colors[:len(source_counts)],
textinfo='label+value',
textfont={'color': '#e2e8f0'}
)])
fig.update_layout(
title={'text': '⚠️ Threat Sources', 'font': {'color': '#ef4444'}},
paper_bgcolor='#0a0f1a',
plot_bgcolor='#0a0f1a',
font={'color': '#e2e8f0'},
height=280,
showlegend=False,
margin=dict(l=20, r=20, t=50, b=20)
)
return fig
# ============================================================================
# MCP TOOLS
# ============================================================================
def scan_indicator(indicator: str, indicator_type: str = "auto") -> Tuple[go.Figure, go.Figure, str]:
"""Scan an IP address, domain, or URL against open-source threat feeds.
Checks the indicator against URLhaus, ThreatFox, FeodoTracker,
Spamhaus DROP, Emerging Threats, and OpenPhish databases.
Args:
indicator: The IP, domain, or URL to scan
indicator_type: "auto", "ip", "domain", or "url"
Returns:
Risk gauge, threat sources chart, and detailed HTML report
"""
if not indicator:
empty_fig = go.Figure()
empty_fig.update_layout(paper_bgcolor='#0a0f1a', height=280)
return empty_fig, empty_fig, "<p style='color: #ef4444;'>Please enter an indicator to scan</p>"
indicator = indicator.strip()
# Auto-detect type
if indicator_type == "auto":
if re.match(r'^https?://', indicator):
indicator_type = "url"
elif re.match(r'^(\d{1,3}\.){3}\d{1,3}$', indicator):
indicator_type = "ip"
else:
indicator_type = "domain"
# Perform check based on type
if indicator_type == "ip":
result = check_ip_reputation(indicator)
elif indicator_type == "url":
result = check_url_reputation(indicator)
else:
result = check_domain_reputation(indicator)
# Create visualizations
gauge_fig = create_risk_gauge(result["risk_score"], "Threat Score")
sources_fig = create_threat_sources_chart(result["sources"])
# Build HTML report
status_color = "#ef4444" if result["is_malicious"] else "#22c55e"
status_text = "⚠️ MALICIOUS" if result["is_malicious"] else "βœ“ CLEAN"
html = f"""
<div style="font-family: 'JetBrains Mono', monospace; color: #e2e8f0;">
<h3 style="color: #00ffd5; border-bottom: 1px solid #1e3a5f; padding-bottom: 10px;">
πŸ” Threat Intelligence Report
</h3>
<div style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 16px; margin: 20px 0;">
<div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
<div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Indicator</div>
<div style="font-size: 14px; color: #00ffd5; word-break: break-all;">{indicator}</div>
</div>
<div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
<div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Type</div>
<div style="font-size: 18px; color: #e2e8f0;">{indicator_type.upper()}</div>
</div>
<div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
<div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Status</div>
<div style="font-size: 18px; color: {status_color};">{status_text}</div>
</div>
</div>
"""
# DNS info for domains
if indicator_type == "domain" and "dns" in result:
dns = result["dns"]
a_records = ", ".join(dns.get("a_records", [])) or "None"
html += f"""
<h4 style="color: #fbbf24; margin-top: 24px;">🌐 DNS Resolution</h4>
<div style="background: #111827; padding: 16px; border-radius: 8px; margin-top: 12px;">
<p><span style="color: #64748b;">A Records:</span> <span style="color: #00ffd5;">{a_records}</span></p>
</div>
"""
# Threat details
if result["details"]:
html += """
<h4 style="color: #ef4444; margin-top: 24px;">🚨 Threat Details</h4>
<table style="width: 100%; border-collapse: collapse; margin-top: 12px;">
<tr style="background: #1e3a5f;">
<th style="padding: 12px; text-align: left; color: #00ffd5;">Source</th>
<th style="padding: 12px; text-align: left; color: #00ffd5;">Details</th>
</tr>
"""
for detail in result["details"]:
source = detail.get("source", "Unknown")
info_parts = [f"{k}: {v}" for k, v in detail.items() if k != "source" and v]
info = " | ".join(info_parts)
html += f"""
<tr style="border-bottom: 1px solid #1e3a5f;">
<td style="padding: 12px; color: #f97316;">{source}</td>
<td style="padding: 12px; color: #94a3b8; font-size: 12px;">{info}</td>
</tr>
"""
html += "</table>"
# Recommendations
html += f"""
<div style="margin-top: 24px; padding: 16px; background: #111827; border-radius: 8px; border-left: 4px solid {'#ef4444' if result['is_malicious'] else '#22c55e'};">
<h4 style="color: {'#ef4444' if result['is_malicious'] else '#22c55e'}; margin: 0 0 12px 0;">
{'🚫 Recommended Actions' if result['is_malicious'] else 'βœ“ Assessment'}
</h4>
<ul style="color: #94a3b8; margin: 0; padding-left: 20px;">
"""
if result["is_malicious"]:
html += """
<li>Block this indicator in your firewall/proxy</li>
<li>Search logs for any connections to this indicator</li>
<li>If found, isolate affected systems</li>
<li>Report to your security team</li>
"""
else:
html += """
<li>No threats detected in current threat feeds</li>
<li>Continue monitoring for changes</li>
<li>Note: Absence of threat data doesn't guarantee safety</li>
"""
html += """
</ul>
</div>
<p style="color: #64748b; font-size: 11px; margin-top: 16px;">
Data sources: URLhaus, ThreatFox, FeodoTracker, Spamhaus DROP, Emerging Threats, OpenPhish
</p>
</div>
"""
return gauge_fig, sources_fig, html
def extract_and_analyze_iocs(text: str) -> Tuple[go.Figure, str, str]:
"""Extract and analyze Indicators of Compromise from text.
Extracts IPs, domains, URLs, email addresses, file hashes,
Bitcoin addresses, and CVEs from the provided text, then
checks extracted indicators against threat feeds.
Args:
text: Text containing potential IOCs (logs, reports, emails, etc.)
Returns:
IOC distribution chart, extracted IOCs as JSON, and analysis report HTML
"""
if not text or len(text.strip()) < 5:
empty_fig = go.Figure()
empty_fig.update_layout(paper_bgcolor='#0a0f1a', height=280)
return empty_fig, "{}", "<p style='color: #ef4444;'>Please enter text to analyze</p>"
# Extract IOCs
iocs = extract_iocs(text)
# Create visualization
ioc_chart = create_ioc_chart(iocs)
# Count totals
total_iocs = sum(len(v) for v in iocs.values())
# Check some IOCs against threat feeds (limit to avoid overload)
malicious_found = []
# Check IPs (limit to first 10)
for ip in iocs.get("ipv4", [])[:10]:
result = check_ip_reputation(ip)
if result["is_malicious"]:
malicious_found.append({"type": "IP", "value": ip, "sources": result["sources"]})
# Check domains (limit to first 10)
for domain in iocs.get("domain", [])[:10]:
result = check_domain_reputation(domain)
if result["is_malicious"]:
malicious_found.append({"type": "Domain", "value": domain, "sources": result["sources"]})
# Check URLs (limit to first 5)
for url in iocs.get("url", [])[:5]:
result = check_url_reputation(url)
if result["is_malicious"]:
malicious_found.append({"type": "URL", "value": url, "sources": result["sources"]})
# Build HTML report
html = f"""
<div style="font-family: 'JetBrains Mono', monospace; color: #e2e8f0;">
<h3 style="color: #00ffd5; border-bottom: 1px solid #1e3a5f; padding-bottom: 10px;">
πŸ“‹ IOC Extraction Report
</h3>
<div style="display: grid; grid-template-columns: repeat(4, 1fr); gap: 16px; margin: 20px 0;">
<div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
<div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Total IOCs</div>
<div style="font-size: 28px; color: #00ffd5;">{total_iocs}</div>
</div>
<div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
<div style="color: #64748b; font-size: 11px; text-transform: uppercase;">IPs Found</div>
<div style="font-size: 28px; color: #e2e8f0;">{len(iocs.get('ipv4', []))}</div>
</div>
<div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
<div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Domains</div>
<div style="font-size: 28px; color: #e2e8f0;">{len(iocs.get('domain', []))}</div>
</div>
<div style="background: #111827; padding: 16px; border-radius: 8px; border: 1px solid #1e3a5f;">
<div style="color: #64748b; font-size: 11px; text-transform: uppercase;">Malicious</div>
<div style="font-size: 28px; color: {'#ef4444' if malicious_found else '#22c55e'};">{len(malicious_found)}</div>
</div>
</div>
"""
# Show malicious indicators
if malicious_found:
html += """
<h4 style="color: #ef4444; margin-top: 24px;">🚨 Malicious Indicators Detected</h4>
<table style="width: 100%; border-collapse: collapse; margin-top: 12px;">
<tr style="background: #1e3a5f;">
<th style="padding: 12px; text-align: left; color: #00ffd5;">Type</th>
<th style="padding: 12px; text-align: left; color: #00ffd5;">Indicator</th>
<th style="padding: 12px; text-align: left; color: #00ffd5;">Sources</th>
</tr>
"""
for item in malicious_found:
html += f"""
<tr style="border-bottom: 1px solid #1e3a5f;">
<td style="padding: 12px; color: #f97316;">{item['type']}</td>
<td style="padding: 12px; color: #ef4444; font-size: 12px; word-break: break-all;">{item['value']}</td>
<td style="padding: 12px; color: #94a3b8;">{', '.join(item['sources'])}</td>
</tr>
"""
html += "</table>"
# Show extracted IOCs summary
html += "<h4 style='color: #fbbf24; margin-top: 24px;'>πŸ“Š Extracted Indicators</h4>"
for ioc_type, items in iocs.items():
if items:
preview = items[:5]
html += f"""
<div style="background: #111827; padding: 12px 16px; border-radius: 6px; margin-top: 8px; border-left: 3px solid #00ffd5;">
<span style="color: #00ffd5; font-weight: 600;">{ioc_type.upper()}</span>
<span style="color: #64748b; margin-left: 8px;">({len(items)} found)</span>
<div style="color: #94a3b8; font-size: 12px; margin-top: 8px; word-break: break-all;">
{', '.join(preview)}{'...' if len(items) > 5 else ''}
</div>
</div>
"""
html += """
<div style="margin-top: 24px; padding: 16px; background: #111827; border-radius: 8px; border-left: 4px solid #00ffd5;">
<h4 style="color: #00ffd5; margin: 0 0 12px 0;">πŸ’‘ Next Steps</h4>
<ul style="color: #94a3b8; margin: 0; padding-left: 20px;">
<li>Export IOCs for import into your SIEM/SOAR</li>
<li>Block malicious indicators at network perimeter</li>
<li>Search historical logs for these indicators</li>
<li>Share findings with threat intel sharing groups</li>
</ul>
</div>
</div>
"""
# Return IOCs as formatted JSON
iocs_json = json.dumps(iocs, indent=2)
return ioc_chart, iocs_json, html
def get_threat_feed_stats() -> Tuple[go.Figure, str]:
"""Get current statistics from all loaded threat feeds.
Shows the number of indicators loaded from each open-source
threat intelligence feed.
Returns:
Feed statistics chart and detailed HTML report
"""
# Gather stats from all feeds
stats = {}
# URLhaus
urlhaus = feed_manager.get_urlhaus_urls()
stats["URLhaus URLs"] = len(urlhaus)
# ThreatFox
threatfox = feed_manager.get_threatfox_iocs()
stats["ThreatFox IOCs"] = len(threatfox)
# FeodoTracker
feodo = feed_manager.get_feodo_ips()
stats["Feodo Botnet IPs"] = len(feodo)
# Spamhaus
spamhaus = feed_manager.get_spamhaus_ips()
stats["Spamhaus DROP Ranges"] = len(spamhaus)
# Emerging Threats
et = feed_manager.get_emergingthreats_ips()
stats["Emerging Threats IPs"] = len(et)
# OpenPhish
phish = feed_manager.get_openphish_urls()
stats["OpenPhish URLs"] = len(phish)
# HIBP Breaches
hibp = feed_manager.get_hibp_breaches()
stats["Known Breaches"] = len(hibp)
# Create bar chart
fig = go.Figure(data=[go.Bar(
x=list(stats.keys()),
y=list(stats.values()),
marker_color=['#00ffd5', '#22c55e', '#fbbf24', '#f97316', '#ef4444', '#8b5cf6', '#06b6d4'],
text=list(stats.values()),
textposition='outside'
)])
fig.update_layout(
title={'text': 'πŸ“Š Loaded Threat Intelligence', 'font': {'color': '#00ffd5'}},
paper_bgcolor='#0a0f1a',
plot_bgcolor='#111827',
font={'color': '#e2e8f0'},
height=350,
xaxis={'tickangle': 45, 'gridcolor': '#1e3a5f'},
yaxis={'gridcolor': '#1e3a5f'},
margin=dict(l=40, r=20, t=60, b=120)
)
total_indicators = sum(stats.values())
# Build HTML
html = f"""
<div style="font-family: 'JetBrains Mono', monospace; color: #e2e8f0;">
<h3 style="color: #00ffd5; border-bottom: 1px solid #1e3a5f; padding-bottom: 10px;">
πŸ“‘ Threat Feed Status
</h3>
<div style="background: #111827; padding: 20px; border-radius: 8px; margin: 20px 0; text-align: center;">
<div style="color: #64748b; font-size: 12px; text-transform: uppercase;">Total Indicators Loaded</div>
<div style="font-size: 48px; color: #00ffd5; font-weight: bold;">{total_indicators:,}</div>
</div>
<h4 style="color: #fbbf24; margin-top: 24px;">πŸ“‹ Feed Details</h4>
<table style="width: 100%; border-collapse: collapse; margin-top: 12px;">
<tr style="background: #1e3a5f;">
<th style="padding: 12px; text-align: left; color: #00ffd5;">Feed</th>
<th style="padding: 12px; text-align: left; color: #00ffd5;">Indicators</th>
<th style="padding: 12px; text-align: left; color: #00ffd5;">Status</th>
</tr>
"""
for feed_name, count in stats.items():
status = "βœ… Loaded" if count > 0 else "⚠️ Empty"
status_color = "#22c55e" if count > 0 else "#f97316"
html += f"""
<tr style="border-bottom: 1px solid #1e3a5f;">
<td style="padding: 12px; color: #e2e8f0;">{feed_name}</td>
<td style="padding: 12px; color: #00ffd5;">{count:,}</td>
<td style="padding: 12px; color: {status_color};">{status}</td>
</tr>
"""
html += """
</table>
<div style="margin-top: 24px; padding: 16px; background: #111827; border-radius: 8px; border-left: 4px solid #00ffd5;">
<h4 style="color: #00ffd5; margin: 0 0 12px 0;">ℹ️ About These Feeds</h4>
<ul style="color: #94a3b8; margin: 0; padding-left: 20px; font-size: 13px;">
<li><b>URLhaus:</b> Malicious URLs used for malware distribution (abuse.ch)</li>
<li><b>ThreatFox:</b> IOCs from malware samples and campaigns (abuse.ch)</li>
<li><b>FeodoTracker:</b> Botnet C2 servers (Emotet, Dridex, etc.)</li>
<li><b>Spamhaus DROP:</b> IPs controlled by spammers/criminals</li>
<li><b>Emerging Threats:</b> Compromised IPs from ProofPoint</li>
<li><b>OpenPhish:</b> Active phishing URLs</li>
<li><b>HIBP Breaches:</b> Public breach database metadata</li>
</ul>
<p style="color: #64748b; font-size: 11px; margin-top: 12px;">
Feeds refresh automatically every hour. All data is free and requires no API keys.
</p>
</div>
</div>
"""
return fig, html
# ============================================================================
# CUSTOM CSS
# ============================================================================
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600;700&family=Inter:wght@400;500;600&display=swap');
:root {
--sw-bg-dark: #0a0f1a;
--sw-bg-card: #111827;
--sw-border: #1e3a5f;
--sw-cyan: #00ffd5;
}
.gradio-container {
background: var(--sw-bg-dark) !important;
font-family: 'Inter', sans-serif !important;
}
.dark {
--background-fill-primary: var(--sw-bg-dark) !important;
--background-fill-secondary: var(--sw-bg-card) !important;
--border-color-primary: var(--sw-border) !important;
}
h1, h2, h3, h4 { font-family: 'JetBrains Mono', monospace !important; }
.tab-nav button {
font-family: 'JetBrains Mono', monospace !important;
font-size: 12px !important;
letter-spacing: 1px !important;
text-transform: uppercase !important;
}
.tab-nav button.selected {
background: var(--sw-cyan) !important;
color: var(--sw-bg-dark) !important;
}
input, textarea {
font-family: 'JetBrains Mono', monospace !important;
background: var(--sw-bg-dark) !important;
border-color: var(--sw-border) !important;
}
input:focus, textarea:focus { border-color: var(--sw-cyan) !important; }
.primary { background: linear-gradient(135deg, #00b396 0%, #007a6a 100%) !important; }
"""
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
with gr.Blocks(
title="ShadowWatch | Open Source Threat Intel",
theme=gr.themes.Base(
primary_hue="cyan",
secondary_hue="slate",
neutral_hue="slate",
).set(
body_background_fill="#0a0f1a",
block_background_fill="#111827",
block_border_width="1px",
block_border_color="#1e3a5f",
button_primary_background_fill="#00b396",
),
css=custom_css
) as demo:
# Header
gr.HTML("""
<div style="background: linear-gradient(90deg, #111827 0%, #0d1520 100%); border: 1px solid #1e3a5f; border-radius: 8px; padding: 20px 24px; margin-bottom: 20px; display: flex; justify-content: space-between; align-items: center;">
<div style="display: flex; align-items: center; gap: 16px;">
<div style="font-size: 40px;">πŸ›‘οΈ</div>
<div>
<div style="font-family: 'JetBrains Mono', monospace; font-size: 28px; font-weight: 700; color: white; letter-spacing: 3px;">SHADOWWATCH</div>
<div style="font-size: 12px; color: #64748b; letter-spacing: 2px; text-transform: uppercase;">Open Source Threat Intelligence β€’ No API Keys Required</div>
</div>
</div>
<div style="display: flex; gap: 20px; align-items: center; font-family: 'JetBrains Mono', monospace; font-size: 11px;">
<div style="color: #22c55e;">● 100% FREE</div>
<div style="color: #00ffd5;">● MCP ENABLED</div>
</div>
</div>
""")
with gr.Tabs():
# Indicator Scan Tab
with gr.TabItem("🎯 SCAN INDICATOR"):
gr.Markdown("**Check an IP, domain, or URL against open-source threat feeds.**")
with gr.Row():
indicator_input = gr.Textbox(
label="Indicator",
placeholder="Enter IP, domain, or URL...",
scale=3
)
indicator_type = gr.Radio(
choices=["auto", "ip", "domain", "url"],
value="auto",
label="Type",
scale=1
)
scan_btn = gr.Button("πŸ” SCAN", variant="primary", scale=1)
with gr.Row():
risk_gauge = gr.Plot(label="Threat Score")
sources_chart = gr.Plot(label="Detection Sources")
scan_report = gr.HTML(label="Analysis Report")
scan_btn.click(
fn=scan_indicator,
inputs=[indicator_input, indicator_type],
outputs=[risk_gauge, sources_chart, scan_report]
)
# IOC Extractor Tab
with gr.TabItem("πŸ“‹ IOC EXTRACTOR"):
gr.Markdown("**Extract and analyze Indicators of Compromise from text, logs, or reports.**")
text_input = gr.Textbox(
label="Paste Text to Analyze",
placeholder="Paste logs, reports, emails, or any text containing potential IOCs...",
lines=8
)
extract_btn = gr.Button("πŸ”¬ EXTRACT & ANALYZE", variant="primary")
with gr.Row():
ioc_chart = gr.Plot(label="IOC Distribution")
ioc_json = gr.Code(label="Extracted IOCs (JSON)", language="json")
ioc_report = gr.HTML(label="Analysis Report")
extract_btn.click(
fn=extract_and_analyze_iocs,
inputs=[text_input],
outputs=[ioc_chart, ioc_json, ioc_report]
)
# Threat Feeds Tab
with gr.TabItem("πŸ“‘ THREAT FEEDS"):
gr.Markdown("**View loaded threat intelligence from open-source feeds.**")
refresh_btn = gr.Button("πŸ”„ REFRESH FEEDS", variant="primary")
feed_chart = gr.Plot(label="Feed Statistics")
feed_report = gr.HTML(label="Feed Details")
refresh_btn.click(
fn=get_threat_feed_stats,
inputs=[],
outputs=[feed_chart, feed_report]
)
# Load on page load
demo.load(fn=get_threat_feed_stats, outputs=[feed_chart, feed_report])
# Footer
gr.HTML("""
<div style="background: #111827; border: 1px solid #1e3a5f; border-radius: 8px; padding: 16px 24px; margin-top: 20px; font-family: 'JetBrains Mono', monospace; font-size: 11px; color: #64748b;">
<div style="display: flex; justify-content: space-between; align-items: center;">
<div>
<span>πŸ”— MCP:</span>
<span style="color: #00ffd5; margin-left: 8px;">https://crypticallyrequie-shadowwatchv2.hf.space/gradio_api/mcp/sse</span>
</div>
<div>
Built by <span style="color: #00ffd5;">Cogensec</span> | ARGUS Platform
</div>
</div>
<div style="margin-top: 12px; padding-top: 12px; border-top: 1px solid #1e3a5f;">
Data Sources: abuse.ch (URLhaus, ThreatFox, FeodoTracker, MalwareBazaar) β€’ Spamhaus β€’ Emerging Threats β€’ OpenPhish β€’ HIBP
</div>
</div>
""")
if __name__ == "__main__":
demo.launch(mcp_server=True)