File size: 2,391 Bytes
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""Phase 5 smoke: unit-test news_lookup classification + endpoint wiring."""
from __future__ import annotations

import asyncio
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from services.news_lookup import (
    _domain_of, _is_factcheck, _relevance, search_news_full,
)


def test_domain():
    assert _domain_of("https://www.reuters.com/article/x") == "reuters.com"
    assert _domain_of("https://snopes.com/fact-check/abc") == "snopes.com"
    print("[OK] _domain_of")


def test_factcheck_detection():
    assert _is_factcheck("https://snopes.com/x", "Claim about moon")
    assert _is_factcheck("https://factly.in/x", "")
    assert _is_factcheck("https://example.com/x", "FACT CHECK: viral video debunked")
    assert not _is_factcheck("https://bbc.com/news/world-123", "Election results")
    print("[OK] _is_factcheck")


def test_relevance():
    assert _relevance("https://reuters.com/x") == 1.0
    assert _relevance("https://ndtv.com/x") == 0.85
    assert _relevance("https://random-blog.xyz/x") == 0.5
    print("[OK] _relevance weights")


async def test_empty_key_returns_empty():
    res = await search_news_full(["modi", "election"])
    assert res.trusted_sources == []
    assert res.contradicting_evidence == []
    assert res.total_articles == 0
    print(f"[OK] empty-key path -> {res}")


async def test_endpoint_wiring():
    import httpx
    body = {"text": "BREAKING!!! You won't BELIEVE this SHOCKING miracle cure doctors don't want you to know!!! Click now!"}
    async with httpx.AsyncClient(timeout=180.0) as c:
        r = await c.post("http://127.0.0.1:8000/api/v1/analyze/text", json=body)
    r.raise_for_status()
    j = r.json()
    assert j["media_type"] == "text"
    assert "trusted_sources" in j
    assert "contradicting_evidence" in j
    assert "news_lookup" in j["processing_summary"]["stages_completed"]
    print(f"[OK] /analyze/text -> verdict={j['verdict']['label']} "
          f"score={j['verdict']['authenticity_score']} "
          f"trusted={len(j['trusted_sources'])} contradictions={len(j['contradicting_evidence'])}")


async def main():
    test_domain()
    test_factcheck_detection()
    test_relevance()
    await test_empty_key_returns_empty()
    await test_endpoint_wiring()
    print("\n=== Phase 5 smoke PASS ===")


if __name__ == "__main__":
    asyncio.run(main())