# -*- coding: utf-8 -*-
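"""General-purpose HTML extraction dispatcher.

GeneralParser routes a page to CustomParser (builtin or user-configured site rules),
ForumParser, ArticleParser, or UnifiedParser based on the URL and html_type, then
renders the extracted HTML fragment to plain text via w3m.
"""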
import json
import logging
from typing import Optional, Type
from urllib.parse import urlparse
import tldextract
from ultradata_math_parser.parsers.article_parser import ArticleParser
from ultradata_math_parser.parsers.forum_parser import ForumParser
from ultradata_math_parser.parsers.custom_parser import CustomParser
from ultradata_math_parser.parsers.unified_parser import UnifiedParser
from ultradata_math_parser.utils import text_len, run_w3m_dump, W3MError
from ultradata_math_parser.config import URL_PATTERNS_TO_HTML_TYPE, BUILTIN_SITE_RULES
class GeneralParser:
    """Dispatches extraction to site-specific or generic parsers and post-processes with w3m."""

    def __init__(self, config_path: str = "", w3m_path: str = "w3m"):
        self.logger = logging.getLogger(__name__)
        # Always initialize the rule table so self.rule exists even when loading fails.
        self.rule = {}
        if config_path:
            try:
                with open(config_path, 'r', encoding='utf-8') as f:
                    self.rule = json.load(f)
            except (OSError, json.JSONDecodeError) as exc:
                self.logger.warning("Failed to load rule config %s: %s", config_path, exc)
        self.w3m_path = w3m_path or "w3m"
        self.tld_extractor = tldextract.TLDExtract()

    def extract(self, html="", w3m_path: Optional[str] = None, **kwargs) -> dict:
        """Extract structured content from HTML, selecting a parser by URL rules or html_type."""
        base_url = kwargs.get("base_url", "")
        netloc = urlparse(base_url).netloc if base_url else ""
        html_type = kwargs.pop("html_type", None)
        current_w3m_path = w3m_path or self.w3m_path
        # Check whether the URL matches one of the builtin site rules
        if base_url and self._quick_check_builtin_rules(base_url):
            try:
                extracted = self.tld_extractor(base_url)
                domain = f"{extracted.domain}.{extracted.suffix}"
                self.logger.debug("TLD extract result for %s: domain=%s, suffix=%s -> key=%s", base_url, extracted.domain, extracted.suffix, domain)
                if domain in BUILTIN_SITE_RULES:
                    try:
                        new_kwargs = {"rule": BUILTIN_SITE_RULES[domain], **kwargs}
                        self.logger.debug("Using builtin rule for domain: %s", domain)
                        return self._run_extractor(CustomParser, html, new_kwargs, w3m_path=current_w3m_path)
                    except Exception as exc:
                        self.logger.debug("Builtin rule extractor failed for %s: %s", domain, exc)
            except Exception as exc:
                self.logger.debug("Error extracting domain or checking builtin rules: %s", exc)
        # Infer html_type from URL patterns when it was not supplied
        if not html_type and base_url:
            for pattern, mapped_type in URL_PATTERNS_TO_HTML_TYPE.items():
                if pattern in base_url:
                    html_type = mapped_type
                    break
        # Use the user-configured rule for this host if one exists
        if netloc in self.rule:
            try:
                new_kwargs = {"rule": self.rule[netloc], **kwargs}
                return self._run_extractor(CustomParser, html, new_kwargs, w3m_path=current_w3m_path)
            except Exception as exc:
                self.logger.debug("Custom extractor failed for %s: %s", netloc, exc)
        # Select the parser by html_type
        if html_type == "forum":
            return self._run_extractor(ForumParser, html, kwargs, w3m_path=current_w3m_path)
        if html_type == "article":
            return self._run_extractor(ArticleParser, html, kwargs, w3m_path=current_w3m_path)
        if html_type == "unified":
            return self._run_extractor(UnifiedParser, html, kwargs, w3m_path=current_w3m_path)
        # Default: fall back to the unified parser
        return self._run_extractor(UnifiedParser, html, kwargs, w3m_path=current_w3m_path)

    def _quick_check_builtin_rules(self, url: str) -> bool:
        """Cheap substring check: does any builtin rule domain appear in the URL?"""
        if not url:
            return False
        url_lower = url.lower()
        return any(domain in url_lower for domain in BUILTIN_SITE_RULES)

    def _run_extractor(self, extractor_cls: Type, html: str, kwargs: dict, w3m_path: str):
        result = extractor_cls().extract(html=html, **dict(kwargs))
        return self._apply_w3m(result, w3m_path=w3m_path)

    def _apply_w3m(self, result: Optional[dict], w3m_path: str) -> Optional[dict]:
        """Render the extracted HTML fragment to plain text with w3m and attach text fields."""
        if not result:
            return result
        html_fragment = result.get("html")
        if not html_fragment:
            raise RuntimeError("Extraction result does not contain 'html' for w3m")
        text = run_w3m_dump(html_fragment, w3m_path)
        enriched = dict(result)
        enriched["text"] = text
        enriched["w3m_text"] = text
        enriched["text_length"] = text_len(text)
        return enriched
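

# Minimal usage sketch (assumptions: the package is importable, a w3m binary is on PATH,
# and the HTML snippet and URL below are hypothetical examples, not fixtures from this repo).
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    parser = GeneralParser()
    demo_html = "<html><body><article><h1>Title</h1><p>Some body text.</p></article></body></html>"
    result = parser.extract(html=demo_html, base_url="https://example.com/post/1", html_type="article")
    if result:
        print(result.get("text_length"), repr(result.get("text", ""))[:200])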