# -*- coding: utf-8 -*-
"""
GraphRAG Module - SysCRED
=========================
Retrieves context from the Knowledge Graph to enhance verification.
Transforms "Passive" Graph into "Active" Context.
(c) Dominique S. Loyer - PhD Thesis Prototype
"""
from typing import List, Dict, Any, Optional
from syscred.ontology_manager import OntologyManager
class GraphRAG:
    """
    Retrieval Augmented Generation using the Semantic Knowledge Graph.

    The class reads two graphs from the ontology manager
    (``om.base_graph`` and ``om.data_graph``), merges them with ``+`` and
    runs SPARQL queries through the merged object's ``query()`` method.
    Results are turned either into natural-language context
    (:meth:`get_context`) or into numerical scores
    (:meth:`compute_context_score`).
    """

    # Namespace declaration shared by every SPARQL query built here.
    _SPARQL_PREFIX = (
        "PREFIX cred: <https://github.com/DominiqueLoyer/systemFactChecking#>"
    )
    # Characters that carry a special meaning inside a regular expression
    # and must be neutralised when a keyword is meant to match literally.
    _REGEX_METACHARS = frozenset("\\|.?*+(){}[]^$-")
    # Maximum number of characters of claim content shown per similar claim.
    _SNIPPET_LENGTH = 100

    def __init__(self, ontology_manager: "OntologyManager"):
        """
        Args:
            ontology_manager: Object exposing ``base_graph`` and
                ``data_graph``. May be falsy (e.g. ``None``), in which case
                the public methods return neutral defaults.
        """
        self.om = ontology_manager

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _escape_sparql_literal(value: str) -> str:
        """Escape *value* for use inside a double-quoted SPARQL string."""
        # Backslash must be handled first so later escapes are not doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
        )

    @classmethod
    def _escape_regex(cls, keyword: str) -> str:
        """Backslash-escape regex metacharacters so *keyword* matches literally."""
        return "".join(
            "\\" + ch if ch in cls._REGEX_METACHARS else ch for ch in keyword
        )

    def _run_query(self, query: str):
        """Run *query* against the union of the base and data graphs."""
        combined = self.om.base_graph + self.om.data_graph
        return combined.query(query)

    def _build_domain_query(self, domain: str, limit: int) -> str:
        """
        Build the SPARQL query listing the most recent reports whose
        information URL contains *domain* (substring match), newest first.
        """
        template = """
            %s
            SELECT ?score ?level ?timestamp
            WHERE {
                ?info cred:informationURL ?url .
                ?request cred:concernsInformation ?info .
                ?report cred:isReportOf ?request .
                ?report cred:credibilityScoreValue ?score .
                ?report cred:assignsCredibilityLevel ?level .
                ?report cred:completionTimestamp ?timestamp .
                FILTER(CONTAINS(STR(?url), "%s"))
            }
            ORDER BY DESC(?timestamp)
            LIMIT %d
        """
        return template % (
            self._SPARQL_PREFIX,
            self._escape_sparql_literal(domain),
            limit,
        )

    @staticmethod
    def _no_similar_claims() -> Dict[str, Any]:
        """Return a fresh 'nothing found' result for similar-claim lookups."""
        return {"text": "", "uris": [], "scores": []}

    @staticmethod
    def _no_history_data() -> Dict[str, Any]:
        """Return a fresh 'no history' result with the neutral 0.5 score."""
        return {'count': 0, 'avg_score': 0.5, 'last_verdict': None, 'scores': []}

    # ------------------------------------------------------------------
    # Textual context
    # ------------------------------------------------------------------
    def get_context(self, domain: str, keywords: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Retrieve context for a specific verification task.

        Args:
            domain: The domain being analyzed (e.g., 'lemonde.fr')
            keywords: Optional list of keywords from the claim, used to
                look up similar claims already stored in the graph.

        Returns:
            Dictionary with:
            - 'full_text': all context parts joined by blank lines, or a
              fallback sentence when nothing was found
            - 'source_history': the history summary for the domain
            - 'similar_uris': report URIs of similar claims (for linking)
            When no ontology manager is set, the same keys are returned
            with empty values plus the legacy 'graph_context' message.
        """
        if not self.om:
            message = "No ontology manager available."
            return {
                # Legacy key kept for callers written against the old shape.
                "graph_context": message,
                "full_text": message,
                "source_history": "",
                "similar_uris": [],
            }

        context_parts = []

        # 1. Source History
        source_history = self._get_source_history(domain)
        if source_history:
            context_parts.append(source_history)

        # 2. Pattern Matching (Similar Claims)
        similar_uris = []
        if keywords:
            similar_result = self._find_similar_claims(keywords)
            if similar_result["text"]:
                context_parts.append(similar_result["text"])
                similar_uris = similar_result["uris"]

        if context_parts:
            full_context = "\n\n".join(context_parts)
        else:
            full_context = "No prior knowledge found in the graph."

        return {
            "full_text": full_context,
            "source_history": source_history,
            "similar_uris": similar_uris,
        }

    def _get_source_history(self, domain: str) -> str:
        """
        Query the graph for previous evaluations of this domain.

        Only the five most recent reports are fetched, so the count and
        average in the summary are computed over at most five evaluations.

        Returns:
            A multi-line summary, a "no previous evaluations" sentence when
            the query yields nothing, or '' when *domain* is empty or the
            query fails.
        """
        if not domain:
            return ""

        query = self._build_domain_query(domain, 5)

        results = []
        try:
            for row in self._run_query(query):
                results.append({
                    "score": float(row.score),
                    # Keep only the fragment after '#' of the level URI.
                    "level": str(row.level).split('#')[-1],
                    # Keep only the date part of an ISO-style timestamp.
                    "date": str(row.timestamp).split('T')[0],
                })
        except Exception as e:
            # Best effort: context retrieval must never break verification.
            print(f"[GraphRAG] Query error: {e}")
            return ""

        if not results:
            return f"The graph contains no previous evaluations for {domain}."

        # Summarize
        count = len(results)
        avg_score = sum(r['score'] for r in results) / count
        latest = results[0]  # query is ordered newest first

        return (
            f"Graph Memory for '{domain}':\n"
            f"- Analyzed {count} times previously.\n"
            f"- Average Credibility Score: {avg_score:.2f} / 1.0\n"
            f"- Most recent verdict ({latest['date']}): {latest['level']}.\n"
        )

    def _find_similar_claims(self, keywords: List[str]) -> Dict[str, Any]:
        """
        Find evaluation history for content containing specific keywords.

        Keywords of three characters or fewer are ignored. The remaining
        ones are matched literally and case-insensitively, combined with OR
        logic; the three most recent matching reports are returned.

        Returns:
            Dict with 'text' (for the LLM), 'uris' (for graph linking) and
            'scores' (numeric credibility scores). All three are empty when
            nothing usable was given, nothing matched, or the query failed.
        """
        if not keywords:
            return self._no_similar_claims()

        clean_kws = [k for k in keywords if len(k) > 3]  # Skip short words
        if not clean_kws:
            return self._no_similar_claims()

        # Build the alternation, e.g. (fake|hoax|conspiracy). Each keyword is
        # regex-escaped so that punctuation in a claim cannot alter or break
        # the pattern, then the whole pattern is escaped again for embedding
        # in a SPARQL string literal.
        regex_pattern = "|".join(self._escape_regex(k) for k in clean_kws)

        query = """
            %s
            SELECT ?report ?content ?score ?level ?timestamp
            WHERE {
                ?info cred:informationContent ?content .
                ?request cred:concernsInformation ?info .
                ?report cred:isReportOf ?request .
                ?report cred:credibilityScoreValue ?score .
                ?report cred:assignsCredibilityLevel ?level .
                ?report cred:completionTimestamp ?timestamp .
                FILTER(REGEX(?content, "%s", "i"))
            }
            ORDER BY DESC(?timestamp)
            LIMIT 3
        """ % (self._SPARQL_PREFIX, self._escape_sparql_literal(regex_pattern))

        results = []
        try:
            for row in self._run_query(query):
                content = str(row.content)
                if len(content) > self._SNIPPET_LENGTH:
                    # Mark the snippet as truncated only when it really is.
                    content = content[:self._SNIPPET_LENGTH] + "..."
                results.append({
                    "uri": str(row.report),
                    "content": content,
                    "score": float(row.score),
                    "verdict": str(row.level).split('#')[-1],
                })
        except Exception as e:
            # Best effort: a failed lookup simply yields no similar claims.
            print(f"[GraphRAG] Similar claims error: {e}")
            return self._no_similar_claims()

        if not results:
            return self._no_similar_claims()

        lines = [f"Found {len(results)} similar claims in history:"]
        lines.extend(
            f"- \"{r['content']}\" ({r['verdict']}, Score: {r['score']:.2f})"
            for r in results
        )

        return {
            "text": "\n".join(lines),
            "uris": [r['uri'] for r in results],
            "scores": [r['score'] for r in results],
        }

    # ------------------------------------------------------------------
    # Numerical context
    # ------------------------------------------------------------------
    def compute_context_score(self, domain: str, keywords: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Compute numerical context scores for integration into credibility scoring.

        Args:
            domain: The domain being analyzed (e.g., 'lemonde.fr')
            keywords: Optional list of keywords from the claim

        Returns:
            Dictionary with:
            - 'history_score': average score of past evaluations of this
              domain (0.5 when there are none)
            - 'pattern_score': average score of similar claims in the graph
              (0.5 when there are none)
            - 'combined_score': 0.7 * history + 0.3 * pattern when both are
              available, otherwise whichever one is available, else 0.5
            - 'confidence': grows with the amount of data found
            - 'has_history': True if the domain has prior evaluations
            - 'history_count': number of prior evaluations used
            - 'similar_count': number of similar claims used
        """
        result = {
            'history_score': 0.5,  # Neutral default
            'pattern_score': 0.5,
            'combined_score': 0.5,
            'confidence': 0.0,
            'has_history': False,
            'history_count': 0,
            'similar_count': 0,
        }

        if not self.om:
            return result

        # 1. Get source history score
        history_confidence = 0.0
        history_data = self._get_source_history_data(domain)
        if history_data['count'] > 0:
            result['history_score'] = history_data['avg_score']
            result['has_history'] = True
            result['history_count'] = history_data['count']
            # Confidence increases with more data points (max at 5)
            history_confidence = min(1.0, history_data['count'] / 5)

        # 2. Get pattern score from similar claims
        pattern_confidence = 0.0
        if keywords:
            scores = self._find_similar_claims(keywords).get('scores', [])
            if scores:
                result['pattern_score'] = sum(scores) / len(scores)
                result['similar_count'] = len(scores)
                # At most three similar claims are fetched, hence max at 3.
                pattern_confidence = min(1.0, len(scores) / 3)

        # 3. Calculate combined score
        # Weight history more heavily than pattern matching
        has_similar = result['similar_count'] > 0
        if result['has_history'] and has_similar:
            result['combined_score'] = (
                0.7 * result['history_score'] + 0.3 * result['pattern_score']
            )
            result['confidence'] = 0.6 * history_confidence + 0.4 * pattern_confidence
        elif result['has_history']:
            result['combined_score'] = result['history_score']
            # Reduce confidence without pattern
            result['confidence'] = history_confidence * 0.8
        elif has_similar:
            result['combined_score'] = result['pattern_score']
            # Lower confidence with only patterns
            result['confidence'] = pattern_confidence * 0.5
        # else: no data available - the neutral defaults set above stand.

        return result

    def _get_source_history_data(self, domain: str) -> Dict[str, Any]:
        """
        Query the graph for evaluation statistics of this domain.

        At most the ten most recent reports are considered.

        Returns:
            Dictionary with 'count', 'avg_score', 'last_verdict', 'scores'.
            When *domain* is empty, nothing is found, or the query fails,
            the result is count 0, avg_score 0.5, last_verdict None and an
            empty scores list.
        """
        if not domain:
            return self._no_history_data()

        query = self._build_domain_query(domain, 10)

        scores = []
        last_verdict = None
        try:
            for i, row in enumerate(self._run_query(query)):
                scores.append(float(row.score))
                if i == 0:
                    # First row is the newest report (ORDER BY DESC).
                    last_verdict = str(row.level).split('#')[-1]
        except Exception as e:
            # Best effort: fall back to the neutral "no history" result.
            print(f"[GraphRAG] History data query error: {e}")
            return self._no_history_data()

        if not scores:
            return self._no_history_data()

        return {
            'count': len(scores),
            'avg_score': sum(scores) / len(scores),
            'last_verdict': last_verdict,
            'scores': scores,
        }
|