"""External (URL and text-similarity) phishing analysis agent."""

import ipaddress
import os
import pickle
import re
from difflib import SequenceMatcher
from urllib.parse import urlparse

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# An explicit "scheme://" prefix. URLs without one are parsed as if they
# started with "//" so that urlparse recognises the host as the netloc.
_SCHEME_RE = re.compile(r'^[a-z][a-z0-9+.\-]*://', re.IGNORECASE)

# Permissive dotted-quad matcher (same pattern the URL check has always used),
# applied to the host only so version-like path segments do not match.
_DOTTED_QUAD_RE = re.compile(r'\d+\.\d+\.\d+\.\d+')


class ExternalAnalysisAgent:
    """Score URLs and message text for phishing indicators.

    Combines hand-written URL heuristics, an optional pickled URL classifier
    (a vectorizer exposing ``transform`` and a model exposing
    ``predict_proba``), and sentence-embedding similarity against a small
    list of known phishing phrases.
    """

    def __init__(self):
        """Load the embedding model, the optional URL models and the rule lists.

        Failure to load the pickled URL models is not fatal: the agent prints
        the error and runs with ``has_url_ml`` set to ``False``.
        """
        print("Loading External Analysis Agent...")
        self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

        # Always define these attributes so later code never sees a
        # half-initialised agent when loading fails part-way through.
        self.url_ml_model = None
        self.url_vectorizer = None
        self.has_url_ml = False

        # Load pickle models for URL analysis
        model_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'models')
        try:
            # SECURITY: pickle.load executes arbitrary code embedded in the
            # file. These paths must only ever point at trusted,
            # project-controlled artifacts.
            with open(os.path.join(model_dir, 'phishing_new.pkl'), 'rb') as f:
                url_ml_model = pickle.load(f)
            with open(os.path.join(model_dir, 'vectorizerurl_new.pkl'), 'rb') as f:
                url_vectorizer = pickle.load(f)
        except Exception as e:
            # Deliberate best-effort: the heuristics still work without ML.
            print(f"Failed to load URL ML models: {e}")
        else:
            # Publish the pair only when both files loaded successfully.
            self.url_ml_model = url_ml_model
            self.url_vectorizer = url_vectorizer
            self.has_url_ml = True
            print("Successfully loaded URL ML models.")

        self.phishing_patterns = [
            "verify your account immediately",
            "suspicious activity detected",
            "click here to confirm",
            "your account will be suspended",
            "update your payment information",
            "unusual sign-in attempt",
            "secure your account now",
            "limited time offer",
            "you have won a prize",
            "inheritance money transfer",
        ]
        self.suspicious_tlds = ['.xyz', '.top', '.club', '.online', '.site', '.win', '.bid']
        self.legitimate_domains = [
            'google.com', 'microsoft.com', 'amazon.com', 'paypal.com', 'apple.com',
        ]

        # Encoded once here; reused for every analyze() call.
        self.pattern_embeddings = self.model.encode(self.phishing_patterns)
        print("External Analysis Agent loaded successfully!")

    @staticmethod
    def _is_ip_host(host):
        """Return True when *host* is an IP literal rather than a domain name."""
        if not host:
            return False
        # Keep the permissive dotted-quad match so malformed-but-numeric hosts
        # (e.g. out-of-range octets) are still treated as IP-style hosts.
        if _DOTTED_QUAD_RE.fullmatch(host):
            return True
        try:
            ipaddress.ip_address(host)  # also recognises IPv6 literals
        except ValueError:
            return False
        return True

    def analyze_url_risk(self, url):
        """Analyze a single URL for suspicious patterns.

        Args:
            url: The URL string to inspect.

        Returns:
            A ``(risk_score, reasons, url_ml_prob)`` tuple: the heuristic/ML
            risk capped at 1.0, the list of human-readable reasons, and the
            ML model's probability (0.0 when the model is unavailable or
            prediction fails).
        """
        risk_score = 0.0
        reasons = []
        url_lower = url.lower()
        # Host-level checks use the parsed hostname, not the raw URL, so that
        # text in the path or query string cannot trigger them.
        host = self.extract_domain(url)

        for tld in self.suspicious_tlds:
            if host.endswith(tld):
                risk_score += 0.3
                reasons.append(f"Suspicious TLD: {tld}")
                break

        if self._is_ip_host(host):
            risk_score += 0.4
            reasons.append("IP address used instead of domain name")

        if host.count('.') > 3:
            risk_score += 0.2
            reasons.append("Excessive subdomains")

        # Shortener and keyword checks intentionally scan the whole URL.
        shortening_services = ['bit.ly', 'tinyurl', 'goo.gl', 'ow.ly', 'tiny.cc']
        for service in shortening_services:
            if service in url_lower:
                risk_score += 0.3
                reasons.append(f"URL shortening service detected: {service}")
                break

        suspicious_keywords = ['login', 'signin', 'verify', 'account', 'secure', 'update', 'confirm']
        for keyword in suspicious_keywords:
            if keyword in url_lower:
                risk_score += 0.1
                reasons.append(f"Suspicious keyword in URL: '{keyword}'")
                break

        domain_similarity = self.check_domain_similarity(url)
        if domain_similarity > 0.7:
            risk_score += 0.3
            reasons.append("Domain similar to legitimate brand")

        url_ml_prob = 0.0
        if self.has_url_ml:
            try:
                features = self.url_vectorizer.transform([url])
                # Original author's note: the pickled model is a
                # LogisticRegression. Column 1 of predict_proba is taken as
                # the phishing-class probability -- NOTE(review): confirm the
                # class order against the training code.
                url_ml_prob = self.url_ml_model.predict_proba(features)[0][1]

                # Hybrid logic: a confident ML verdict sets a floor on the
                # score instead of adding to it.
                if url_ml_prob > 0.8:
                    risk_score = max(risk_score, 0.9)
                    reasons.append(
                        f"ML model identified highly malicious URL structure (Score: {url_ml_prob:.1%})"
                    )
                elif url_ml_prob > 0.5:
                    risk_score = max(risk_score, 0.6)
                    reasons.append(
                        f"ML model flagged suspicious URL structure (Score: {url_ml_prob:.1%})"
                    )
            except Exception as e:
                # Best-effort: fall back to the heuristic score alone.
                print(f"Error predicting URL with ML model: {e}")

        return min(risk_score, 1.0), reasons, url_ml_prob

    def check_domain_similarity(self, url):
        """Return how closely the URL's host imitates a known brand domain.

        The score is the highest ``SequenceMatcher`` ratio between the host
        and any entry of ``self.legitimate_domains``. A host that *is* one of
        those domains, or a subdomain of one, is the genuine brand rather
        than a lookalike and scores 0.0, as does an unparseable URL.

        Args:
            url: The URL string to inspect.

        Returns:
            A float in ``[0.0, 1.0]``.
        """
        domain = self.extract_domain(url)
        if not domain:
            return 0.0

        max_similarity = 0.0
        for legit_domain in self.legitimate_domains:
            if domain == legit_domain or domain.endswith('.' + legit_domain):
                return 0.0
            similarity = SequenceMatcher(None, domain, legit_domain).ratio()
            max_similarity = max(max_similarity, similarity)
        return max_similarity

    def extract_domain(self, url):
        """Extract the lower-cased hostname from a URL.

        Port, userinfo, IPv6 brackets and a trailing root dot are removed.
        URLs without a scheme (``example.com/path``) are supported.

        Args:
            url: The URL string to parse.

        Returns:
            The hostname, or ``''`` when none can be determined.
        """
        candidate = (url or '').strip()
        # Without "scheme://" or a leading "//", urlparse would put the host
        # in .path (or mistake "host:port" for a scheme), so add "//".
        if not _SCHEME_RE.match(candidate) and not candidate.startswith('//'):
            candidate = '//' + candidate
        try:
            host = urlparse(candidate).hostname
        except ValueError:
            # urlparse rejects malformed bracketed (IPv6-style) hosts.
            host = None
        return (host or '').rstrip('.')

    def analyze(self, input_data):
        """Run the URL and text analysis and combine them into one result.

        Args:
            input_data: Mapping with ``'cleaned_text'`` (message text) and
                ``'urls'`` (list of URL strings). Missing or ``None`` values
                are treated as empty.

        Returns:
            A dict with keys ``'url_risk'`` (mean per-URL risk),
            ``'url_ml_risk'`` (highest ML probability),
            ``'domain_similarity'`` (highest lookalike score over all URLs),
            ``'suspicious_patterns'``, ``'risk_factors'`` and
            ``'overall_risk'`` (weighted sum capped at 1.0).
        """
        text = input_data.get('cleaned_text') or ''
        urls = input_data.get('urls') or []

        results = {
            'url_risk': 0.0,
            'url_ml_risk': 0.0,
            'domain_similarity': 0.0,
            'suspicious_patterns': [],
            'risk_factors': [],
            'overall_risk': 0.0,
        }

        if urls:
            url_risks = []
            url_ml_risks = []
            for url in urls:
                risk, reasons, ml_prob = self.analyze_url_risk(url)
                url_risks.append(risk)
                url_ml_risks.append(ml_prob)
                results['risk_factors'].extend(reasons)

            results['url_risk'] = float(np.mean(url_risks))
            results['url_ml_risk'] = max(url_ml_risks)
            # Consider every URL, not just the first: a lookalike link can
            # follow a benign one.
            results['domain_similarity'] = max(
                self.check_domain_similarity(url) for url in urls
            )

        if text.strip():
            try:
                text_embedding = self.model.encode([text])
                similarities = cosine_similarity(text_embedding, self.pattern_embeddings)[0]
                if max(similarities) > 0.6:
                    # Counted below through len(suspicious_patterns) * 0.1.
                    results['suspicious_patterns'].append(
                        "Text similar to known phishing patterns"
                    )
            except Exception as e:
                # Best-effort: a failed embedding must not abort the analysis.
                print(f"Error in semantic similarity: {e}")

        results['overall_risk'] = float(min(
            results['url_risk'] * 0.6
            + results['domain_similarity'] * 0.4
            + len(results['suspicious_patterns']) * 0.1,
            1.0,
        ))

        return results