# -*- coding: utf-8 -*-
"""
Verification System Module - SysCRED v2.0
==========================================
Main credibility verification system with real API integration.
Refactored from sys-cred-Python-27avril2025.py
(c) Dominique S. Loyer - PhD Thesis Prototype
Citation Key: loyerModelingHybridSystem2025
"""
# Standard library.
import re
import json
import datetime
from typing import Optional, Dict, Any, List
from urllib.parse import urlparse
# Transformers and ML
# Optional ML stack: when missing, the system degrades to heuristics.
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
    import numpy as np
    import torch
    from lime.lime_text import LimeTextExplainer
    HAS_ML = True
except ImportError:
    HAS_ML = False
    print("Warning: ML libraries not fully installed. Run: pip install transformers torch lime numpy")
# Optional sentence-transformers for semantic coherence scoring.
try:
    from sentence_transformers import SentenceTransformer, util
    HAS_SBERT = True
except ImportError:
    HAS_SBERT = False
    print("Warning: sentence-transformers not installed. Semantic coherence will use heuristics.")
# Local imports - Support both syscred.module and relative imports
try:
    from syscred.api_clients import ExternalAPIClients, WebContent, ExternalData
    from syscred.ontology_manager import OntologyManager
    from syscred.seo_analyzer import SEOAnalyzer
    from syscred.graph_rag import GraphRAG
    from syscred.trec_retriever import TRECRetriever, Evidence, RetrievalResult
    from syscred import config
except ImportError:
    from api_clients import ExternalAPIClients, WebContent, ExternalData
    from ontology_manager import OntologyManager
    from seo_analyzer import SEOAnalyzer
    from graph_rag import GraphRAG
    from trec_retriever import TRECRetriever, Evidence, RetrievalResult
    import config
# [NER + E-E-A-T] Optional imports - they do not interfere with the main imports.
HAS_NER_EEAT = False
try:
    from syscred.ner_analyzer import NERAnalyzer
    from syscred.eeat_calculator import EEATCalculator, EEATScore
    HAS_NER_EEAT = True
except ImportError:
    try:
        from ner_analyzer import NERAnalyzer
        from eeat_calculator import EEATCalculator, EEATScore
        HAS_NER_EEAT = True
    except ImportError:
        pass
class CredibilityVerificationSystem:
    """
    Neuro-symbolic credibility verification system.

    Combines:
    - Rule-based analysis (symbolic, transparent)
    - NLP/AI analysis (machine learning)
    - OWL ontology for traceability
    - External APIs for real data
    """
    def __init__(
        self,
        google_api_key: Optional[str] = None,
        ontology_base_path: Optional[str] = None,
        ontology_data_path: Optional[str] = None,
        load_ml_models: bool = True
    ):
        """
        Initialize the credibility verification system.

        Args:
            google_api_key: API key for Google Fact Check (optional)
            ontology_base_path: Path to base ontology TTL file
            ontology_data_path: Path to store accumulated data
            load_ml_models: Whether to load ML models (disable for testing)
        """
        print("[SysCRED] Initializing Credibility Verification System v2.0...")
        # External API clients (web fetch, WHOIS, fact-check queries).
        self.api_clients = ExternalAPIClients(google_api_key=google_api_key)
        print("[SysCRED] API clients initialized")
        # Ontology manager + GraphRAG: enabled only when a path is given;
        # any initialization failure disables both (best-effort behavior).
        self.ontology_manager = None
        if ontology_base_path or ontology_data_path:
            try:
                self.ontology_manager = OntologyManager(
                    base_ontology_path=ontology_base_path,
                    data_path=ontology_data_path
                )
                self.graph_rag = GraphRAG(self.ontology_manager)  # [NEW] Init GraphRAG
                print("[SysCRED] Ontology manager & GraphRAG initialized")
            except Exception as e:
                print(f"[SysCRED] Ontology manager disabled: {e}")
                self.graph_rag = None
        else:
            self.graph_rag = None
        # [NEW] TREC Retriever for evidence gathering (optional; all
        # settings come from config.Config).
        self.trec_retriever = None
        try:
            self.trec_retriever = TRECRetriever(
                index_path=config.Config.TREC_INDEX_PATH,
                corpus_path=config.Config.TREC_CORPUS_PATH,
                use_stemming=True,
                enable_prf=config.Config.ENABLE_PRF,
                prf_top_docs=config.Config.PRF_TOP_DOCS,
                prf_expansion_terms=config.Config.PRF_EXPANSION_TERMS
            )
            print("[SysCRED] TREC Retriever initialized for evidence gathering")
        except Exception as e:
            print(f"[SysCRED] TREC Retriever disabled: {e}")
        # ML model handles; populated by _load_ml_models() when enabled.
        self.sentiment_pipeline = None
        self.ner_pipeline = None
        self.bias_tokenizer = None
        self.bias_model = None
        self.coherence_model = None
        self.explainer = None
        if load_ml_models and HAS_ML:
            self._load_ml_models()
        # Weights for score calculation (loaded from Config).
        self.weights = config.Config.SCORE_WEIGHTS
        print(f"[SysCRED] Using weights: {self.weights}")
        # [NER + E-E-A-T] Optional analyzers (only when their modules imported).
        self.ner_analyzer = None
        self.eeat_calculator = None
        if HAS_NER_EEAT:
            try:
                self.ner_analyzer = NERAnalyzer()
                self.eeat_calculator = EEATCalculator()
                print("[SysCRED] NER analyzer initialized")
                print("[SysCRED] E-E-A-T calculator initialized")
            except Exception as e:
                print(f"[SysCRED] NER/E-E-A-T init failed: {e}")
        print("[SysCRED] System ready!")
    def _load_ml_models(self):
        """Load ML models for NLP analysis.

        Each model loads independently and failures are tolerated: a model
        that fails to load stays ``None`` and the corresponding analysis
        falls back to heuristics.
        """
        print("[SysCRED] Loading ML models (this may take a moment)...")
        try:
            # Sentiment analysis - very light distilled model, CPU only (device=-1).
            self.sentiment_pipeline = pipeline(
                "sentiment-analysis",
                model="distilbert-base-uncased-finetuned-sst-2-english",
                device=-1,
                model_kwargs={"low_cpu_mem_usage": True}
            )
            print("[SysCRED] ✓ Sentiment model loaded (distilbert-base)")
        except Exception as e:
            print(f"[SysCRED] ✗ Sentiment model failed: {e}")
        try:
            # NER pipeline - lighter model; entities grouped into spans.
            self.ner_pipeline = pipeline(
                "ner",
                model="dslim/bert-base-NER",
                grouped_entities=True,
                device=-1,
                model_kwargs={"low_cpu_mem_usage": True}
            )
            print("[SysCRED] ✓ NER model loaded (dslim/bert-base-NER)")
        except Exception as e:
            print(f"[SysCRED] ✗ NER model failed: {e}")
        try:
            # Bias detection - lighter distilled model when possible.
            # NOTE(review): this is an MNLI checkpoint (entailment classes);
            # _analyze_bias reads logit index 1 as "biased" — confirm the
            # label mapping against the loaded checkpoint.
            bias_model_name = "typeform/distilbert-base-uncased-mnli"
            self.bias_tokenizer = AutoTokenizer.from_pretrained(bias_model_name)
            self.bias_model = AutoModelForSequenceClassification.from_pretrained(bias_model_name)
            print("[SysCRED] ✓ Bias model loaded (distilbert-mnli)")
        except Exception as e:
            print(f"[SysCRED] ✗ Bias model failed: {e}. Using heuristics.")
        try:
            # Semantic coherence - MiniLM sentence encoder (already light).
            if HAS_SBERT:
                self.coherence_model = SentenceTransformer('all-MiniLM-L6-v2')
                print("[SysCRED] ✓ Coherence model loaded (SBERT MiniLM)")
        except Exception as e:
            print(f"[SysCRED] ✗ Coherence model failed: {e}")
        try:
            # LIME explainer for sentiment predictions.
            self.explainer = LimeTextExplainer(class_names=['NEGATIVE', 'POSITIVE'])
            print("[SysCRED] ✓ LIME explainer loaded")
        except Exception as e:
            print(f"[SysCRED] ✗ LIME explainer failed: {e}")
| def is_url(self, text: str) -> bool: | |
| """Check if a string is a valid URL.""" | |
| try: | |
| result = urlparse(text) | |
| return all([result.scheme, result.netloc]) | |
| except ValueError: | |
| return False | |
| def preprocess(self, text: str) -> str: | |
| """Clean and normalize text for analysis.""" | |
| if not isinstance(text, str): | |
| return "" | |
| # Remove URLs | |
| text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE) | |
| # Normalize whitespace | |
| text = re.sub(r'\s+', ' ', text) | |
| # Keep basic punctuation | |
| text = re.sub(r'[^\w\s\.\?,!]', '', text) | |
| return text.lower().strip() | |
| def rule_based_analysis(self, text: str, external_data: ExternalData) -> Dict[str, Any]: | |
| """ | |
| Perform rule-based analysis using symbolic reasoning. | |
| Args: | |
| text: Preprocessed text to analyze | |
| external_data: Data from external APIs | |
| Returns: | |
| Dictionary with rule-based analysis results | |
| """ | |
| results = { | |
| 'linguistic_markers': {}, | |
| 'source_analysis': {}, | |
| 'timeliness_flags': [], | |
| 'fact_checking': [] | |
| } | |
| # 1. Linguistic markers | |
| sensational_words = [ | |
| 'shocking', 'revealed', 'conspiracy', 'amazing', 'secret', | |
| 'breakthrough', 'miracle', 'unbelievable', 'exclusive', 'urgent' | |
| ] | |
| certainty_words = [ | |
| 'verified', 'authentic', 'credible', 'proven', 'fact', | |
| 'confirmed', 'official', 'legitimate', 'established' | |
| ] | |
| doubt_words = [ | |
| 'hoax', 'false', 'fake', 'unproven', 'rumor', 'allegedly', | |
| 'claim', 'debunked', 'misleading', 'disputed' | |
| ] | |
| text_lower = text.lower() | |
| results['linguistic_markers']['sensationalism'] = sum( | |
| 1 for word in sensational_words if word in text_lower | |
| ) | |
| results['linguistic_markers']['certainty'] = sum( | |
| 1 for word in certainty_words if word in text_lower | |
| ) | |
| results['linguistic_markers']['doubt'] = sum( | |
| 1 for word in doubt_words if word in text_lower | |
| ) | |
| # 2. Source analysis from external data | |
| results['source_analysis']['reputation'] = external_data.source_reputation | |
| results['source_analysis']['domain_age_days'] = external_data.domain_age_days | |
| if external_data.domain_info: | |
| results['source_analysis']['registrar'] = external_data.domain_info.registrar | |
| results['source_analysis']['domain'] = external_data.domain_info.domain | |
| # 3. Timeliness flags | |
| if external_data.domain_age_days is not None: | |
| if external_data.domain_age_days < 180: | |
| results['timeliness_flags'].append('Source domain is relatively new (<6 months)') | |
| elif external_data.domain_age_days < 365: | |
| results['timeliness_flags'].append('Source domain is less than 1 year old') | |
| # 4. Fact checking results | |
| for fc in external_data.fact_checks: | |
| results['fact_checking'].append({ | |
| 'claim': fc.claim, | |
| 'rating': fc.rating, | |
| 'publisher': fc.publisher, | |
| 'url': fc.url | |
| }) | |
| return results | |
| def nlp_analysis(self, text: str) -> Dict[str, Any]: | |
| """ | |
| Perform NLP-based analysis using ML models. | |
| Args: | |
| text: Preprocessed text to analyze | |
| Returns: | |
| Dictionary with NLP analysis results | |
| """ | |
| results = { | |
| 'sentiment': None, | |
| 'sentiment_explanation': None, | |
| 'bias_analysis': {'score': None, 'label': 'Unavailable'}, | |
| 'named_entities': [], | |
| 'coherence_score': None | |
| } | |
| if not text: | |
| results['sentiment'] = {'label': 'Neutral', 'score': 0.5} | |
| return results | |
| # 1. Sentiment analysis with LIME explanation | |
| if self.sentiment_pipeline: | |
| try: | |
| main_pred = self.sentiment_pipeline(text[:512])[0] | |
| results['sentiment'] = main_pred | |
| if self.explainer: | |
| def predict_proba(texts): | |
| if isinstance(texts, str): | |
| texts = [texts] | |
| predictions = self.sentiment_pipeline(list(texts)) | |
| probs = [] | |
| for pred in predictions: | |
| if pred['label'] == 'POSITIVE': | |
| probs.append([1 - pred['score'], pred['score']]) | |
| else: | |
| probs.append([pred['score'], 1 - pred['score']]) | |
| return np.array(probs) | |
| explanation = self.explainer.explain_instance( | |
| text[:512], predict_proba, num_features=6 | |
| ) | |
| results['sentiment_explanation'] = explanation.as_list() | |
| except Exception as e: | |
| print(f"[NLP] Sentiment error: {e}") | |
| results['sentiment'] = {'label': 'Error', 'score': 0.0} | |
| # 2. Bias analysis | |
| results['bias_analysis'] = self._analyze_bias(text) | |
| # 3. Named Entity Recognition | |
| if self.ner_pipeline: | |
| try: | |
| entities = self.ner_pipeline(text[:512]) | |
| results['named_entities'] = entities | |
| except Exception as e: | |
| print(f"[NLP] NER error: {e}") | |
| # 4. Semantic Coherence | |
| results['coherence_score'] = self._calculate_coherence(text) | |
| return results | |
| def _analyze_bias(self, text: str) -> Dict[str, Any]: | |
| """Analyze text for bias using ML or heuristics.""" | |
| # Method 1: ML Model | |
| if self.bias_model and self.bias_tokenizer: | |
| try: | |
| inputs = self.bias_tokenizer( | |
| text[:512], return_tensors="pt", | |
| truncation=True, max_length=512, padding=True | |
| ) | |
| with torch.no_grad(): | |
| logits = self.bias_model(**inputs).logits | |
| probs = torch.softmax(logits, dim=1)[0] | |
| # Label mapping depends on model, usually [Non-biased, Biased] | |
| bias_score = probs[1].item() | |
| label = " biased" if bias_score > 0.5 else "Non-biased" | |
| return {'score': bias_score, 'label': label, 'method': 'ML (d4data)'} | |
| except Exception as e: | |
| print(f"[NLP] ML Bias error: {e}") | |
| # Method 2: Heuristics | |
| biased_words = [ | |
| 'radical', 'extremist', 'disgraceful', 'shameful', 'corrupt', | |
| 'insane', 'idiot', 'disaster', 'propaganda', 'dictator', | |
| 'puppet', 'regime', 'tyrant', 'treason', 'traitor' | |
| ] | |
| text_lower = text.lower() | |
| count = sum(1 for w in biased_words if w in text_lower) | |
| score = min(1.0, count * 0.15) | |
| label = "Potentially Biased" if score > 0.3 else "Neutral" | |
| return {'score': score, 'label': label, 'method': 'Heuristic'} | |
| def _calculate_coherence(self, text: str) -> float: | |
| """Calculate semantic coherence score.""" | |
| sentences = re.split(r'[.!?]+', text) | |
| sentences = [s.strip() for s in sentences if len(s.split()) > 3] | |
| if len(sentences) < 2: | |
| return 0.7 # Default to neutral/good for short text, not perfect 1.0 | |
| # Method 1: SBERT Semantic Similarity | |
| if self.coherence_model and HAS_SBERT: | |
| try: | |
| embeddings = self.coherence_model.encode(sentences[:10]) # Limit to 10 | |
| sims = [] | |
| for i in range(len(embeddings) - 1): | |
| sim = util.pytorch_cos_sim(embeddings[i], embeddings[i+1]) | |
| sims.append(sim.item()) | |
| return sum(sims) / len(sims) if sims else 0.5 | |
| except Exception as e: | |
| print(f"[NLP] SBERT error: {e}") | |
| # Method 2: Heuristic (Sentence Length Variance & Repetition) | |
| lengths = [len(s.split()) for s in sentences] | |
| avg_len = sum(lengths) / len(lengths) | |
| variance = sum((l - avg_len) ** 2 for l in lengths) / len(lengths) | |
| # High variance suggests simpler/choppier writing usually | |
| score = 0.8 | |
| if variance > 100: score -= 0.2 | |
| if avg_len < 5: score -= 0.2 | |
| return max(0.0, score) | |
    def calculate_overall_score(
        self,
        rule_results: Dict,
        nlp_results: Dict
    ) -> float:
        """
        Calculate overall credibility score based on User-Defined Metrics.

        Starts from a neutral base of 0.5 and accumulates weighted
        adjustments from source reputation, domain age, fact checks,
        sentiment extremity, entity presence, coherence, GraphRAG context
        and linguistic markers.

        Args:
            rule_results: Output of rule_based_analysis() (the pipeline may
                also inject 'graph_context_data' into it).
            nlp_results: Output of nlp_analysis().

        Returns:
            Credibility score clamped to [0.0, 1.0].
        """
        score = 0.5  # Start neutral (NOTE: unused; the final formula uses the 0.5 literal)
        adjustments = 0.0
        total_weight_used = 0.0  # NOTE: accumulated but never read afterwards
        # 1. Source Reputation (25%)
        w_rep = self.weights.get('source_reputation', 0.25)
        reputation = rule_results['source_analysis'].get('reputation', 'Unknown')
        if reputation != 'Unknown' and "N/A" not in reputation:
            if reputation == 'High':
                adjustments += w_rep * 1.0  # Full boost
            elif reputation == 'Low':
                adjustments -= w_rep * 1.0  # Full penalty
            elif reputation == 'Medium':
                adjustments += w_rep * 0.2  # Slight boost
            total_weight_used += w_rep
        # 2. Domain Age (10%)
        w_age = self.weights.get('domain_age', 0.10)
        domain_age = rule_results['source_analysis'].get('domain_age_days')
        if domain_age is not None:
            if domain_age > 730:  # > 2 years
                adjustments += w_age
            elif domain_age < 90:  # < 3 months
                adjustments -= w_age
            total_weight_used += w_age
        # 3. Fact Check (20%)
        w_fc = self.weights.get('fact_check', 0.20)
        fact_checks = rule_results.get('fact_checking', [])
        if fact_checks:
            fc_score = 0
            for fc in fact_checks:
                rating = fc.get('rating', '').lower()
                if rating in ['true', 'verified', 'correct']:
                    fc_score += 1
                elif rating in ['false', 'fake', 'incorrect']:
                    fc_score -= 1
            # Only the net direction matters: positive balance boosts by the
            # full weight, negative penalizes by the full weight.
            if fc_score > 0: adjustments += w_fc
            elif fc_score < 0: adjustments -= w_fc
            total_weight_used += w_fc
        # 4. Sentiment Neutrality (15%)
        # Extreme sentiment confidence (>0.9) is treated as a negative signal.
        w_sent = self.weights.get('sentiment_neutrality', 0.15)
        sentiment = nlp_results.get('sentiment', {})
        if sentiment:
            s_score = sentiment.get('score', 0.5)
            # If extremely positive or negative (>0.9), penalize
            if s_score > 0.9:
                adjustments -= w_sent * 0.5  # Penalty for extremism
            else:
                adjustments += w_sent * 0.2  # Slight boost for moderation
            total_weight_used += w_sent
        # 5. Entity Presence (15%)
        # Presence of named entities (PER, ORG, LOC) suggests verifiability.
        w_ent = self.weights.get('entity_presence', 0.15)
        entities = nlp_results.get('named_entities', [])
        if len(entities) > 0:
            # More entities = better (capped at 5 entities -> boost of 1.0)
            boost = min(1.0, len(entities) * 0.2)
            adjustments += w_ent * boost
            total_weight_used += w_ent
        # 6. Text Coherence (12%)
        w_coh = self.weights.get('coherence', 0.12)
        coherence = nlp_results.get('coherence_score')
        if coherence is not None:
            # Coherence is in [0, 1], centered at 0.5: >0.5 improves, <0.5 penalizes.
            adjustments += (coherence - 0.5) * w_coh
            total_weight_used += w_coh
        # 7. [NEW] GraphRAG Context Score (15%)
        # Uses historical knowledge accumulated in the knowledge graph.
        w_graph = self.weights.get('graph_context', 0.15)
        graph_context_data = rule_results.get('graph_context_data', {})
        if graph_context_data and graph_context_data.get('confidence', 0) > 0:
            # Use the combined score from GraphRAG.
            graph_score = graph_context_data.get('combined_score', 0.5)
            confidence = graph_context_data.get('confidence', 0)
            # Scale the adjustment by confidence (0 confidence = no effect).
            adjustment_factor = (graph_score - 0.5) * w_graph * confidence
            adjustments += adjustment_factor
            total_weight_used += w_graph * confidence  # Partial weight based on confidence
        # 8. [NEW] Linguistic Markers Analysis (sensationalism penalty)
        # Penalize sensational language heavily, reward doubt markers (critical thinking).
        linguistic = rule_results.get('linguistic_markers', {})
        sensationalism_count = linguistic.get('sensationalism', 0)
        doubt_count = linguistic.get('doubt', 0)
        certainty_count = linguistic.get('certainty', 0)
        # Sensationalism is a strong negative signal.
        if sensationalism_count > 0:
            penalty = min(0.20, sensationalism_count * 0.05)  # Max 20% penalty
            adjustments -= penalty
        # Excessive certainty language without any fact-check sources is suspicious.
        if certainty_count > 2 and not fact_checks:
            adjustments -= 0.05
        # Doubt markers indicate a critical/questioning tone (slight positive).
        if doubt_count > 0:
            adjustments += min(0.05, doubt_count * 0.02)
        # Final calculation: neutral base 0.5 plus all weighted adjustments,
        # clamped to [0, 1]. Each adjustment is in [-weight, +weight].
        final_score = 0.5 + adjustments
        return max(0.0, min(1.0, final_score))
| # --- [NEW] TREC Evidence Retrieval Methods --- | |
| def retrieve_evidence( | |
| self, | |
| claim: str, | |
| k: int = 10, | |
| model: str = "bm25" | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve evidence documents for a given claim using TREC methodology. | |
| This integrates the classic IR evaluation framework (TREC AP88-90) | |
| with the neuro-symbolic credibility verification system. | |
| Args: | |
| claim: The claim or statement to verify | |
| k: Number of evidence documents to retrieve | |
| model: Retrieval model ('bm25', 'qld', 'tfidf') | |
| Returns: | |
| List of evidence dictionaries with doc_id, text, score, rank | |
| """ | |
| if not self.trec_retriever: | |
| return [] | |
| try: | |
| result = self.trec_retriever.retrieve_evidence( | |
| claim=claim, | |
| k=k, | |
| model=model | |
| ) | |
| # Convert Evidence objects to dictionaries | |
| evidences = [e.to_dict() for e in result.evidences] | |
| # Add to ontology if available | |
| if self.ontology_manager: | |
| for e in result.evidences[:3]: # Top 3 only | |
| self.ontology_manager.add_evidence( | |
| evidence_id=e.doc_id, | |
| source=e.source or "trec_corpus", | |
| content=e.text[:500], | |
| score=e.score | |
| ) | |
| return evidences | |
| except Exception as ex: | |
| print(f"[SysCRED] Evidence retrieval error: {ex}") | |
| return [] | |
| def verify_with_evidence( | |
| self, | |
| claim: str, | |
| k: int = 5 | |
| ) -> Dict[str, Any]: | |
| """ | |
| Complete fact-checking pipeline with evidence retrieval. | |
| Combines: | |
| 1. TREC-style evidence retrieval | |
| 2. NLP analysis of claim | |
| 3. Evidence-claim comparison | |
| 4. Credibility scoring | |
| Args: | |
| claim: The claim to verify | |
| k: Number of evidence documents | |
| Returns: | |
| Verification result with evidence, analysis, and score | |
| """ | |
| result = { | |
| 'claim': claim, | |
| 'evidences': [], | |
| 'nlp_analysis': {}, | |
| 'evidence_support_score': 0.0, | |
| 'verification_verdict': 'UNKNOWN', | |
| 'confidence': 0.0 | |
| } | |
| # 1. Retrieve evidence | |
| evidences = self.retrieve_evidence(claim, k=k) | |
| result['evidences'] = evidences | |
| # 2. NLP analysis of claim | |
| cleaned_claim = self.preprocess(claim) | |
| result['nlp_analysis'] = self.nlp_analysis(cleaned_claim) | |
| # 3. Calculate evidence support score | |
| if evidences: | |
| # Use semantic similarity if SBERT available | |
| if self.coherence_model: | |
| try: | |
| claim_embedding = self.coherence_model.encode(claim) | |
| evidence_texts = [e.get('text', '') for e in evidences] | |
| evidence_embeddings = self.coherence_model.encode(evidence_texts) | |
| from sentence_transformers import util | |
| similarities = util.pytorch_cos_sim(claim_embedding, evidence_embeddings)[0] | |
| avg_similarity = similarities.mean().item() | |
| max_similarity = similarities.max().item() | |
| # Evidence support based on similarity | |
| result['evidence_support_score'] = round(max_similarity, 4) | |
| result['average_evidence_similarity'] = round(avg_similarity, 4) | |
| except Exception as e: | |
| print(f"[SysCRED] Similarity error: {e}") | |
| # Fallback: use retrieval scores | |
| result['evidence_support_score'] = evidences[0].get('score', 0) if evidences else 0 | |
| else: | |
| # Fallback: use retrieval scores | |
| result['evidence_support_score'] = evidences[0].get('score', 0) if evidences else 0 | |
| # 4. Determine verdict | |
| support_score = result['evidence_support_score'] | |
| if support_score > 0.7: | |
| result['verification_verdict'] = 'SUPPORTED' | |
| result['confidence'] = support_score | |
| elif support_score > 0.5: | |
| result['verification_verdict'] = 'PARTIALLY_SUPPORTED' | |
| result['confidence'] = support_score | |
| elif support_score > 0.3: | |
| result['verification_verdict'] = 'INSUFFICIENT_EVIDENCE' | |
| result['confidence'] = 0.5 | |
| else: | |
| result['verification_verdict'] = 'NOT_SUPPORTED' | |
| result['confidence'] = 1 - support_score | |
| return result | |
| # --- End TREC Evidence Methods --- | |
    def generate_report(
        self,
        input_data: str,
        cleaned_text: str,
        rule_results: Dict,
        nlp_results: Dict,
        external_data: ExternalData,
        overall_score: float,
        web_content: Optional[WebContent] = None,
        graph_context: str = "",  # [NEW] textual GraphRAG context
        evidences: Optional[List[Dict[str, Any]]] = None  # [NEW] TREC evidences
    ) -> Dict[str, Any]:
        """Generate the final evaluation report.

        Args:
            input_data: Original URL or text submitted by the user.
            cleaned_text: Preprocessed text (accepted but not read here).
            rule_results: Output of rule_based_analysis().
            nlp_results: Output of nlp_analysis().
            external_data: Aggregated external API data.
            overall_score: Score from calculate_overall_score(), in [0, 1].
            web_content: Scraped page content when the input was a URL.
            graph_context: Human-readable GraphRAG context string.
            evidences: TREC evidence dicts, when retrieval ran.

        Returns:
            Report dict (the French field names are the public schema).
        """
        # Map the numeric score to a credibility level (French labels).
        if overall_score >= 0.75:
            niveau = "Élevée"
        elif overall_score >= 0.55:
            niveau = "Moyenne-Élevée"
        elif overall_score >= 0.45:
            niveau = "Moyenne"
        elif overall_score >= 0.25:
            niveau = "Faible-Moyenne"
        else:
            niveau = "Faible"
        report = {
            'idRapport': f"report_{int(datetime.datetime.now().timestamp())}",
            'informationEntree': input_data,
            'dateGeneration': datetime.datetime.now().isoformat(),
            'scoreCredibilite': round(overall_score, 2),
            'niveauCredibilite': niveau,
            'resumeAnalyse': "",
            'detailsScore': {
                'base': 0.5,
                'weights': self.weights,
                'factors': self._get_score_factors(rule_results, nlp_results)
            },
            'sourcesUtilisees': [],
            'reglesAppliquees': rule_results,
            'analyseNLP': {
                'sentiment': nlp_results.get('sentiment'),
                'bias_analysis': nlp_results.get('bias_analysis'),
                'named_entities_count': len(nlp_results.get('named_entities', [])),
                'coherence_score': nlp_results.get('coherence_score'),
                'sentiment_explanation_preview': (nlp_results.get('sentiment_explanation') or [])[:3]
            },
            # [NEW] GraphRAG section
            'graphRAG': {
                'context_text': graph_context,
                'context_score': rule_results.get('graph_context_data', {}).get('combined_score'),
                'confidence': rule_results.get('graph_context_data', {}).get('confidence', 0),
                'has_history': rule_results.get('graph_context_data', {}).get('has_history', False),
                'history_count': rule_results.get('graph_context_data', {}).get('history_count', 0),
                'similar_claims_count': rule_results.get('graph_context_data', {}).get('similar_count', 0)
            },
            # [NEW] TREC Evidence section
            'evidences': evidences or [],
            'metadonnees': {}
        }
        # Add web content metadata if available.
        if web_content:
            if web_content.success:
                report['metadonnees']['page_title'] = web_content.title
                report['metadonnees']['meta_description'] = web_content.meta_description
                report['metadonnees']['links_count'] = len(web_content.links)
            else:
                report['metadonnees']['warning'] = f"Content scrape failed: {web_content.error}"
        # Build the French summary sentence by sentence.
        summary_parts = []
        if web_content and not web_content.success:
            summary_parts.append(f"⚠️ ATTENTION: Impossible de lire le texte de la page ({web_content.error}). Analyse basée uniquement sur la réputation du domaine.")
        # NOTE(review): these thresholds use strict '>' while the 'niveau'
        # mapping above uses '>=' — a score of exactly 0.75/0.55/0.45/0.25 is
        # labeled one level up from the summary text. Confirm intent.
        if overall_score > 0.75:
            summary_parts.append("L'analyse suggère une crédibilité ÉLEVÉE.")
        elif overall_score > 0.55:
            summary_parts.append("L'analyse suggère une crédibilité MOYENNE à ÉLEVÉE.")
        elif overall_score > 0.45:
            summary_parts.append("L'analyse suggère une crédibilité MOYENNE.")
        elif overall_score > 0.25:
            summary_parts.append("L'analyse suggère une crédibilité FAIBLE à MOYENNE.")
        else:
            summary_parts.append("L'analyse suggère une crédibilité FAIBLE.")
        if external_data.source_reputation != 'Unknown':
            summary_parts.append(f"Réputation source : {external_data.source_reputation}.")
        if external_data.domain_age_days:
            years = external_data.domain_age_days / 365
            summary_parts.append(f"Âge du domaine : {years:.1f} ans.")
        if external_data.fact_checks:
            summary_parts.append(f"{len(external_data.fact_checks)} vérification(s) de faits trouvée(s).")
        report['resumeAnalyse'] = " ".join(summary_parts)
        # List the data sources consulted during this verification.
        if self.is_url(input_data):
            report['sourcesUtilisees'].append({
                'type': 'Primary URL',
                'url': input_data
            })
        report['sourcesUtilisees'].append({
            'type': 'WHOIS Lookup',
            'status': 'Success' if (external_data.domain_info and external_data.domain_info.success) else 'Failed/N/A'
        })
        report['sourcesUtilisees'].append({
            'type': 'Fact Check API',
            'results_count': len(external_data.fact_checks)
        })
        # [NEW] Add TREC evidence source.
        if evidences:
            report['sourcesUtilisees'].append({
                'type': 'TREC Evidence Retrieval',
                'method': 'BM25/TF-IDF',
                'corpus': 'AP88-90',
                'results_count': len(evidences)
            })
        return report
| def _get_score_factors(self, rule_results: Dict, nlp_results: Dict) -> List[Dict]: | |
| """Get list of factors that influenced the score (For UI).""" | |
| factors = [] | |
| # 1. Reputation | |
| rep = rule_results['source_analysis'].get('reputation') | |
| if rep and "N/A" not in rep: | |
| factors.append({ | |
| 'factor': 'Source Reputation', | |
| 'value': rep, | |
| 'weight': f"{int(self.weights.get('source_reputation',0)*100)}%", | |
| 'impact': '+' if rep == 'High' else ('-' if rep == 'Low' else '0') | |
| }) | |
| # 2. Fact Checks | |
| if rule_results.get('fact_checking'): | |
| factors.append({ | |
| 'factor': 'Fact Checks', | |
| 'value': f"{len(rule_results['fact_checking'])} found", | |
| 'weight': f"{int(self.weights.get('fact_check',0)*100)}%", | |
| 'impact': 'Variable' | |
| }) | |
| # 3. Entities | |
| n_ent = len(nlp_results.get('named_entities', [])) | |
| if n_ent > 0: | |
| factors.append({ | |
| 'factor': 'Entity Presence', | |
| 'value': f"{n_ent} entities", | |
| 'weight': f"{int(self.weights.get('entity_presence',0)*100)}%", | |
| 'impact': '+' | |
| }) | |
| # 4. Sentiment | |
| sent = nlp_results.get('sentiment', {}) | |
| if sent: | |
| factors.append({ | |
| 'factor': 'Sentiment Neutrality', | |
| 'value': f"{sent.get('label')} ({sent.get('score',0):.2f})", | |
| 'weight': f"{int(self.weights.get('sentiment_neutrality',0)*100)}%", | |
| 'impact': '-' if sent.get('score', 0) > 0.9 else '0' | |
| }) | |
| # 5. GraphRAG Context (NEW) | |
| graph_data = rule_results.get('graph_context_data', {}) | |
| if graph_data.get('confidence', 0) > 0: | |
| graph_score = graph_data.get('combined_score', 0.5) | |
| impact = '+' if graph_score > 0.6 else ('-' if graph_score < 0.4 else '0') | |
| factors.append({ | |
| 'factor': 'Graph Context (History)', | |
| 'value': f"Score: {graph_score:.2f}, Confidence: {graph_data.get('confidence', 0):.0%}", | |
| 'weight': f"{int(self.weights.get('graph_context',0)*100)}%", | |
| 'impact': impact, | |
| 'history_count': graph_data.get('history_count', 0), | |
| 'similar_count': graph_data.get('similar_count', 0) | |
| }) | |
| return factors | |
    def verify_information(self, input_data: str) -> Dict[str, Any]:
        """
        Main pipeline to verify credibility of input data.

        Pipeline stages (in order):
          1. Input typing + web fetch (when the input is a URL)
          2. Text preprocessing
          3. External data lookup (reputation, domain age, fact-check query)
          4. Rule-based analysis
          5. GraphRAG context retrieval (deliberately run before NLP so its
             score can feed the overall scoring step)
          6. NLP analysis, plus optional NER and E-E-A-T scoring
          7. Overall score computation + report generation
          8. Best-effort persistence into the ontology store

        Args:
            input_data: URL or text to verify

        Returns:
            Complete evaluation report (dict), or {"error": ...} when the
            input is empty/invalid or yields no analyzable text.
        """
        # Reject non-string or blank input up front.
        if not isinstance(input_data, str) or not input_data.strip():
            return {"error": "L'entrée doit être une chaîne non vide."}
        print(f"\n[SysCRED] === Vérification: {input_data[:100]}... ===")
        # 1. Determine input type and fetch content
        text_to_analyze = ""
        web_content = None
        is_url = self.is_url(input_data)
        if is_url:
            print("[SysCRED] Fetching web content...")
            web_content = self.api_clients.fetch_web_content(input_data)
            if web_content.success:
                text_to_analyze = web_content.text_content
                print(f"[SysCRED] ✓ Content fetched: {len(text_to_analyze)} chars")
            else:
                # A failed fetch (e.g. HTTP 403) is not fatal: we continue
                # with domain/metadata-only analysis instead of erroring out.
                print(f"[SysCRED] ⚠ Fetch failed: {web_content.error}")
                print("[SysCRED] Proceeding with Domain/Metadata analysis only.")
                text_to_analyze = ""
                # We don't return error anymore, we proceed!
        else:
            text_to_analyze = input_data
        # 2. Preprocess text
        cleaned_text = self.preprocess(text_to_analyze)
        # Only error on empty text if it wasn't a failed web fetch.
        # If the web fetch failed, we proceed with empty text so that the
        # metadata-based analysis still produces a report.
        if not cleaned_text and not (is_url and web_content and not web_content.success):
            return {"error": "Le texte est vide après prétraitement."}
        print(f"[SysCRED] Preprocessed text: {len(cleaned_text)} chars")
        # Determine the best query string for fact checking: prefer the page
        # text, fall back to the page title, then to the raw input.
        fact_check_query = input_data
        if text_to_analyze and len(text_to_analyze) > 10:
            # Use start of text if available
            fact_check_query = text_to_analyze[:200]
        elif is_url and web_content and web_content.title:
            # Fallback to page title if text is missing (e.g. 403)
            fact_check_query = web_content.title
        # 3. Fetch external data
        print(f"[SysCRED] Fetching external data (Query: {fact_check_query[:50]}...)...")
        external_data = self.api_clients.fetch_external_data(input_data, fc_query=fact_check_query)
        # [FIX] Text-only input has no source domain, so reputation is N/A.
        if not is_url:
            external_data.source_reputation = "N/A (User Input)"
        print(f"[SysCRED] ✓ Reputation: {external_data.source_reputation}, Age: {external_data.domain_age_days} days")
        # 4. Rule-based analysis
        print("[SysCRED] Running rule-based analysis...")
        rule_results = self.rule_based_analysis(cleaned_text, external_data)
        # 5. [MOVED] GraphRAG context retrieval (before NLP so the context
        # score is available to calculate_overall_score below).
        graph_context = ""
        similar_uris = []
        graph_context_data = {}
        if self.graph_rag and 'source_analysis' in rule_results:
            domain = rule_results['source_analysis'].get('domain', '')
            # Pass keywords for text search if domain is empty or generic
            keywords = []
            if cleaned_text:
                # Extract meaningful keywords (filter out short words)
                keywords = [w for w in cleaned_text.split()[:10] if len(w) > 4]
            # Get text context for display
            context = self.graph_rag.get_context(domain, keywords=keywords)
            graph_context = context.get('full_text', '')
            similar_uris = context.get('similar_uris', [])
            # Get numerical score for integration into scoring
            graph_context_data = self.graph_rag.compute_context_score(domain, keywords=keywords)
            # Add to rule_results for use in calculate_overall_score
            rule_results['graph_context_data'] = graph_context_data
            if graph_context_data.get('has_history'):
                print(f"[SysCRED] GraphRAG: Domain has {graph_context_data['history_count']} prior evaluations, "
                      f"avg score: {graph_context_data['history_score']:.2f}")
            if graph_context_data.get('similar_count', 0) > 0:
                print(f"[SysCRED] GraphRAG: Found {graph_context_data['similar_count']} similar claims")
        # 6. NLP analysis
        print("[SysCRED] Running NLP analysis...")
        nlp_results = self.nlp_analysis(cleaned_text)
        # 6.5 [NER] Named Entity Recognition (optional: analyzer may be None)
        ner_entities = {}
        if self.ner_analyzer and cleaned_text:
            try:
                ner_entities = self.ner_analyzer.extract_entities(cleaned_text)
                total = sum(len(v) for v in ner_entities.values() if isinstance(v, list))
                print(f"[SysCRED] NER: {total} entites detectees")
            except Exception as e:
                # Best-effort: NER failure must not abort the verification.
                print(f"[SysCRED] NER failed: {e}")
        # 6.6 [E-E-A-T] Experience-Expertise-Authority-Trust scoring (optional)
        eeat_scores = {}
        if self.eeat_calculator:
            try:
                url_for_eeat = input_data if is_url else ""
                domain_age_years = None
                if external_data.domain_age_days:
                    domain_age_years = external_data.domain_age_days / 365.0
                eeat_raw = self.eeat_calculator.calculate(
                    url=url_for_eeat,
                    text=cleaned_text,
                    nlp_analysis=nlp_results,
                    fact_checks=rule_results.get('fact_checking', []),
                    domain_age_years=domain_age_years,
                    has_https=input_data.startswith("https://") if is_url else False
                )
                # Normalize the calculator's result to a plain dict whatever
                # its return type is (object with to_dict, dict, or plain
                # attribute object).
                eeat_scores = eeat_raw.to_dict() if hasattr(eeat_raw, 'to_dict') else (
                    eeat_raw if isinstance(eeat_raw, dict) else vars(eeat_raw)
                )
                print(f"[SysCRED] E-E-A-T score: {eeat_scores.get('overall', 'N/A')}")
            except Exception as e:
                # Best-effort: E-E-A-T failure must not abort the verification.
                print(f"[SysCRED] E-E-A-T failed: {e}")
        # 7. Calculate score (now includes the GraphRAG context stored in
        # rule_results above)
        overall_score = self.calculate_overall_score(rule_results, nlp_results)
        print(f"[SysCRED] ✓ Credibility score: {overall_score:.2f}")
        # 8. Generate report (updated to include the graph context text)
        report = self.generate_report(
            input_data, cleaned_text, rule_results,
            nlp_results, external_data, overall_score, web_content,
            graph_context=graph_context
        )
        # [NER + E-E-A-T] Always include in report (even if empty)
        report['ner_entities'] = ner_entities
        report['eeat_scores'] = eeat_scores
        # Add similar URIs to report for ontology linking
        if similar_uris:
            report['similar_claims_uris'] = similar_uris
        # 9. Save to ontology (best-effort; failures are logged, not raised)
        if self.ontology_manager:
            try:
                report_uri = self.ontology_manager.add_evaluation_triplets(report)
                report['ontology_uri'] = report_uri
                self.ontology_manager.save_data()
            except Exception as e:
                print(f"[SysCRED] Ontology save failed: {e}")
        print("[SysCRED] === Vérification terminée ===\n")
        return report
# --- Main / Testing ---
if __name__ == "__main__":
    # Manual smoke test of the full verification pipeline.
    # Fix: removed the redundant `import json` that shadowed the module-level
    # import and was never used inside this block.
    # NOTE(review): the ontology paths below are hard-coded absolute paths
    # from the author's machine -- adjust them before running elsewhere.
    print("=" * 60)
    print("SysCRED v2.0 - Système de Vérification de Crédibilité")
    print("(c) Dominique S. Loyer - PhD Thesis Prototype")
    print("=" * 60 + "\n")
    # Initialize system (without ML models for quick testing)
    system = CredibilityVerificationSystem(
        ontology_base_path="/Users/bk280625/documents041025/MonCode/sysCRED_onto26avrtil.ttl",
        ontology_data_path="/Users/bk280625/documents041025/MonCode/ontology/sysCRED_data.ttl",
        load_ml_models=False  # Set to True for full analysis
    )
    # Test cases: a reputable URL, an unknown URL, neutral text, clickbait text.
    test_cases = {
        "Test URL Crédible": "https://www.lemonde.fr",
        "Test URL Inconnu": "https://example.com/article",
        "Test Texte Simple": "This is a verified and authentic news report.",
        "Test Texte Suspect": "Shocking conspiracy revealed! They don't want you to know this secret!",
    }
    # Run each case and print its score (or error) as it completes.
    results = {}
    for name, test_input in test_cases.items():
        print(f"\n{'='*50}")
        print(f"Test: {name}")
        print('='*50)
        result = system.verify_information(test_input)
        results[name] = result
        if 'error' not in result:
            print(f"\nScore: {result['scoreCredibilite']}")
            print(f"Résumé: {result['resumeAnalyse']}")
        else:
            print(f"Erreur: {result['error']}")
    # Final summary table over all test cases.
    print("\n" + "="*60)
    print("Résumé des tests:")
    print("="*60)
    for name, result in results.items():
        if 'error' not in result:
            print(f"  {name}: Score = {result['scoreCredibilite']:.2f}")
        else:
            print(f"  {name}: ERREUR")