|
|
|
|
|
import json |
|
|
import logging |
|
|
from typing import Optional, Type |
|
|
from urllib.parse import urlparse |
|
|
import tldextract |
|
|
|
|
|
from ultradata_math_parser.parsers.article_parser import ArticleParser |
|
|
from ultradata_math_parser.parsers.forum_parser import ForumParser |
|
|
from ultradata_math_parser.parsers.custom_parser import CustomParser |
|
|
from ultradata_math_parser.parsers.unified_parser import UnifiedParser |
|
|
from ultradata_math_parser.utils import text_len, run_w3m_dump, W3MError |
|
|
from ultradata_math_parser.config import URL_PATTERNS_TO_HTML_TYPE, BUILTIN_SITE_RULES |
|
|
|
|
|
|
|
|
class GeneralParser:
    """Dispatch HTML extraction to the most appropriate specialized parser.

    Selection order in :meth:`extract`:

    1. Builtin per-domain rules (``BUILTIN_SITE_RULES``, keyed by the
       registered domain, e.g. ``example.com``) via :class:`CustomParser`.
    2. User-supplied per-netloc rules loaded from ``config_path``.
    3. An explicit ``html_type`` kwarg, or one inferred from URL patterns
       (``URL_PATTERNS_TO_HTML_TYPE``).
    4. :class:`UnifiedParser` as the generic fallback.

    Every successful extraction result is post-processed with ``w3m`` to
    produce a plain-text rendering (see :meth:`_apply_w3m`).
    """

    def __init__(self, config_path="", w3m_path: str = "w3m"):
        """Initialize the dispatcher.

        Args:
            config_path: Optional path to a JSON file mapping netlocs to
                custom extraction rules. A missing or malformed file is
                logged and treated as "no custom rules".
            w3m_path: Path to the ``w3m`` binary used to render extracted
                HTML fragments into plain text. Falsy values fall back to
                ``"w3m"``.
        """
        self.logger = logging.getLogger(__name__)
        # Always initialize the rule mapping up front. Previously `self.rule`
        # was only assigned on the success / no-config paths, so a failed
        # config load (bare `except: pass`) left the attribute unset and
        # `extract` crashed later with AttributeError.
        self.rule = {}
        if config_path:
            try:
                with open(config_path, 'r', encoding='utf-8') as f:
                    self.rule = json.load(f)
            except (OSError, json.JSONDecodeError) as exc:
                # Best-effort load: keep the empty mapping, but surface the
                # problem instead of silently swallowing it.
                self.logger.warning(
                    "Failed to load rule config from %s: %s", config_path, exc
                )
        self.w3m_path = w3m_path or "w3m"
        self.tld_extractor = tldextract.TLDExtract()

    def extract(self, html="", w3m_path: Optional[str] = None, **kwargs) -> dict:
        """Extract structured content from ``html`` using the best parser.

        Args:
            html: Raw HTML document to extract from.
            w3m_path: Optional per-call override of the ``w3m`` binary path.
            **kwargs: Forwarded to the chosen parser. Recognized keys:
                ``base_url`` (used for rule/domain lookup) and ``html_type``
                (``"forum"``/``"article"``/``"unified"``; popped here, not
                forwarded).

        Returns:
            The parser's result dict enriched with ``text``, ``w3m_text``
            and ``text_length`` (see :meth:`_apply_w3m`).
        """
        base_url = kwargs.get("base_url", "")
        netloc = urlparse(base_url).netloc if base_url else ""
        # Pop so an explicit html_type is not forwarded into parser kwargs.
        html_type = kwargs.pop("html_type", None)

        current_w3m_path = w3m_path or self.w3m_path

        # 1) Builtin per-domain rules. The cheap substring pre-check avoids
        #    running tldextract on every URL.
        if base_url and self._quick_check_builtin_rules(base_url):
            try:
                extracted = self.tld_extractor(base_url)
                domain = f"{extracted.domain}.{extracted.suffix}"
                self.logger.debug(
                    "TLD Extract result for %s: domain=%s, suffix=%s -> key=%s",
                    base_url, extracted.domain, extracted.suffix, domain,
                )
                if domain in BUILTIN_SITE_RULES:
                    try:
                        builtin_rule = BUILTIN_SITE_RULES[domain]
                        # Caller kwargs may deliberately override "rule".
                        new_kwargs = {"rule": builtin_rule, **kwargs}
                        self.logger.debug("Using builtin rule for domain: %s", domain)
                        return self._run_extractor(
                            CustomParser, html, new_kwargs, w3m_path=current_w3m_path
                        )
                    except Exception as exc:
                        # Fall through to the remaining strategies.
                        self.logger.debug(
                            "Builtin rule extractor failed for %s: %s", domain, exc
                        )
            except Exception as e:
                self.logger.debug(
                    "Error extracting domain or checking builtin rules: %s", e
                )

        # 2) Infer html_type from URL substring patterns when not given.
        if not html_type and base_url:
            for pattern, mapped_type in URL_PATTERNS_TO_HTML_TYPE.items():
                if pattern in base_url:
                    html_type = mapped_type
                    break

        # 3) User-supplied rules keyed by exact netloc.
        if netloc in self.rule:
            try:
                new_kwargs = {"rule": self.rule[netloc], **kwargs}
                return self._run_extractor(
                    CustomParser, html, new_kwargs, w3m_path=current_w3m_path
                )
            except Exception as exc:
                self.logger.debug("Custom extractor failed for %s: %s", netloc, exc)

        # 4) Dispatch on html_type, falling back to the unified parser.
        parser_by_type = {
            "forum": ForumParser,
            "article": ArticleParser,
            "unified": UnifiedParser,
        }
        parser_cls = parser_by_type.get(html_type, UnifiedParser)
        return self._run_extractor(parser_cls, html, kwargs, w3m_path=current_w3m_path)

    def _quick_check_builtin_rules(self, url: str) -> bool:
        """Return True if any builtin-rule domain appears as a substring of ``url``.

        Cheap pre-filter only — a positive result is confirmed against the
        tldextract-derived domain before a builtin rule is actually used.
        """
        if not url:
            return False
        url_lower = url.lower()
        return any(domain in url_lower for domain in BUILTIN_SITE_RULES)

    def _run_extractor(self, extractor_cls: Type, html: str, kwargs: dict, w3m_path: str):
        """Instantiate ``extractor_cls``, run it, and apply w3m post-processing.

        ``kwargs`` is shallow-copied so the extractor cannot mutate the
        caller's dict.
        """
        result = extractor_cls().extract(html=html, **dict(kwargs))
        return self._apply_w3m(result, w3m_path=w3m_path)

    def _apply_w3m(self, result: Optional[dict], w3m_path: str) -> Optional[dict]:
        """Render the result's ``html`` fragment to text via w3m and enrich it.

        Args:
            result: Parser output; passed through unchanged when falsy.
            w3m_path: Path to the ``w3m`` binary.

        Returns:
            A copy of ``result`` with ``text``, ``w3m_text`` and
            ``text_length`` added, or ``result`` itself when falsy.

        Raises:
            RuntimeError: If ``result`` lacks a non-empty ``"html"`` entry.
        """
        if not result:
            return result
        html_fragment = result.get("html")
        if not html_fragment:
            raise RuntimeError("Extraction result does not contain 'html' for w3m")
        text = run_w3m_dump(html_fragment, w3m_path)
        enriched = dict(result)
        enriched["text"] = text
        enriched["w3m_text"] = text
        enriched["text_length"] = text_len(text)
        return enriched
|
|
|