# -*- coding: utf-8 -*-
"""
Verification System Module - SysCRED v2.0
==========================================
Main credibility verification system with real API integration.
Refactored from sys-cred-Python-27avril2025.py
(c) Dominique S. Loyer - PhD Thesis Prototype
Citation Key: loyerModelingHybridSystem2025
"""
import re
import json
import datetime
from typing import Optional, Dict, Any, List
from urllib.parse import urlparse
# Transformers and ML
try:
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
import torch
from lime.lime_text import LimeTextExplainer
HAS_ML = True
except ImportError:
HAS_ML = False
print("Warning: ML libraries not fully installed. Run: pip install transformers torch lime numpy")
try:
from sentence_transformers import SentenceTransformer, util
HAS_SBERT = True
except ImportError:
HAS_SBERT = False
print("Warning: sentence-transformers not installed. Semantic coherence will use heuristics.")
# Local imports - Support both syscred.module and relative imports
try:
from syscred.api_clients import ExternalAPIClients, WebContent, ExternalData
from syscred.ontology_manager import OntologyManager
from syscred.seo_analyzer import SEOAnalyzer
from syscred.graph_rag import GraphRAG
from syscred.trec_retriever import TRECRetriever, Evidence, RetrievalResult
from syscred import config
except ImportError:
from api_clients import ExternalAPIClients, WebContent, ExternalData
from ontology_manager import OntologyManager
from seo_analyzer import SEOAnalyzer
from graph_rag import GraphRAG
from trec_retriever import TRECRetriever, Evidence, RetrievalResult
import config
# [NER + E-E-A-T] Optional imports - they must not interfere with the main imports
HAS_NER_EEAT = False
try:
from syscred.ner_analyzer import NERAnalyzer
from syscred.eeat_calculator import EEATCalculator, EEATScore
HAS_NER_EEAT = True
except ImportError:
try:
from ner_analyzer import NERAnalyzer
from eeat_calculator import EEATCalculator, EEATScore
HAS_NER_EEAT = True
except ImportError:
pass
class CredibilityVerificationSystem:
"""
Neuro-symbolic credibility verification system.
Combines:
- Rule-based analysis (symbolic, transparent)
- NLP/ML analysis (machine learning)
- OWL ontology for traceability
- External APIs for real-world data
"""
def __init__(
self,
google_api_key: Optional[str] = None,
ontology_base_path: Optional[str] = None,
ontology_data_path: Optional[str] = None,
load_ml_models: bool = True
):
"""
Initialize the credibility verification system.
Args:
google_api_key: API key for Google Fact Check (optional)
ontology_base_path: Path to base ontology TTL file
ontology_data_path: Path to store accumulated data
load_ml_models: Whether to load ML models (disable for testing)
"""
print("[SysCRED] Initializing Credibility Verification System v2.0...")
# Initialize API clients
self.api_clients = ExternalAPIClients(google_api_key=google_api_key)
print("[SysCRED] API clients initialized")
# Initialize ontology manager
self.ontology_manager = None
if ontology_base_path or ontology_data_path:
try:
self.ontology_manager = OntologyManager(
base_ontology_path=ontology_base_path,
data_path=ontology_data_path
)
self.graph_rag = GraphRAG(self.ontology_manager) # [NEW] Init GraphRAG
print("[SysCRED] Ontology manager & GraphRAG initialized")
except Exception as e:
print(f"[SysCRED] Ontology manager disabled: {e}")
self.graph_rag = None
else:
self.graph_rag = None
# [NEW] Initialize TREC Retriever for evidence gathering
self.trec_retriever = None
try:
self.trec_retriever = TRECRetriever(
index_path=config.Config.TREC_INDEX_PATH,
corpus_path=config.Config.TREC_CORPUS_PATH,
use_stemming=True,
enable_prf=config.Config.ENABLE_PRF,
prf_top_docs=config.Config.PRF_TOP_DOCS,
prf_expansion_terms=config.Config.PRF_EXPANSION_TERMS
)
print("[SysCRED] TREC Retriever initialized for evidence gathering")
except Exception as e:
print(f"[SysCRED] TREC Retriever disabled: {e}")
# Initialize ML models
self.sentiment_pipeline = None
self.ner_pipeline = None
self.bias_tokenizer = None
self.bias_model = None
self.coherence_model = None
self.explainer = None
if load_ml_models and HAS_ML:
self._load_ml_models()
# Weights for score calculation (loaded from Config; adjust them there)
self.weights = config.Config.SCORE_WEIGHTS
print(f"[SysCRED] Using weights: {self.weights}")
# [NER + E-E-A-T] Initialize analyzers
self.ner_analyzer = None
self.eeat_calculator = None
if HAS_NER_EEAT:
try:
self.ner_analyzer = NERAnalyzer()
self.eeat_calculator = EEATCalculator()
print("[SysCRED] NER analyzer initialized")
print("[SysCRED] E-E-A-T calculator initialized")
except Exception as e:
print(f"[SysCRED] NER/E-E-A-T init failed: {e}")
print("[SysCRED] System ready!")
def _load_ml_models(self):
"""Load ML models for NLP analysis."""
print("[SysCRED] Loading ML models (this may take a moment)...")
try:
# Sentiment analysis - very lightweight model
self.sentiment_pipeline = pipeline(
"sentiment-analysis",
model="distilbert-base-uncased-finetuned-sst-2-english",
device=-1,
model_kwargs={"low_cpu_mem_usage": True}
)
print("[SysCRED] ✓ Sentiment model loaded (distilbert-base)")
except Exception as e:
print(f"[SysCRED] ✗ Sentiment model failed: {e}")
try:
# NER pipeline - lighter model
self.ner_pipeline = pipeline(
"ner",
model="dslim/bert-base-NER",
grouped_entities=True,
device=-1,
model_kwargs={"low_cpu_mem_usage": True}
)
print("[SysCRED] ✓ NER model loaded (dslim/bert-base-NER)")
except Exception as e:
print(f"[SysCRED] ✗ NER model failed: {e}")
try:
# Bias detection - lighter model where possible
bias_model_name = "typeform/distilbert-base-uncased-mnli"
self.bias_tokenizer = AutoTokenizer.from_pretrained(bias_model_name)
self.bias_model = AutoModelForSequenceClassification.from_pretrained(bias_model_name)
print("[SysCRED] ✓ Bias model loaded (distilbert-mnli)")
except Exception as e:
print(f"[SysCRED] ✗ Bias model failed: {e}. Using heuristics.")
try:
# Semantic coherence - MiniLM model (already lightweight)
if HAS_SBERT:
self.coherence_model = SentenceTransformer('all-MiniLM-L6-v2')
print("[SysCRED] ✓ Coherence model loaded (SBERT MiniLM)")
except Exception as e:
print(f"[SysCRED] ✗ Coherence model failed: {e}")
try:
# LIME explainer
self.explainer = LimeTextExplainer(class_names=['NEGATIVE', 'POSITIVE'])
print("[SysCRED] ✓ LIME explainer loaded")
except Exception as e:
print(f"[SysCRED] ✗ LIME explainer failed: {e}")
def is_url(self, text: str) -> bool:
"""Check if a string is a valid URL."""
try:
result = urlparse(text)
return all([result.scheme, result.netloc])
except ValueError:
return False
def preprocess(self, text: str) -> str:
"""Clean and normalize text for analysis."""
if not isinstance(text, str):
return ""
# Remove URLs
text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text)
# Keep basic punctuation
text = re.sub(r'[^\w\s\.\?,!]', '', text)
return text.lower().strip()
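# Illustrative behaviour of preprocess() (example only, not executed):
#   preprocess("SHOCKING! Read this: https://example.com/article NOW!!!")
#   -> "shocking! read this now!!!"
# URLs are stripped, whitespace is collapsed, and only word characters plus
# basic punctuation (. ? , !) are kept before lowercasing.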
def rule_based_analysis(self, text: str, external_data: ExternalData) -> Dict[str, Any]:
"""
Perform rule-based analysis using symbolic reasoning.
Args:
text: Preprocessed text to analyze
external_data: Data from external APIs
Returns:
Dictionary with rule-based analysis results
"""
results = {
'linguistic_markers': {},
'source_analysis': {},
'timeliness_flags': [],
'fact_checking': []
}
# 1. Linguistic markers
sensational_words = [
'shocking', 'revealed', 'conspiracy', 'amazing', 'secret',
'breakthrough', 'miracle', 'unbelievable', 'exclusive', 'urgent'
]
certainty_words = [
'verified', 'authentic', 'credible', 'proven', 'fact',
'confirmed', 'official', 'legitimate', 'established'
]
doubt_words = [
'hoax', 'false', 'fake', 'unproven', 'rumor', 'allegedly',
'claim', 'debunked', 'misleading', 'disputed'
]
text_lower = text.lower()
results['linguistic_markers']['sensationalism'] = sum(
1 for word in sensational_words if word in text_lower
)
results['linguistic_markers']['certainty'] = sum(
1 for word in certainty_words if word in text_lower
)
results['linguistic_markers']['doubt'] = sum(
1 for word in doubt_words if word in text_lower
)
# 2. Source analysis from external data
results['source_analysis']['reputation'] = external_data.source_reputation
results['source_analysis']['domain_age_days'] = external_data.domain_age_days
if external_data.domain_info:
results['source_analysis']['registrar'] = external_data.domain_info.registrar
results['source_analysis']['domain'] = external_data.domain_info.domain
# 3. Timeliness flags
if external_data.domain_age_days is not None:
if external_data.domain_age_days < 180:
results['timeliness_flags'].append('Source domain is relatively new (<6 months)')
elif external_data.domain_age_days < 365:
results['timeliness_flags'].append('Source domain is less than 1 year old')
# 4. Fact checking results
for fc in external_data.fact_checks:
results['fact_checking'].append({
'claim': fc.claim,
'rating': fc.rating,
'publisher': fc.publisher,
'url': fc.url
})
return results
def nlp_analysis(self, text: str) -> Dict[str, Any]:
"""
Perform NLP-based analysis using ML models.
Args:
text: Preprocessed text to analyze
Returns:
Dictionary with NLP analysis results
"""
results = {
'sentiment': None,
'sentiment_explanation': None,
'bias_analysis': {'score': None, 'label': 'Unavailable'},
'named_entities': [],
'coherence_score': None
}
if not text:
results['sentiment'] = {'label': 'Neutral', 'score': 0.5}
return results
# 1. Sentiment analysis with LIME explanation
if self.sentiment_pipeline:
try:
main_pred = self.sentiment_pipeline(text[:512])[0]
results['sentiment'] = main_pred
if self.explainer:
def predict_proba(texts):
if isinstance(texts, str):
texts = [texts]
predictions = self.sentiment_pipeline(list(texts))
probs = []
for pred in predictions:
if pred['label'] == 'POSITIVE':
probs.append([1 - pred['score'], pred['score']])
else:
probs.append([pred['score'], 1 - pred['score']])
return np.array(probs)
explanation = self.explainer.explain_instance(
text[:512], predict_proba, num_features=6
)
results['sentiment_explanation'] = explanation.as_list()
except Exception as e:
print(f"[NLP] Sentiment error: {e}")
results['sentiment'] = {'label': 'Error', 'score': 0.0}
# 2. Bias analysis
results['bias_analysis'] = self._analyze_bias(text)
# 3. Named Entity Recognition
if self.ner_pipeline:
try:
entities = self.ner_pipeline(text[:512])
results['named_entities'] = entities
except Exception as e:
print(f"[NLP] NER error: {e}")
# 4. Semantic Coherence
results['coherence_score'] = self._calculate_coherence(text)
return results
def _analyze_bias(self, text: str) -> Dict[str, Any]:
"""Analyze text for bias using ML or heuristics."""
# Method 1: ML Model
if self.bias_model and self.bias_tokenizer:
try:
inputs = self.bias_tokenizer(
text[:512], return_tensors="pt",
truncation=True, max_length=512, padding=True
)
with torch.no_grad():
logits = self.bias_model(**inputs).logits
probs = torch.softmax(logits, dim=1)[0]
# Label mapping depends on the model; index 1 is treated here as the "Biased" class
bias_score = probs[1].item()
label = "Biased" if bias_score > 0.5 else "Non-biased"
return {'score': bias_score, 'label': label, 'method': 'ML (distilbert-mnli)'}
except Exception as e:
print(f"[NLP] ML Bias error: {e}")
# Method 2: Heuristics
biased_words = [
'radical', 'extremist', 'disgraceful', 'shameful', 'corrupt',
'insane', 'idiot', 'disaster', 'propaganda', 'dictator',
'puppet', 'regime', 'tyrant', 'treason', 'traitor'
]
text_lower = text.lower()
count = sum(1 for w in biased_words if w in text_lower)
score = min(1.0, count * 0.15)
label = "Potentially Biased" if score > 0.3 else "Neutral"
return {'score': score, 'label': label, 'method': 'Heuristic'}
def _calculate_coherence(self, text: str) -> float:
"""Calculate semantic coherence score."""
sentences = re.split(r'[.!?]+', text)
sentences = [s.strip() for s in sentences if len(s.split()) > 3]
if len(sentences) < 2:
return 0.7 # Default to neutral/good for short text, not perfect 1.0
# Method 1: SBERT Semantic Similarity
if self.coherence_model and HAS_SBERT:
try:
embeddings = self.coherence_model.encode(sentences[:10]) # Limit to 10
sims = []
for i in range(len(embeddings) - 1):
sim = util.pytorch_cos_sim(embeddings[i], embeddings[i+1])
sims.append(sim.item())
return sum(sims) / len(sims) if sims else 0.5
except Exception as e:
print(f"[NLP] SBERT error: {e}")
# Method 2: Heuristic (Sentence Length Variance & Repetition)
lengths = [len(s.split()) for s in sentences]
avg_len = sum(lengths) / len(lengths)
variance = sum((l - avg_len) ** 2 for l in lengths) / len(lengths)
# High variance in sentence length usually indicates choppier, less coherent writing
score = 0.8
if variance > 100: score -= 0.2
if avg_len < 5: score -= 0.2
return max(0.0, score)
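# Worked example (illustrative numbers): for three consecutive sentences whose
# neighbouring SBERT cosine similarities are 0.62 and 0.48, the coherence score
# is their mean, (0.62 + 0.48) / 2 = 0.55. The heuristic fallback starts at 0.8
# and subtracts 0.2 when sentence-length variance exceeds 100 and another 0.2
# when the average sentence length is under 5 words.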
def calculate_overall_score(
self,
rule_results: Dict,
nlp_results: Dict
) -> float:
"""
Calculate the overall credibility score from the configured weighted metrics.
"""
score = 0.5 # Start neutral
adjustments = 0.0
total_weight_used = 0.0
# 1. Source Reputation (25%)
w_rep = self.weights.get('source_reputation', 0.25)
reputation = rule_results['source_analysis'].get('reputation', 'Unknown')
if reputation != 'Unknown' and "N/A" not in reputation:
if reputation == 'High':
adjustments += w_rep * 1.0 # Full boost
elif reputation == 'Low':
adjustments -= w_rep * 1.0 # Full penalty
elif reputation == 'Medium':
adjustments += w_rep * 0.2 # Slight boost
total_weight_used += w_rep
# 2. Domain Age (10%)
w_age = self.weights.get('domain_age', 0.10)
domain_age = rule_results['source_analysis'].get('domain_age_days')
if domain_age is not None:
if domain_age > 730: # > 2 years
adjustments += w_age
elif domain_age < 90: # < 3 months
adjustments -= w_age
total_weight_used += w_age
# 3. Fact Check (20%)
w_fc = self.weights.get('fact_check', 0.20)
fact_checks = rule_results.get('fact_checking', [])
if fact_checks:
fc_score = 0
for fc in fact_checks:
rating = fc.get('rating', '').lower()
if rating in ['true', 'verified', 'correct']:
fc_score += 1
elif rating in ['false', 'fake', 'incorrect']:
fc_score -= 1
# Use only the sign of the aggregated fact-check score: net true -> boost, net false -> penalty
if fc_score > 0: adjustments += w_fc
elif fc_score < 0: adjustments -= w_fc
total_weight_used += w_fc
# 4. Sentiment Neutrality (15%)
# Extreme sentiment = lower score
w_sent = self.weights.get('sentiment_neutrality', 0.15)
sentiment = nlp_results.get('sentiment', {})
if sentiment:
s_score = sentiment.get('score', 0.5)
# If extremely positive or negative (>0.9), penalize
if s_score > 0.9:
adjustments -= w_sent * 0.5 # Penalty for extremism
else:
adjustments += w_sent * 0.2 # Slight boost for moderation
total_weight_used += w_sent
# 5. Entity Presence (15%)
# Presence of named entities (PER, ORG, LOC) suggests verifiability
w_ent = self.weights.get('entity_presence', 0.15)
entities = nlp_results.get('named_entities', [])
if len(entities) > 0:
# More entities = better (capped)
boost = min(1.0, len(entities) * 0.2)
adjustments += w_ent * boost
total_weight_used += w_ent
# 6. Text Coherence (12%)
w_coh = self.weights.get('coherence', 0.12)
coherence = nlp_results.get('coherence_score')
if coherence is not None:
# Coherence is usually 0.0 to 1.0
# Center around 0.5: >0.5 improves, <0.5 penalizes
adjustments += (coherence - 0.5) * w_coh
total_weight_used += w_coh
# 7. [NEW] GraphRAG Context Score (15%)
# This uses historical knowledge from the knowledge graph
w_graph = self.weights.get('graph_context', 0.15)
graph_context_data = rule_results.get('graph_context_data', {})
if graph_context_data and graph_context_data.get('confidence', 0) > 0:
# Use combined score from GraphRAG
graph_score = graph_context_data.get('combined_score', 0.5)
confidence = graph_context_data.get('confidence', 0)
# Scale adjustment by confidence (0 confidence = no effect)
adjustment_factor = (graph_score - 0.5) * w_graph * confidence
adjustments += adjustment_factor
total_weight_used += w_graph * confidence # Partial weight based on confidence
# 8. [NEW] Linguistic Markers Analysis (sensationalism penalty)
# Penalize sensational language heavily, reward doubt markers (critical thinking)
linguistic = rule_results.get('linguistic_markers', {})
sensationalism_count = linguistic.get('sensationalism', 0)
doubt_count = linguistic.get('doubt', 0)
certainty_count = linguistic.get('certainty', 0)
# Sensationalism is a strong negative signal
if sensationalism_count > 0:
penalty = min(0.20, sensationalism_count * 0.05) # Max 20% penalty
adjustments -= penalty
# Excessive certainty without sources is suspicious
if certainty_count > 2 and not fact_checks:
adjustments -= 0.05
# Doubt markers indicate critical/questioning tone (slight positive)
if doubt_count > 0:
adjustments += min(0.05, doubt_count * 0.02)
# Final calculation
# Base 0.5 + sum of weighted adjustments
# Adjustments are in range [-weight, +weight]
final_score = 0.5 + adjustments
return max(0.0, min(1.0, final_score))
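# Worked example (illustrative, assuming the default weights listed in __init__):
# a page with reputation 'High' (+0.25), a 3-year-old domain (+0.10), no fact
# checks, moderate sentiment (+0.15 * 0.2 = +0.03), 4 named entities
# (+0.15 * min(1.0, 4 * 0.2) = +0.12), coherence 0.7 ((0.7 - 0.5) * 0.12 = +0.024)
# and one sensational word (-0.05) yields 0.5 + 0.25 + 0.10 + 0.03 + 0.12
# + 0.024 - 0.05 ≈ 0.97, clamped to [0, 1].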
# --- [NEW] TREC Evidence Retrieval Methods ---
def retrieve_evidence(
self,
claim: str,
k: int = 10,
model: str = "bm25"
) -> List[Dict[str, Any]]:
"""
Retrieve evidence documents for a given claim using TREC methodology.
This integrates the classic IR evaluation framework (TREC AP88-90)
with the neuro-symbolic credibility verification system.
Args:
claim: The claim or statement to verify
k: Number of evidence documents to retrieve
model: Retrieval model ('bm25', 'qld', 'tfidf')
Returns:
List of evidence dictionaries with doc_id, text, score, rank
"""
if not self.trec_retriever:
return []
try:
result = self.trec_retriever.retrieve_evidence(
claim=claim,
k=k,
model=model
)
# Convert Evidence objects to dictionaries
evidences = [e.to_dict() for e in result.evidences]
# Add to ontology if available
if self.ontology_manager:
for e in result.evidences[:3]: # Top 3 only
self.ontology_manager.add_evidence(
evidence_id=e.doc_id,
source=e.source or "trec_corpus",
content=e.text[:500],
score=e.score
)
return evidences
except Exception as ex:
print(f"[SysCRED] Evidence retrieval error: {ex}")
return []
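# Usage sketch (assumes a TREC index has already been built at
# config.Config.TREC_INDEX_PATH; the claim text is hypothetical):
#   system = CredibilityVerificationSystem(load_ml_models=False)
#   evidences = system.retrieve_evidence("new vaccine approved by regulators", k=5, model="bm25")
#   for e in evidences:
#       print(e.get("rank"), e.get("doc_id"), round(e.get("score", 0.0), 2))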
def verify_with_evidence(
self,
claim: str,
k: int = 5
) -> Dict[str, Any]:
"""
Complete fact-checking pipeline with evidence retrieval.
Combines:
1. TREC-style evidence retrieval
2. NLP analysis of claim
3. Evidence-claim comparison
4. Credibility scoring
Args:
claim: The claim to verify
k: Number of evidence documents
Returns:
Verification result with evidence, analysis, and score
"""
result = {
'claim': claim,
'evidences': [],
'nlp_analysis': {},
'evidence_support_score': 0.0,
'verification_verdict': 'UNKNOWN',
'confidence': 0.0
}
# 1. Retrieve evidence
evidences = self.retrieve_evidence(claim, k=k)
result['evidences'] = evidences
# 2. NLP analysis of claim
cleaned_claim = self.preprocess(claim)
result['nlp_analysis'] = self.nlp_analysis(cleaned_claim)
# 3. Calculate evidence support score
if evidences:
# Use semantic similarity if SBERT available
if self.coherence_model:
try:
claim_embedding = self.coherence_model.encode(claim)
evidence_texts = [e.get('text', '') for e in evidences]
evidence_embeddings = self.coherence_model.encode(evidence_texts)
from sentence_transformers import util
similarities = util.pytorch_cos_sim(claim_embedding, evidence_embeddings)[0]
avg_similarity = similarities.mean().item()
max_similarity = similarities.max().item()
# Evidence support based on similarity
result['evidence_support_score'] = round(max_similarity, 4)
result['average_evidence_similarity'] = round(avg_similarity, 4)
except Exception as e:
print(f"[SysCRED] Similarity error: {e}")
# Fallback: use retrieval scores
result['evidence_support_score'] = evidences[0].get('score', 0) if evidences else 0
else:
# Fallback: use retrieval scores
result['evidence_support_score'] = evidences[0].get('score', 0) if evidences else 0
# 4. Determine verdict
support_score = result['evidence_support_score']
if support_score > 0.7:
result['verification_verdict'] = 'SUPPORTED'
result['confidence'] = support_score
elif support_score > 0.5:
result['verification_verdict'] = 'PARTIALLY_SUPPORTED'
result['confidence'] = support_score
elif support_score > 0.3:
result['verification_verdict'] = 'INSUFFICIENT_EVIDENCE'
result['confidence'] = 0.5
else:
result['verification_verdict'] = 'NOT_SUPPORTED'
result['confidence'] = 1 - support_score
return result
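# Usage sketch (illustrative; the verdict thresholds are those hard-coded above):
#   result = system.verify_with_evidence("The Berlin Wall fell in 1989", k=5)
#   print(result["verification_verdict"], result["confidence"])
#   # support > 0.7        -> SUPPORTED
#   # 0.5 < support <= 0.7 -> PARTIALLY_SUPPORTED
#   # 0.3 < support <= 0.5 -> INSUFFICIENT_EVIDENCE
#   # support <= 0.3       -> NOT_SUPPORTED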
# --- End TREC Evidence Methods ---
def generate_report(
self,
input_data: str,
cleaned_text: str,
rule_results: Dict,
nlp_results: Dict,
external_data: ExternalData,
overall_score: float,
web_content: Optional[WebContent] = None,
graph_context: str = "", # [NEW]
evidences: List[Dict[str, Any]] = None # [NEW] TREC evidences
) -> Dict[str, Any]:
"""Generate the final evaluation report."""
# Determine credibility level
if overall_score >= 0.75:
niveau = "Élevée"
elif overall_score >= 0.55:
niveau = "Moyenne-Élevée"
elif overall_score >= 0.45:
niveau = "Moyenne"
elif overall_score >= 0.25:
niveau = "Faible-Moyenne"
else:
niveau = "Faible"
report = {
'idRapport': f"report_{int(datetime.datetime.now().timestamp())}",
'informationEntree': input_data,
'dateGeneration': datetime.datetime.now().isoformat(),
'scoreCredibilite': round(overall_score, 2),
'niveauCredibilite': niveau,
'resumeAnalyse': "",
'detailsScore': {
'base': 0.5,
'weights': self.weights,
'factors': self._get_score_factors(rule_results, nlp_results)
},
'sourcesUtilisees': [],
'reglesAppliquees': rule_results,
'analyseNLP': {
'sentiment': nlp_results.get('sentiment'),
'bias_analysis': nlp_results.get('bias_analysis'),
'named_entities_count': len(nlp_results.get('named_entities', [])),
'coherence_score': nlp_results.get('coherence_score'),
'sentiment_explanation_preview': (nlp_results.get('sentiment_explanation') or [])[:3]
},
# [NEW] GraphRAG section
'graphRAG': {
'context_text': graph_context,
'context_score': rule_results.get('graph_context_data', {}).get('combined_score'),
'confidence': rule_results.get('graph_context_data', {}).get('confidence', 0),
'has_history': rule_results.get('graph_context_data', {}).get('has_history', False),
'history_count': rule_results.get('graph_context_data', {}).get('history_count', 0),
'similar_claims_count': rule_results.get('graph_context_data', {}).get('similar_count', 0)
},
# [NEW] TREC Evidence section
'evidences': evidences or [],
'metadonnees': {}
}
# Add web content metadata if available
if web_content:
if web_content.success:
report['metadonnees']['page_title'] = web_content.title
report['metadonnees']['meta_description'] = web_content.meta_description
report['metadonnees']['links_count'] = len(web_content.links)
else:
report['metadonnees']['warning'] = f"Content scrape failed: {web_content.error}"
# Generate summary
summary_parts = []
if web_content and not web_content.success:
summary_parts.append(f"⚠️ ATTENTION: Impossible de lire le texte de la page ({web_content.error}). Analyse basée uniquement sur la réputation du domaine.")
if overall_score > 0.75:
summary_parts.append("L'analyse suggère une crédibilité ÉLEVÉE.")
elif overall_score > 0.55:
summary_parts.append("L'analyse suggère une crédibilité MOYENNE à ÉLEVÉE.")
elif overall_score > 0.45:
summary_parts.append("L'analyse suggère une crédibilité MOYENNE.")
elif overall_score > 0.25:
summary_parts.append("L'analyse suggère une crédibilité FAIBLE à MOYENNE.")
else:
summary_parts.append("L'analyse suggère une crédibilité FAIBLE.")
if external_data.source_reputation != 'Unknown':
summary_parts.append(f"Réputation source : {external_data.source_reputation}.")
if external_data.domain_age_days:
years = external_data.domain_age_days / 365
summary_parts.append(f"Âge du domaine : {years:.1f} ans.")
if external_data.fact_checks:
summary_parts.append(f"{len(external_data.fact_checks)} vérification(s) de faits trouvée(s).")
report['resumeAnalyse'] = " ".join(summary_parts)
# List sources used
if self.is_url(input_data):
report['sourcesUtilisees'].append({
'type': 'Primary URL',
'url': input_data
})
report['sourcesUtilisees'].append({
'type': 'WHOIS Lookup',
'status': 'Success' if (external_data.domain_info and external_data.domain_info.success) else 'Failed/N/A'
})
report['sourcesUtilisees'].append({
'type': 'Fact Check API',
'results_count': len(external_data.fact_checks)
})
# [NEW] Add TREC evidence source
if evidences:
report['sourcesUtilisees'].append({
'type': 'TREC Evidence Retrieval',
'method': 'BM25/TF-IDF',
'corpus': 'AP88-90',
'results_count': len(evidences)
})
return report
def _get_score_factors(self, rule_results: Dict, nlp_results: Dict) -> List[Dict]:
"""Get list of factors that influenced the score (For UI)."""
factors = []
# 1. Reputation
rep = rule_results['source_analysis'].get('reputation')
if rep and "N/A" not in rep:
factors.append({
'factor': 'Source Reputation',
'value': rep,
'weight': f"{int(self.weights.get('source_reputation',0)*100)}%",
'impact': '+' if rep == 'High' else ('-' if rep == 'Low' else '0')
})
# 2. Fact Checks
if rule_results.get('fact_checking'):
factors.append({
'factor': 'Fact Checks',
'value': f"{len(rule_results['fact_checking'])} found",
'weight': f"{int(self.weights.get('fact_check',0)*100)}%",
'impact': 'Variable'
})
# 3. Entities
n_ent = len(nlp_results.get('named_entities', []))
if n_ent > 0:
factors.append({
'factor': 'Entity Presence',
'value': f"{n_ent} entities",
'weight': f"{int(self.weights.get('entity_presence',0)*100)}%",
'impact': '+'
})
# 4. Sentiment
sent = nlp_results.get('sentiment', {})
if sent:
factors.append({
'factor': 'Sentiment Neutrality',
'value': f"{sent.get('label')} ({sent.get('score',0):.2f})",
'weight': f"{int(self.weights.get('sentiment_neutrality',0)*100)}%",
'impact': '-' if sent.get('score', 0) > 0.9 else '0'
})
# 5. GraphRAG Context (NEW)
graph_data = rule_results.get('graph_context_data', {})
if graph_data.get('confidence', 0) > 0:
graph_score = graph_data.get('combined_score', 0.5)
impact = '+' if graph_score > 0.6 else ('-' if graph_score < 0.4 else '0')
factors.append({
'factor': 'Graph Context (History)',
'value': f"Score: {graph_score:.2f}, Confidence: {graph_data.get('confidence', 0):.0%}",
'weight': f"{int(self.weights.get('graph_context',0)*100)}%",
'impact': impact,
'history_count': graph_data.get('history_count', 0),
'similar_count': graph_data.get('similar_count', 0)
})
return factors
def verify_information(self, input_data: str) -> Dict[str, Any]:
"""
Main pipeline to verify credibility of input data.
Args:
input_data: URL or text to verify
Returns:
Complete evaluation report
"""
if not isinstance(input_data, str) or not input_data.strip():
return {"error": "L'entrée doit être une chaîne non vide."}
print(f"\n[SysCRED] === Vérification: {input_data[:100]}... ===")
# 1. Determine input type and fetch content
text_to_analyze = ""
web_content = None
is_url = self.is_url(input_data)
if is_url:
print("[SysCRED] Fetching web content...")
web_content = self.api_clients.fetch_web_content(input_data)
if web_content.success:
text_to_analyze = web_content.text_content
print(f"[SysCRED] ✓ Content fetched: {len(text_to_analyze)} chars")
else:
print(f"[SysCRED] ⚠ Fetch failed: {web_content.error}")
print("[SysCRED] Proceeding with Domain/Metadata analysis only.")
text_to_analyze = ""
# We no longer return an error here; we proceed with metadata-only analysis.
else:
text_to_analyze = input_data
# 2. Preprocess text
cleaned_text = self.preprocess(text_to_analyze)
# Only error on empty text if it wasn't a failed web fetch
# If web fetch failed, we proceed with empty text to give metadata analysis
if not cleaned_text and not (is_url and web_content and not web_content.success):
return {"error": "Le texte est vide après prétraitement."}
print(f"[SysCRED] Preprocessed text: {len(cleaned_text)} chars")
# Determine best query for Fact Checking
fact_check_query = input_data
if text_to_analyze and len(text_to_analyze) > 10:
# Use start of text if available
fact_check_query = text_to_analyze[:200]
elif is_url and web_content and web_content.title:
# Fallback to page title if text is missing (e.g. 403)
fact_check_query = web_content.title
# 3. Fetch external data
print(f"[SysCRED] Fetching external data (Query: {fact_check_query[:50]}...)...")
external_data = self.api_clients.fetch_external_data(input_data, fc_query=fact_check_query)
# [FIX] Handle text-only input reputation
if not is_url:
external_data.source_reputation = "N/A (User Input)"
print(f"[SysCRED] ✓ Reputation: {external_data.source_reputation}, Age: {external_data.domain_age_days} days")
# 4. Rule-based analysis
print("[SysCRED] Running rule-based analysis...")
rule_results = self.rule_based_analysis(cleaned_text, external_data)
# 5. [MOVED] GraphRAG Context Retrieval (Before NLP for context)
graph_context = ""
similar_uris = []
graph_context_data = {}
if self.graph_rag and 'source_analysis' in rule_results:
domain = rule_results['source_analysis'].get('domain', '')
# Pass keywords for text search if domain is empty or generic
keywords = []
if cleaned_text:
# Extract meaningful keywords (filter out short words)
keywords = [w for w in cleaned_text.split()[:10] if len(w) > 4]
# Get text context for display
context = self.graph_rag.get_context(domain, keywords=keywords)
graph_context = context.get('full_text', '')
similar_uris = context.get('similar_uris', [])
# Get numerical score for integration into scoring
graph_context_data = self.graph_rag.compute_context_score(domain, keywords=keywords)
# Add to rule_results for use in calculate_overall_score
rule_results['graph_context_data'] = graph_context_data
if graph_context_data.get('has_history'):
print(f"[SysCRED] GraphRAG: Domain has {graph_context_data['history_count']} prior evaluations, "
f"avg score: {graph_context_data['history_score']:.2f}")
if graph_context_data.get('similar_count', 0) > 0:
print(f"[SysCRED] GraphRAG: Found {graph_context_data['similar_count']} similar claims")
# 6. NLP analysis
print("[SysCRED] Running NLP analysis...")
nlp_results = self.nlp_analysis(cleaned_text)
# 6.5 [NER] Named Entity Recognition
ner_entities = {}
if self.ner_analyzer and cleaned_text:
try:
ner_entities = self.ner_analyzer.extract_entities(cleaned_text)
total = sum(len(v) for v in ner_entities.values() if isinstance(v, list))
print(f"[SysCRED] NER: {total} entites detectees")
except Exception as e:
print(f"[SysCRED] NER failed: {e}")
# 6.6 [E-E-A-T] Experience-Expertise-Authority-Trust scoring
eeat_scores = {}
if self.eeat_calculator:
try:
url_for_eeat = input_data if is_url else ""
domain_age_years = None
if external_data.domain_age_days:
domain_age_years = external_data.domain_age_days / 365.0
eeat_raw = self.eeat_calculator.calculate(
url=url_for_eeat,
text=cleaned_text,
nlp_analysis=nlp_results,
fact_checks=rule_results.get('fact_checking', []),
domain_age_years=domain_age_years,
has_https=input_data.startswith("https://") if is_url else False
)
eeat_scores = eeat_raw.to_dict() if hasattr(eeat_raw, 'to_dict') else (
eeat_raw if isinstance(eeat_raw, dict) else vars(eeat_raw)
)
print(f"[SysCRED] E-E-A-T score: {eeat_scores.get('overall', 'N/A')}")
except Exception as e:
print(f"[SysCRED] E-E-A-T failed: {e}")
# 7. Calculate score (Now includes GraphRAG context)
overall_score = self.calculate_overall_score(rule_results, nlp_results)
print(f"[SysCRED] ✓ Credibility score: {overall_score:.2f}")
# 8. Generate report (Updated to include context)
report = self.generate_report(
input_data, cleaned_text, rule_results,
nlp_results, external_data, overall_score, web_content,
graph_context=graph_context
)
# [NER + E-E-A-T] Always include in report (even if empty)
report['ner_entities'] = ner_entities
report['eeat_scores'] = eeat_scores
# Add similar URIs to report for ontology linking
if similar_uris:
report['similar_claims_uris'] = similar_uris
# 9. Save to ontology
if self.ontology_manager:
try:
report_uri = self.ontology_manager.add_evaluation_triplets(report)
report['ontology_uri'] = report_uri
self.ontology_manager.save_data()
except Exception as e:
print(f"[SysCRED] Ontology save failed: {e}")
print("[SysCRED] === Vérification terminée ===\n")
return report
# --- Main / Testing ---
if __name__ == "__main__":
import json
print("=" * 60)
print("SysCRED v2.0 - Système de Vérification de Crédibilité")
print("(c) Dominique S. Loyer - PhD Thesis Prototype")
print("=" * 60 + "\n")
# Initialize system (without ML models for quick testing)
system = CredibilityVerificationSystem(
ontology_base_path="/Users/bk280625/documents041025/MonCode/sysCRED_onto26avrtil.ttl",
ontology_data_path="/Users/bk280625/documents041025/MonCode/ontology/sysCRED_data.ttl",
load_ml_models=False # Set to True for full analysis
)
# Test cases
test_cases = {
"Test URL Crédible": "https://www.lemonde.fr",
"Test URL Inconnu": "https://example.com/article",
"Test Texte Simple": "This is a verified and authentic news report.",
"Test Texte Suspect": "Shocking conspiracy revealed! They don't want you to know this secret!",
}
results = {}
for name, test_input in test_cases.items():
print(f"\n{'='*50}")
print(f"Test: {name}")
print('='*50)
result = system.verify_information(test_input)
results[name] = result
if 'error' not in result:
print(f"\nScore: {result['scoreCredibilite']}")
print(f"Résumé: {result['resumeAnalyse']}")
else:
print(f"Erreur: {result['error']}")
print("\n" + "="*60)
print("Résumé des tests:")
print("="*60)
for name, result in results.items():
if 'error' not in result:
print(f" {name}: Score = {result['scoreCredibilite']:.2f}")
else:
print(f" {name}: ERREUR")