# -*- coding: utf-8 -*-
"""
Verification System Module - SysCRED v2.0
==========================================
Main credibility verification system with real API integration.
Refactored from sys-cred-Python-27avril2025.py

(c) Dominique S. Loyer - PhD Thesis Prototype
Citation Key: loyerModelingHybridSystem2025
"""

import re
import json
import datetime
from typing import Optional, Dict, Any, List
from urllib.parse import urlparse

# Transformers and ML
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
    import numpy as np
    import torch
    from lime.lime_text import LimeTextExplainer
    HAS_ML = True
except ImportError:
    HAS_ML = False
    print("Warning: ML libraries not fully installed. Run: pip install transformers torch lime numpy")

try:
    from sentence_transformers import SentenceTransformer, util
    HAS_SBERT = True
except ImportError:
    HAS_SBERT = False
    print("Warning: sentence-transformers not installed. Semantic coherence will use heuristics.")

# Local imports - Support both syscred.module and relative imports
try:
    from syscred.api_clients import ExternalAPIClients, WebContent, ExternalData
    from syscred.ontology_manager import OntologyManager
    from syscred.seo_analyzer import SEOAnalyzer
    from syscred.graph_rag import GraphRAG
    from syscred.trec_retriever import TRECRetriever, Evidence, RetrievalResult
    from syscred import config
except ImportError:
    from api_clients import ExternalAPIClients, WebContent, ExternalData
    from ontology_manager import OntologyManager
    from seo_analyzer import SEOAnalyzer
    from graph_rag import GraphRAG
    from trec_retriever import TRECRetriever, Evidence, RetrievalResult
    import config

# [NER + E-E-A-T] Optional imports - they must not interfere with the main imports
HAS_NER_EEAT = False
try:
    from syscred.ner_analyzer import NERAnalyzer
    from syscred.eeat_calculator import EEATCalculator, EEATScore
    HAS_NER_EEAT = True
except ImportError:
    try:
        from ner_analyzer import NERAnalyzer
        from eeat_calculator import EEATCalculator, EEATScore
        HAS_NER_EEAT = True
    except ImportError:
        pass


class CredibilityVerificationSystem:
    """
    Neuro-symbolic credibility verification system.

    Combines:
    - Rule-based analysis (symbolic, transparent)
    - NLP/ML analysis (machine learning)
    - An OWL ontology for traceability
    - External APIs for real-world data
    """

    def __init__(
        self,
        google_api_key: Optional[str] = None,
        ontology_base_path: Optional[str] = None,
        ontology_data_path: Optional[str] = None,
        load_ml_models: bool = True
    ):
        """
        Initialize the credibility verification system.

        Args:
            google_api_key: API key for Google Fact Check (optional)
            ontology_base_path: Path to base ontology TTL file
            ontology_data_path: Path to store accumulated data
            load_ml_models: Whether to load ML models (disable for testing)
        """
        print("[SysCRED] Initializing Credibility Verification System v2.0...")

        # Initialize API clients
        self.api_clients = ExternalAPIClients(google_api_key=google_api_key)
        print("[SysCRED] API clients initialized")

        # Initialize ontology manager and GraphRAG. Both stay None unless a
        # path was supplied and construction succeeded.
        self.ontology_manager = None
        self.graph_rag = None
        if ontology_base_path or ontology_data_path:
            try:
                self.ontology_manager = OntologyManager(
                    base_ontology_path=ontology_base_path,
                    data_path=ontology_data_path
                )
                self.graph_rag = GraphRAG(self.ontology_manager)
                print("[SysCRED] Ontology manager & GraphRAG initialized")
            except Exception as e:
                print(f"[SysCRED] Ontology manager disabled: {e}")
                self.graph_rag = None

        # Initialize TREC Retriever for evidence gathering
        self.trec_retriever = None
        try:
            self.trec_retriever = TRECRetriever(
                index_path=config.Config.TREC_INDEX_PATH,
                corpus_path=config.Config.TREC_CORPUS_PATH,
                use_stemming=True,
                enable_prf=config.Config.ENABLE_PRF,
                prf_top_docs=config.Config.PRF_TOP_DOCS,
                prf_expansion_terms=config.Config.PRF_EXPANSION_TERMS
            )
            print("[SysCRED] TREC Retriever initialized for evidence gathering")
        except Exception as e:
            print(f"[SysCRED] TREC Retriever disabled: {e}")

        # Initialize ML models (all optional; each consumer checks for None)
        self.sentiment_pipeline = None
        self.ner_pipeline = None
        self.bias_tokenizer = None
        self.bias_model = None
        self.coherence_model = None
        self.explainer = None

        if load_ml_models and HAS_ML:
            self._load_ml_models()

        # Weights for score calculation (loaded from Config)
        self.weights = config.Config.SCORE_WEIGHTS
        print(f"[SysCRED] Using weights: {self.weights}")

        # [NER + E-E-A-T] Initialize analyzers
        self.ner_analyzer = None
        self.eeat_calculator = None
        if HAS_NER_EEAT:
            try:
                self.ner_analyzer = NERAnalyzer()
                self.eeat_calculator = EEATCalculator()
                print("[SysCRED] NER analyzer initialized")
                print("[SysCRED] E-E-A-T calculator initialized")
            except Exception as e:
                print(f"[SysCRED] NER/E-E-A-T init failed: {e}")

        print("[SysCRED] System ready!")

    def _load_ml_models(self):
        """Load ML models for NLP analysis.

        Each model is loaded in its own try block so that one failure leaves
        the corresponding attribute as None without preventing the others.
        """
        print("[SysCRED] Loading ML models (this may take a moment)...")

        try:
            # Sentiment analysis - very lightweight model
            self.sentiment_pipeline = pipeline(
                "sentiment-analysis",
                model="distilbert-base-uncased-finetuned-sst-2-english",
                device=-1,
                model_kwargs={"low_cpu_mem_usage": True}
            )
            print("[SysCRED] ✓ Sentiment model loaded (distilbert-base)")
        except Exception as e:
            print(f"[SysCRED] ✗ Sentiment model failed: {e}")

        try:
            # NER pipeline - lighter model
            self.ner_pipeline = pipeline(
                "ner",
                model="dslim/bert-base-NER",
                grouped_entities=True,
                device=-1,
                model_kwargs={"low_cpu_mem_usage": True}
            )
            print("[SysCRED] ✓ NER model loaded (dslim/bert-base-NER)")
        except Exception as e:
            print(f"[SysCRED] ✗ NER model failed: {e}")

        try:
            # Bias detection - lighter model where possible
            bias_model_name = "typeform/distilbert-base-uncased-mnli"
            self.bias_tokenizer = AutoTokenizer.from_pretrained(bias_model_name)
            self.bias_model = AutoModelForSequenceClassification.from_pretrained(bias_model_name)
            print("[SysCRED] ✓ Bias model loaded (distilbert-mnli)")
        except Exception as e:
            print(f"[SysCRED] ✗ Bias model failed: {e}. Using heuristics.")

        try:
            # Semantic coherence - MiniLM model (already lightweight)
            if HAS_SBERT:
                self.coherence_model = SentenceTransformer('all-MiniLM-L6-v2')
                print("[SysCRED] ✓ Coherence model loaded (SBERT MiniLM)")
        except Exception as e:
            print(f"[SysCRED] ✗ Coherence model failed: {e}")

        try:
            # LIME explainer
            self.explainer = LimeTextExplainer(class_names=['NEGATIVE', 'POSITIVE'])
            print("[SysCRED] ✓ LIME explainer loaded")
        except Exception as e:
            print(f"[SysCRED] ✗ LIME explainer failed: {e}")

    def is_url(self, text: str) -> bool:
        """Return True when *text* parses with both a scheme and a netloc."""
        try:
            result = urlparse(text)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False

    def preprocess(self, text: str) -> str:
        """Clean and normalize text for analysis.

        Non-string input yields an empty string. URLs are removed, whitespace
        is collapsed, characters other than word characters, whitespace and
        ``. ? , !`` are dropped, and the result is lower-cased and stripped.
        """
        if not isinstance(text, str):
            return ""

        # Remove URLs
        text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        # Keep basic punctuation
        text = re.sub(r'[^\w\s\.\?,!]', '', text)

        return text.lower().strip()

    def rule_based_analysis(self, text: str, external_data: ExternalData) -> Dict[str, Any]:
        """
        Perform rule-based analysis using symbolic reasoning.

        Args:
            text: Preprocessed text to analyze
            external_data: Data from external APIs

        Returns:
            Dictionary with keys 'linguistic_markers', 'source_analysis',
            'timeliness_flags' and 'fact_checking'.
        """
        results = {
            'linguistic_markers': {},
            'source_analysis': {},
            'timeliness_flags': [],
            'fact_checking': []
        }

        # 1. Linguistic markers
        sensational_words = [
            'shocking', 'revealed', 'conspiracy', 'amazing', 'secret',
            'breakthrough', 'miracle', 'unbelievable', 'exclusive', 'urgent'
        ]
        certainty_words = [
            'verified', 'authentic', 'credible', 'proven', 'fact',
            'confirmed', 'official', 'legitimate', 'established'
        ]
        doubt_words = [
            'hoax', 'false', 'fake', 'unproven', 'rumor', 'allegedly',
            'claim', 'debunked', 'misleading', 'disputed'
        ]

        text_lower = text.lower()

        def count_present(words):
            # Counts how many marker words occur as substrings (each word
            # counted at most once, regardless of repetitions).
            return sum(1 for word in words if word in text_lower)

        markers = results['linguistic_markers']
        markers['sensationalism'] = count_present(sensational_words)
        markers['certainty'] = count_present(certainty_words)
        markers['doubt'] = count_present(doubt_words)

        # 2. Source analysis from external data
        source = results['source_analysis']
        source['reputation'] = external_data.source_reputation
        source['domain_age_days'] = external_data.domain_age_days

        if external_data.domain_info:
            source['registrar'] = external_data.domain_info.registrar
            source['domain'] = external_data.domain_info.domain

        # 3. Timeliness flags
        age_days = external_data.domain_age_days
        if age_days is not None:
            if age_days < 180:
                results['timeliness_flags'].append('Source domain is relatively new (<6 months)')
            elif age_days < 365:
                results['timeliness_flags'].append('Source domain is less than 1 year old')

        # 4. Fact checking results
        for fc in external_data.fact_checks:
            results['fact_checking'].append({
                'claim': fc.claim,
                'rating': fc.rating,
                'publisher': fc.publisher,
                'url': fc.url
            })

        return results

    def nlp_analysis(self, text: str) -> Dict[str, Any]:
        """
        Perform NLP-based analysis using ML models.

        Args:
            text: Preprocessed text to analyze

        Returns:
            Dictionary with keys 'sentiment', 'sentiment_explanation',
            'bias_analysis', 'named_entities' and 'coherence_score'. For empty
            text only a neutral sentiment placeholder is filled in.
        """
        results = {
            'sentiment': None,
            'sentiment_explanation': None,
            'bias_analysis': {'score': None, 'label': 'Unavailable'},
            'named_entities': [],
            'coherence_score': None
        }

        if not text:
            results['sentiment'] = {'label': 'Neutral', 'score': 0.5}
            return results

        # 1. Sentiment analysis with LIME explanation
        if self.sentiment_pipeline:
            try:
                main_pred = self.sentiment_pipeline(text[:512])[0]
                results['sentiment'] = main_pred

                if self.explainer:
                    def predict_proba(texts):
                        # LIME expects an (n_samples, 2) array ordered as
                        # [NEGATIVE, POSITIVE], matching class_names.
                        if isinstance(texts, str):
                            texts = [texts]
                        predictions = self.sentiment_pipeline(list(texts))
                        probs = []
                        for pred in predictions:
                            if pred['label'] == 'POSITIVE':
                                probs.append([1 - pred['score'], pred['score']])
                            else:
                                probs.append([pred['score'], 1 - pred['score']])
                        return np.array(probs)

                    explanation = self.explainer.explain_instance(
                        text[:512],
                        predict_proba,
                        num_features=6
                    )
                    results['sentiment_explanation'] = explanation.as_list()
            except Exception as e:
                print(f"[NLP] Sentiment error: {e}")
                results['sentiment'] = {'label': 'Error', 'score': 0.0}

        # 2. Bias analysis
        results['bias_analysis'] = self._analyze_bias(text)

        # 3. Named Entity Recognition
        if self.ner_pipeline:
            try:
                entities = self.ner_pipeline(text[:512])
                results['named_entities'] = entities
            except Exception as e:
                print(f"[NLP] NER error: {e}")

        # 4. Semantic Coherence
        results['coherence_score'] = self._calculate_coherence(text)

        return results

    def _analyze_bias(self, text: str) -> Dict[str, Any]:
        """Analyze text for bias using the ML model, falling back to heuristics.

        Returns:
            Dict with 'score' (float), 'label' (str) and 'method' (str).
        """
        # Method 1: ML Model
        if self.bias_model and self.bias_tokenizer:
            try:
                inputs = self.bias_tokenizer(
                    text[:512],
                    return_tensors="pt",
                    truncation=True,
                    max_length=512,
                    padding=True
                )
                with torch.no_grad():
                    logits = self.bias_model(**inputs).logits
                    probs = torch.softmax(logits, dim=1)[0]

                # Label mapping depends on model, usually [Non-biased, Biased].
                # NOTE(review): _load_ml_models loads an MNLI checkpoint, so
                # index 1 is presumably not a "biased" class - TODO confirm
                # against the model card before trusting this score.
                bias_score = probs[1].item()
                # Fixed: the label previously had a stray leading space and
                # lower-case initial (" biased"), unlike every other label.
                label = "Biased" if bias_score > 0.5 else "Non-biased"
                return {'score': bias_score, 'label': label, 'method': 'ML (d4data)'}
            except Exception as e:
                print(f"[NLP] ML Bias error: {e}")

        # Method 2: Heuristics
        biased_words = [
            'radical', 'extremist', 'disgraceful', 'shameful', 'corrupt',
            'insane', 'idiot', 'disaster', 'propaganda', 'dictator',
            'puppet', 'regime', 'tyrant', 'treason', 'traitor'
        ]
        text_lower = text.lower()
        count = sum(1 for w in biased_words if w in text_lower)
        score = min(1.0, count * 0.15)
        label = "Potentially Biased" if score > 0.3 else "Neutral"

        return {'score': score, 'label': label, 'method': 'Heuristic'}

    def _calculate_coherence(self, text: str) -> float:
        """Calculate a semantic coherence score for *text*.

        Uses the mean cosine similarity of adjacent sentences when the SBERT
        model is available, otherwise a sentence-length heuristic. Texts with
        fewer than two sentences of more than three words return 0.7.
        """
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if len(s.split()) > 3]

        if len(sentences) < 2:
            return 0.7  # Default to neutral/good for short text, not perfect 1.0

        # Method 1: SBERT Semantic Similarity
        if self.coherence_model and HAS_SBERT:
            try:
                embeddings = self.coherence_model.encode(sentences[:10])  # Limit to 10
                sims = [
                    util.pytorch_cos_sim(current, following).item()
                    for current, following in zip(embeddings[:-1], embeddings[1:])
                ]
                return sum(sims) / len(sims) if sims else 0.5
            except Exception as e:
                print(f"[NLP] SBERT error: {e}")

        # Method 2: Heuristic (sentence length variance & average length)
        lengths = [len(s.split()) for s in sentences]
        avg_len = sum(lengths) / len(lengths)
        variance = sum((l - avg_len) ** 2 for l in lengths) / len(lengths)

        score = 0.8
        if variance > 100:
            score -= 0.2
        if avg_len < 5:
            score -= 0.2

        return max(0.0, score)

    def calculate_overall_score(
        self,
        rule_results: Dict,
        nlp_results: Dict
    ) -> float:
        """
        Calculate the overall credibility score from rule and NLP results.

        Starts from a neutral 0.5 and adds weighted adjustments per factor,
        using ``self.weights`` (with the documented defaults when a key is
        missing). The result is clamped to [0.0, 1.0].

        Args:
            rule_results: Output of ``rule_based_analysis`` (optionally with
                'graph_context_data' added by the pipeline).
            nlp_results: Output of ``nlp_analysis``.

        Returns:
            Credibility score in [0.0, 1.0].
        """
        adjustments = 0.0

        # 1. Source Reputation (25%)
        w_rep = self.weights.get('source_reputation', 0.25)
        # A reputation of None is treated like 'Unknown' instead of raising
        # TypeError on the "N/A" membership test below.
        reputation = rule_results['source_analysis'].get('reputation') or 'Unknown'

        if reputation != 'Unknown' and "N/A" not in reputation:
            if reputation == 'High':
                adjustments += w_rep * 1.0   # Full boost
            elif reputation == 'Low':
                adjustments -= w_rep * 1.0   # Full penalty
            elif reputation == 'Medium':
                adjustments += w_rep * 0.2   # Slight boost

        # 2. Domain Age (10%)
        w_age = self.weights.get('domain_age', 0.10)
        domain_age = rule_results['source_analysis'].get('domain_age_days')

        if domain_age is not None:
            if domain_age > 730:     # > 2 years
                adjustments += w_age
            elif domain_age < 90:    # < 3 months
                adjustments -= w_age

        # 3. Fact Check (20%)
        w_fc = self.weights.get('fact_check', 0.20)
        fact_checks = rule_results.get('fact_checking', [])

        if fact_checks:
            fc_score = 0
            for fc in fact_checks:
                # The 'rating' key is always present but may hold None, so a
                # .get() default alone does not protect the .lower() call.
                rating = str(fc.get('rating') or '').lower()
                if rating in ['true', 'verified', 'correct']:
                    fc_score += 1
                elif rating in ['false', 'fake', 'incorrect']:
                    fc_score -= 1

            # Only the sign of the tally matters
            if fc_score > 0:
                adjustments += w_fc
            elif fc_score < 0:
                adjustments -= w_fc

        # 4. Sentiment Neutrality (15%) - extreme sentiment lowers the score
        w_sent = self.weights.get('sentiment_neutrality', 0.15)
        sentiment = nlp_results.get('sentiment', {})

        if sentiment:
            s_score = sentiment.get('score', 0.5)
            if s_score > 0.9:
                adjustments -= w_sent * 0.5   # Penalty for extremism
            else:
                adjustments += w_sent * 0.2   # Slight boost for moderation

        # 5. Entity Presence (15%) - named entities suggest verifiability
        w_ent = self.weights.get('entity_presence', 0.15)
        entities = nlp_results.get('named_entities', [])

        if len(entities) > 0:
            # More entities = better (capped)
            boost = min(1.0, len(entities) * 0.2)
            adjustments += w_ent * boost

        # 6. Text Coherence (12%)
        w_coh = self.weights.get('coherence', 0.12)
        coherence = nlp_results.get('coherence_score')

        if coherence is not None:
            # Centered on 0.5: above improves, below penalizes
            adjustments += (coherence - 0.5) * w_coh

        # 7. GraphRAG Context Score (15%) - historical knowledge-graph data
        w_graph = self.weights.get('graph_context', 0.15)
        graph_context_data = rule_results.get('graph_context_data', {})

        if graph_context_data and graph_context_data.get('confidence', 0) > 0:
            graph_score = graph_context_data.get('combined_score', 0.5)
            confidence = graph_context_data.get('confidence', 0)

            # Scale adjustment by confidence (0 confidence = no effect)
            adjustments += (graph_score - 0.5) * w_graph * confidence

        # 8. Linguistic markers
        linguistic = rule_results.get('linguistic_markers', {})
        sensationalism_count = linguistic.get('sensationalism', 0)
        doubt_count = linguistic.get('doubt', 0)
        certainty_count = linguistic.get('certainty', 0)

        # Sensationalism is a strong negative signal
        if sensationalism_count > 0:
            adjustments -= min(0.20, sensationalism_count * 0.05)   # Max 20% penalty

        # Excessive certainty without sources is suspicious
        if certainty_count > 2 and not fact_checks:
            adjustments -= 0.05

        # Doubt markers indicate critical/questioning tone (slight positive)
        if doubt_count > 0:
            adjustments += min(0.05, doubt_count * 0.02)

        # Base 0.5 + sum of weighted adjustments, clamped to [0, 1]
        final_score = 0.5 + adjustments

        return max(0.0, min(1.0, final_score))

    # --- TREC Evidence Retrieval Methods ---

    def retrieve_evidence(
        self,
        claim: str,
        k: int = 10,
        model: str = "bm25"
    ) -> List[Dict[str, Any]]:
        """
        Retrieve evidence documents for a given claim using TREC methodology.

        This integrates the classic IR evaluation framework (TREC AP88-90)
        with the neuro-symbolic credibility verification system.

        Args:
            claim: The claim or statement to verify
            k: Number of evidence documents to retrieve
            model: Retrieval model ('bm25', 'qld', 'tfidf')

        Returns:
            List of evidence dictionaries (from ``Evidence.to_dict()``); empty
            when no retriever is configured or retrieval fails.
        """
        if not self.trec_retriever:
            return []

        try:
            result = self.trec_retriever.retrieve_evidence(
                claim=claim,
                k=k,
                model=model
            )

            # Convert Evidence objects to dictionaries
            evidences = [e.to_dict() for e in result.evidences]

            # Add to ontology if available
            if self.ontology_manager:
                for e in result.evidences[:3]:   # Top 3 only
                    self.ontology_manager.add_evidence(
                        evidence_id=e.doc_id,
                        source=e.source or "trec_corpus",
                        content=e.text[:500],
                        score=e.score
                    )

            return evidences
        except Exception as ex:
            print(f"[SysCRED] Evidence retrieval error: {ex}")
            return []

    def verify_with_evidence(
        self,
        claim: str,
        k: int = 5
    ) -> Dict[str, Any]:
        """
        Complete fact-checking pipeline with evidence retrieval.

        Combines:
        1. TREC-style evidence retrieval
        2. NLP analysis of claim
        3. Evidence-claim comparison
        4. Credibility scoring

        Args:
            claim: The claim to verify
            k: Number of evidence documents

        Returns:
            Verification result with evidence, analysis, support score,
            verdict and a confidence in [0, 1]. When no evidence could be
            retrieved the verdict stays 'UNKNOWN' with confidence 0.0.
        """
        result = {
            'claim': claim,
            'evidences': [],
            'nlp_analysis': {},
            'evidence_support_score': 0.0,
            'verification_verdict': 'UNKNOWN',
            'confidence': 0.0
        }

        # 1. Retrieve evidence
        evidences = self.retrieve_evidence(claim, k=k)
        result['evidences'] = evidences

        # 2. NLP analysis of claim
        cleaned_claim = self.preprocess(claim)
        result['nlp_analysis'] = self.nlp_analysis(cleaned_claim)

        # Without evidence there is nothing to compare against: keep the
        # 'UNKNOWN' default rather than reporting NOT_SUPPORTED at full
        # confidence.
        if not evidences:
            return result

        # 3. Calculate evidence support score
        if self.coherence_model:
            # Use semantic similarity when SBERT is available
            try:
                claim_embedding = self.coherence_model.encode(claim)
                evidence_texts = [e.get('text', '') for e in evidences]
                evidence_embeddings = self.coherence_model.encode(evidence_texts)

                similarities = util.pytorch_cos_sim(claim_embedding, evidence_embeddings)[0]
                avg_similarity = similarities.mean().item()
                max_similarity = similarities.max().item()

                # Evidence support based on similarity
                result['evidence_support_score'] = round(max_similarity, 4)
                result['average_evidence_similarity'] = round(avg_similarity, 4)
            except Exception as e:
                print(f"[SysCRED] Similarity error: {e}")
                # Fallback: use retrieval scores
                result['evidence_support_score'] = evidences[0].get('score', 0)
        else:
            # Fallback: use retrieval scores
            result['evidence_support_score'] = evidences[0].get('score', 0)

        # 4. Determine verdict
        support_score = result['evidence_support_score']

        if support_score > 0.7:
            verdict, confidence = 'SUPPORTED', support_score
        elif support_score > 0.5:
            verdict, confidence = 'PARTIALLY_SUPPORTED', support_score
        elif support_score > 0.3:
            verdict, confidence = 'INSUFFICIENT_EVIDENCE', 0.5
        else:
            verdict, confidence = 'NOT_SUPPORTED', 1 - support_score

        result['verification_verdict'] = verdict
        # Raw retrieval scores are not bounded to [0, 1] and cosine similarity
        # may be negative, so keep the reported confidence a valid probability.
        result['confidence'] = max(0.0, min(1.0, confidence))

        return result

    # --- End TREC Evidence Methods ---

    def generate_report(
        self,
        input_data: str,
        cleaned_text: str,
        rule_results: Dict,
        nlp_results: Dict,
        external_data: ExternalData,
        overall_score: float,
        web_content: Optional[WebContent] = None,
        graph_context: str = "",
        evidences: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, Any]:
        """Generate the final evaluation report.

        Args:
            input_data: Original URL or text submitted for verification.
            cleaned_text: Preprocessed text (not read by this method).
            rule_results: Output of ``rule_based_analysis``.
            nlp_results: Output of ``nlp_analysis``.
            external_data: External API data used for the summary and sources.
            overall_score: Final credibility score.
            web_content: Fetched page content, when the input was a URL.
            graph_context: GraphRAG context text for display.
            evidences: TREC evidence dictionaries, if any.

        Returns:
            Report dictionary (keys are in French, as consumed downstream).
        """
        # Credibility level and summary sentence share one set of thresholds
        # so the label and the text can never disagree at a boundary value.
        if overall_score >= 0.75:
            niveau = "Élevée"
            verdict_sentence = "L'analyse suggère une crédibilité ÉLEVÉE."
        elif overall_score >= 0.55:
            niveau = "Moyenne-Élevée"
            verdict_sentence = "L'analyse suggère une crédibilité MOYENNE à ÉLEVÉE."
        elif overall_score >= 0.45:
            niveau = "Moyenne"
            verdict_sentence = "L'analyse suggère une crédibilité MOYENNE."
        elif overall_score >= 0.25:
            niveau = "Faible-Moyenne"
            verdict_sentence = "L'analyse suggère une crédibilité FAIBLE à MOYENNE."
        else:
            niveau = "Faible"
            verdict_sentence = "L'analyse suggère une crédibilité FAIBLE."

        # Single timestamp so the report id and generation date agree
        now = datetime.datetime.now()
        graph_data = rule_results.get('graph_context_data', {})

        report = {
            'idRapport': f"report_{int(now.timestamp())}",
            'informationEntree': input_data,
            'dateGeneration': now.isoformat(),
            'scoreCredibilite': round(overall_score, 2),
            'niveauCredibilite': niveau,
            'resumeAnalyse': "",
            'detailsScore': {
                'base': 0.5,
                'weights': self.weights,
                'factors': self._get_score_factors(rule_results, nlp_results)
            },
            'sourcesUtilisees': [],
            'reglesAppliquees': rule_results,
            'analyseNLP': {
                'sentiment': nlp_results.get('sentiment'),
                'bias_analysis': nlp_results.get('bias_analysis'),
                'named_entities_count': len(nlp_results.get('named_entities', [])),
                'coherence_score': nlp_results.get('coherence_score'),
                'sentiment_explanation_preview': (nlp_results.get('sentiment_explanation') or [])[:3]
            },
            # GraphRAG section
            'graphRAG': {
                'context_text': graph_context,
                'context_score': graph_data.get('combined_score'),
                'confidence': graph_data.get('confidence', 0),
                'has_history': graph_data.get('has_history', False),
                'history_count': graph_data.get('history_count', 0),
                'similar_claims_count': graph_data.get('similar_count', 0)
            },
            # TREC Evidence section
            'evidences': evidences or [],
            'metadonnees': {}
        }

        # Add web content metadata if available
        if web_content:
            if web_content.success:
                report['metadonnees']['page_title'] = web_content.title
                report['metadonnees']['meta_description'] = web_content.meta_description
                report['metadonnees']['links_count'] = len(web_content.links)
            else:
                report['metadonnees']['warning'] = f"Content scrape failed: {web_content.error}"

        # Generate summary
        summary_parts = []

        if web_content and not web_content.success:
            summary_parts.append(f"⚠️ ATTENTION: Impossible de lire le texte de la page ({web_content.error}). Analyse basée uniquement sur la réputation du domaine.")

        summary_parts.append(verdict_sentence)

        if external_data.source_reputation != 'Unknown':
            summary_parts.append(f"Réputation source : {external_data.source_reputation}.")

        if external_data.domain_age_days:
            years = external_data.domain_age_days / 365
            summary_parts.append(f"Âge du domaine : {years:.1f} ans.")

        if external_data.fact_checks:
            summary_parts.append(f"{len(external_data.fact_checks)} vérification(s) de faits trouvée(s).")

        report['resumeAnalyse'] = " ".join(summary_parts)

        # List sources used
        if self.is_url(input_data):
            report['sourcesUtilisees'].append({
                'type': 'Primary URL',
                'url': input_data
            })

        report['sourcesUtilisees'].append({
            'type': 'WHOIS Lookup',
            'status': 'Success' if (external_data.domain_info and external_data.domain_info.success) else 'Failed/N/A'
        })

        report['sourcesUtilisees'].append({
            'type': 'Fact Check API',
            'results_count': len(external_data.fact_checks)
        })

        # Add TREC evidence source
        if evidences:
            report['sourcesUtilisees'].append({
                'type': 'TREC Evidence Retrieval',
                'method': 'BM25/TF-IDF',
                'corpus': 'AP88-90',
                'results_count': len(evidences)
            })

        return report

    def _get_score_factors(self, rule_results: Dict, nlp_results: Dict) -> List[Dict]:
        """Get the list of factors that influenced the score (for the UI).

        Each entry has 'factor', 'value', 'weight' (percentage string) and
        'impact' ('+', '-', '0' or 'Variable').
        """
        factors = []

        # 1. Reputation
        rep = rule_results['source_analysis'].get('reputation')
        if rep and "N/A" not in rep:
            factors.append({
                'factor': 'Source Reputation',
                'value': rep,
                'weight': f"{int(self.weights.get('source_reputation',0)*100)}%",
                'impact': '+' if rep == 'High' else ('-' if rep == 'Low' else '0')
            })

        # 2. Fact Checks
        if rule_results.get('fact_checking'):
            factors.append({
                'factor': 'Fact Checks',
                'value': f"{len(rule_results['fact_checking'])} found",
                'weight': f"{int(self.weights.get('fact_check',0)*100)}%",
                'impact': 'Variable'
            })

        # 3. Entities
        n_ent = len(nlp_results.get('named_entities', []))
        if n_ent > 0:
            factors.append({
                'factor': 'Entity Presence',
                'value': f"{n_ent} entities",
                'weight': f"{int(self.weights.get('entity_presence',0)*100)}%",
                'impact': '+'
            })

        # 4. Sentiment
        sent = nlp_results.get('sentiment', {})
        if sent:
            factors.append({
                'factor': 'Sentiment Neutrality',
                'value': f"{sent.get('label')} ({sent.get('score',0):.2f})",
                'weight': f"{int(self.weights.get('sentiment_neutrality',0)*100)}%",
                'impact': '-' if sent.get('score', 0) > 0.9 else '0'
            })

        # 5. GraphRAG Context
        graph_data = rule_results.get('graph_context_data', {})
        if graph_data.get('confidence', 0) > 0:
            graph_score = graph_data.get('combined_score', 0.5)
            impact = '+' if graph_score > 0.6 else ('-' if graph_score < 0.4 else '0')
            factors.append({
                'factor': 'Graph Context (History)',
                'value': f"Score: {graph_score:.2f}, Confidence: {graph_data.get('confidence', 0):.0%}",
                'weight': f"{int(self.weights.get('graph_context',0)*100)}%",
                'impact': impact,
                'history_count': graph_data.get('history_count', 0),
                'similar_count': graph_data.get('similar_count', 0)
            })

        return factors

    def verify_information(self, input_data: str) -> Dict[str, Any]:
        """
        Main pipeline to verify credibility of input data.

        Args:
            input_data: URL or text to verify

        Returns:
            Complete evaluation report, or ``{"error": ...}`` when the input
            is not a non-empty string or is empty after preprocessing (unless
            the emptiness comes from a failed web fetch, in which case the
            analysis proceeds on domain metadata only).
        """
        if not isinstance(input_data, str) or not input_data.strip():
            return {"error": "L'entrée doit être une chaîne non vide."}

        print(f"\n[SysCRED] === Vérification: {input_data[:100]}... ===")

        # 1. Determine input type and fetch content
        text_to_analyze = ""
        web_content = None
        is_url = self.is_url(input_data)

        if is_url:
            print("[SysCRED] Fetching web content...")
            web_content = self.api_clients.fetch_web_content(input_data)
            if web_content.success:
                text_to_analyze = web_content.text_content
                print(f"[SysCRED] ✓ Content fetched: {len(text_to_analyze)} chars")
            else:
                print(f"[SysCRED] ⚠ Fetch failed: {web_content.error}")
                print("[SysCRED] Proceeding with Domain/Metadata analysis only.")
                # Not an error: continue with an empty text
                text_to_analyze = ""
        else:
            text_to_analyze = input_data

        # 2. Preprocess text
        cleaned_text = self.preprocess(text_to_analyze)

        # Only error on empty text if it wasn't a failed web fetch; after a
        # failed fetch we still provide the metadata-based analysis.
        if not cleaned_text and not (is_url and web_content and not web_content.success):
            return {"error": "Le texte est vide après prétraitement."}

        print(f"[SysCRED] Preprocessed text: {len(cleaned_text)} chars")

        # Determine best query for Fact Checking
        fact_check_query = input_data
        if text_to_analyze and len(text_to_analyze) > 10:
            # Use start of text if available
            fact_check_query = text_to_analyze[:200]
        elif is_url and web_content and web_content.title:
            # Fallback to page title if text is missing (e.g. 403)
            fact_check_query = web_content.title

        # 3. Fetch external data
        print(f"[SysCRED] Fetching external data (Query: {fact_check_query[:50]}...)...")
        external_data = self.api_clients.fetch_external_data(input_data, fc_query=fact_check_query)

        # Handle text-only input reputation
        if not is_url:
            external_data.source_reputation = "N/A (User Input)"

        print(f"[SysCRED] ✓ Reputation: {external_data.source_reputation}, Age: {external_data.domain_age_days} days")

        # 4. Rule-based analysis
        print("[SysCRED] Running rule-based analysis...")
        rule_results = self.rule_based_analysis(cleaned_text, external_data)

        # 5. GraphRAG Context Retrieval (before NLP for context)
        graph_context = ""
        similar_uris = []
        graph_context_data = {}

        if self.graph_rag and 'source_analysis' in rule_results:
            domain = rule_results['source_analysis'].get('domain', '')

            # Pass keywords for text search if domain is empty or generic
            keywords = []
            if cleaned_text:
                # Extract meaningful keywords (filter out short words)
                keywords = [w for w in cleaned_text.split()[:10] if len(w) > 4]

            # Get text context for display
            context = self.graph_rag.get_context(domain, keywords=keywords)
            graph_context = context.get('full_text', '')
            similar_uris = context.get('similar_uris', [])

            # Get numerical score for integration into scoring
            graph_context_data = self.graph_rag.compute_context_score(domain, keywords=keywords)

            # Add to rule_results for use in calculate_overall_score
            rule_results['graph_context_data'] = graph_context_data

            if graph_context_data.get('has_history'):
                # .get() so a missing key cannot abort the pipeline from
                # inside a log statement.
                print(f"[SysCRED] GraphRAG: Domain has {graph_context_data.get('history_count', 0)} prior evaluations, "
                      f"avg score: {graph_context_data.get('history_score', 0.0):.2f}")
            if graph_context_data.get('similar_count', 0) > 0:
                print(f"[SysCRED] GraphRAG: Found {graph_context_data['similar_count']} similar claims")

        # 6. NLP analysis
        print("[SysCRED] Running NLP analysis...")
        nlp_results = self.nlp_analysis(cleaned_text)

        # 6.5 [NER] Named Entity Recognition
        ner_entities = {}
        if self.ner_analyzer and cleaned_text:
            try:
                ner_entities = self.ner_analyzer.extract_entities(cleaned_text)
                total = sum(len(v) for v in ner_entities.values() if isinstance(v, list))
                print(f"[SysCRED] NER: {total} entites detectees")
            except Exception as e:
                print(f"[SysCRED] NER failed: {e}")

        # 6.6 [E-E-A-T] Experience-Expertise-Authority-Trust scoring
        eeat_scores = {}
        if self.eeat_calculator:
            try:
                url_for_eeat = input_data if is_url else ""
                domain_age_years = None
                if external_data.domain_age_days:
                    domain_age_years = external_data.domain_age_days / 365.0

                eeat_raw = self.eeat_calculator.calculate(
                    url=url_for_eeat,
                    text=cleaned_text,
                    nlp_analysis=nlp_results,
                    fact_checks=rule_results.get('fact_checking', []),
                    domain_age_years=domain_age_years,
                    has_https=input_data.startswith("https://") if is_url else False
                )
                eeat_scores = eeat_raw.to_dict() if hasattr(eeat_raw, 'to_dict') else (
                    eeat_raw if isinstance(eeat_raw, dict) else vars(eeat_raw)
                )
                print(f"[SysCRED] E-E-A-T score: {eeat_scores.get('overall', 'N/A')}")
            except Exception as e:
                print(f"[SysCRED] E-E-A-T failed: {e}")

        # 7. Calculate score (includes GraphRAG context)
        overall_score = self.calculate_overall_score(rule_results, nlp_results)
        print(f"[SysCRED] ✓ Credibility score: {overall_score:.2f}")

        # 8. Generate report (includes context)
        report = self.generate_report(
            input_data, cleaned_text, rule_results,
            nlp_results, external_data, overall_score, web_content,
            graph_context=graph_context
        )

        # [NER + E-E-A-T] Always include in report (even if empty)
        report['ner_entities'] = ner_entities
        report['eeat_scores'] = eeat_scores

        # Add similar URIs to report for ontology linking
        if similar_uris:
            report['similar_claims_uris'] = similar_uris

        # 9. Save to ontology
        if self.ontology_manager:
            try:
                report_uri = self.ontology_manager.add_evaluation_triplets(report)
                report['ontology_uri'] = report_uri
                self.ontology_manager.save_data()
            except Exception as e:
                print(f"[SysCRED] Ontology save failed: {e}")

        print("[SysCRED] === Vérification terminée ===\n")
        return report


# --- Main / Testing ---
if __name__ == "__main__":
    print("=" * 60)
    print("SysCRED v2.0 - Système de Vérification de Crédibilité")
    print("(c) Dominique S. Loyer - PhD Thesis Prototype")
    print("=" * 60 + "\n")

    # Initialize system (without ML models for quick testing)
    system = CredibilityVerificationSystem(
        ontology_base_path="/Users/bk280625/documents041025/MonCode/sysCRED_onto26avrtil.ttl",
        ontology_data_path="/Users/bk280625/documents041025/MonCode/ontology/sysCRED_data.ttl",
        load_ml_models=False  # Set to True for full analysis
    )

    # Test cases
    test_cases = {
        "Test URL Crédible": "https://www.lemonde.fr",
        "Test URL Inconnu": "https://example.com/article",
        "Test Texte Simple": "This is a verified and authentic news report.",
        "Test Texte Suspect": "Shocking conspiracy revealed! They don't want you to know this secret!",
    }

    results = {}
    for name, test_input in test_cases.items():
        print(f"\n{'='*50}")
        print(f"Test: {name}")
        print('='*50)

        result = system.verify_information(test_input)
        results[name] = result

        if 'error' not in result:
            print(f"\nScore: {result['scoreCredibilite']}")
            print(f"Résumé: {result['resumeAnalyse']}")
        else:
            print(f"Erreur: {result['error']}")

    print("\n" + "="*60)
    print("Résumé des tests:")
    print("="*60)
    for name, result in results.items():
        if 'error' not in result:
            print(f"  {name}: Score = {result['scoreCredibilite']:.2f}")
        else:
            print(f"  {name}: ERREUR")