import logging
import os
import re
import subprocess
import tempfile
from difflib import SequenceMatcher
from gzip import decompress

import numpy as np
from charset_normalizer import from_bytes
from lxml import etree
from lxml.html import Element, HtmlElement, HTMLParser, fromstring, tostring
from lxml.html.clean import Cleaner
from urllib3.response import HTTPResponse

from ultradata_math_parser.config import Unique_ID

# Optional dependencies: degrade gracefully when they are not installed.
try:
    import brotli
except ImportError:
    brotli = None

try:
    from cchardet import detect as cchardet_detect
except ImportError:
    cchardet_detect = None


HTML_PARSER = HTMLParser(
    collect_ids=False,
    default_doctype=False,
    encoding="utf-8",
    remove_comments=True,
    remove_pis=True,
)

DOCTYPE_TAG = re.compile("^< ?! ?DOCTYPE.+?/ ?>", re.I)
UNICODE_ALIASES = {"utf-8", "utf_8"}


HTML_CLEANER = Cleaner(
    annoying_tags=False,
    comments=True,
    embedded=False,
    forms=False,
    frames=False,
    javascript=False,
    links=False,
    meta=False,
    page_structure=False,
    processing_instructions=True,
    remove_unknown_tags=False,
    safe_attrs_only=False,
    scripts=False,
    style=False,
)

# Matches LaTeX \textcolor[model]{color} commands so they can be stripped
# from formulas before wrapping them in math delimiters.
color_regex = re.compile(r"\\textcolor\[.*?\]\{.*?\}")

# CSS class names commonly used for images that render LaTeX formulas.
latex_image_class_names = [
    "latexcenter",
    "latex",
    "tex",
    "latexdisplay",
    "latexblock",
    "latexblockcenter",
]


def _translator():
    """Import py_asciimath lazily, restoring the root log level afterwards
    (the import may reconfigure logging as a side effect)."""
    old_log_level = logging.getLogger().level
    try:
        import py_asciimath.translator.translator as _translator

        return _translator
    finally:
        logging.getLogger().setLevel(old_log_level)


def ASCIIMath2Tex(*args, **kwargs):
    return _translator().ASCIIMath2Tex(*args, **kwargs)


def MathML2Tex(*args, **kwargs):
    return _translator().MathML2Tex(*args, **kwargs)


asciimath2tex = ASCIIMath2Tex(log=False)


def lcs_of_2(a, b):
    """Return the longest common substring of two sequences."""
    match = SequenceMatcher(None, a, b).find_longest_match(0, len(a), 0, len(b))
    return a[match[0]: match[0] + match[2]]


def lcs_of_list(*args):
    """Return the longest common substring shared by all arguments."""
    if len(args) == 1:
        return args[0]
    if len(args) == 2:
        return lcs_of_2(args[0], args[1])
    first = args[0]
    remains = args[1:]
    return lcs_of_2(first, lcs_of_list(*remains))


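# A minimal usage sketch (hypothetical inputs): lcs_of_list reduces pairwise,
# so lcs_of_list("mathml", "asciimath", "math") first computes the longest
# common substring of the tail ("math"), then matches it against the head.

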
def isutf8(data):
    """Return True if the byte string decodes cleanly as UTF-8."""
    try:
        data.decode("UTF-8")
    except UnicodeDecodeError:
        return False
    return True


def handle_compressed_file(filecontent):
    """Transparently decompress gzip- or brotli-compressed byte content."""
    if isinstance(filecontent, bytes):
        if filecontent[:2] == b"\x1f\x8b":
            # Gzip magic number detected.
            try:
                filecontent = decompress(filecontent)
            except (EOFError, OSError):
                pass
        elif brotli is not None:
            # Brotli streams have no reliable magic number, so just try.
            try:
                filecontent = brotli.decompress(filecontent)
            except brotli.error:
                pass
    return filecontent


def detect_encoding(bytesobject):
    """Guess the encoding of the given byte string, most likely first."""
    # UTF-8 is tested directly since it is by far the most common case.
    if isutf8(bytesobject):
        return ["utf-8"]
    guesses = []
    if cchardet_detect is not None:
        cchardet_guess = cchardet_detect(bytesobject)["encoding"]
        if cchardet_guess is not None:
            guesses.append(cchardet_guess.lower())
    # Try a sample first for speed, then the full payload if that fails.
    detection_results = from_bytes(bytesobject[:15000]) or from_bytes(bytesobject)
    if len(detection_results) > 0:
        guesses.extend([r.encoding for r in detection_results])
    # UTF-8 aliases are filtered out: that case was already handled above.
    return [g for g in guesses if g not in UNICODE_ALIASES]


def decode_file(filecontent):
    """Decode byte content to str, trying guessed encodings in order."""
    if isinstance(filecontent, str):
        return filecontent
    htmltext = None
    filecontent = handle_compressed_file(filecontent)
    for guessed_encoding in detect_encoding(filecontent):
        try:
            htmltext = filecontent.decode(guessed_encoding)
        except (LookupError, UnicodeDecodeError):
            htmltext = None
        else:
            break
    # Last resort: force UTF-8 and replace undecodable bytes.
    return htmltext or str(filecontent, encoding="utf-8", errors="replace")


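# A minimal sketch of the decoding pipeline (hypothetical input): compressed
# payloads are inflated first, then each guessed encoding is tried in turn.
#   decode_file(b"\x1f\x8b...")  # gzip bytes -> decompressed -> decoded str

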
def strip_faulty_doctypes(htmlstring: str, beginning: str) -> str:
    """Remove a malformed doctype declaration from the first line, if any."""
    if "doctype" in beginning:
        firstline, _, rest = htmlstring.partition("\n")
        return DOCTYPE_TAG.sub("", firstline, count=1) + "\n" + rest
    return htmlstring


def is_dubious_html(beginning: str) -> bool:
    """Flag content whose first bytes do not mention 'html' at all."""
    return "html" not in beginning


def fromstring_bytes(htmlobject):
    """Parse HTML as bytes, tolerating surrogates; return None on failure."""
    tree = None
    try:
        tree = fromstring(
            htmlobject.encode("utf8", "surrogatepass"), parser=HTML_PARSER
        )
    except Exception:
        pass
    return tree


def ancestor_node_check(node: HtmlElement, tags: list):
    """Return True if the node has an ancestor with any of the given tags."""
    return any(node.xpath(f"ancestor::{tag}[1]") for tag in tags)


def load_html(htmlobject):
    """Parse an HTML string, bytes, or response object into an lxml tree."""
    if isinstance(htmlobject, HtmlElement):
        return htmlobject
    if isinstance(htmlobject, HTTPResponse) or hasattr(htmlobject, "data"):
        htmlobject = htmlobject.data
    if not isinstance(htmlobject, (bytes, str)):
        raise TypeError("incompatible input type", type(htmlobject))
    tree = None
    htmlobject = decode_file(htmlobject)
    beginning = htmlobject[:50].lower()
    check_flag = is_dubious_html(beginning)
    htmlobject = strip_faulty_doctypes(htmlobject, beginning)
    fallback_parse = False
    try:
        tree = fromstring(htmlobject, parser=HTML_PARSER)
    except ValueError:
        # lxml refuses str input carrying an encoding declaration: retry as bytes.
        tree = fromstring_bytes(htmlobject)
        fallback_parse = True
    except Exception:
        pass
    if (tree is None or len(tree) < 1) and not fallback_parse:
        tree = fromstring_bytes(htmlobject)
    # Reject near-empty trees built from content that never looked like HTML.
    if tree is not None and check_flag is True and len(tree) < 2:
        tree = None
    return tree


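# A minimal usage sketch (hypothetical markup): load_html accepts str, bytes,
# urllib3 responses, or an already-parsed tree, and returns None when the
# input cannot be salvaged as HTML.
#   tree = load_html("<html><body><p>x</p></body></html>")

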
class W3MError(RuntimeError):
    """Raised when w3m rendering fails."""


def run_w3m_dump(html_content: str, w3m_path: str, *, columns: int = 200) -> str:
    """
    Render HTML content into plain text using w3m.

    :param html_content: HTML snippet to render.
    :param w3m_path: Path to the w3m executable.
    :param columns: Column width passed to w3m (-cols).
    :return: Rendered plain text.
    :raises W3MError: if w3m is unavailable or returns a non-zero exit code.
    """
    if not w3m_path:
        raise W3MError("w3m path must be provided")

    # w3m reads from a file, so write the snippet to a temporary one.
    tmp_file = tempfile.NamedTemporaryFile(
        mode="w", suffix=".html", delete=False, encoding="utf-8"
    )
    try:
        tmp_file.write(html_content or "")
        tmp_file.flush()
        tmp_file.close()

        try:
            completed = subprocess.run(
                [
                    w3m_path,
                    "-dump",
                    "-T",
                    "text/html",
                    "-cols",
                    str(columns),
                    tmp_file.name,
                ],
                check=True,
                capture_output=True,
                text=True,
            )
        except FileNotFoundError as exc:
            raise W3MError(f"w3m executable not found at '{w3m_path}'") from exc
        except subprocess.CalledProcessError as exc:
            stderr = (exc.stderr or "").strip()
            message = f"w3m exited with status {exc.returncode}"
            if stderr:
                message = f"{message}: {stderr}"
            raise W3MError(message) from exc

        return completed.stdout
    finally:
        try:
            os.unlink(tmp_file.name)
        except OSError:
            pass


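# A minimal usage sketch (the "/usr/bin/w3m" path is an assumption; use
# shutil.which("w3m") to locate the binary on the current system):
#   text = run_w3m_dump("<p>hello</p>", "/usr/bin/w3m", columns=80)

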
def is_empty_element(node: HtmlElement):
    """Return True if the node has neither children nor text."""
    return len(node) == 0 and not node.text


def iter_node(element: HtmlElement):
    """Yield the element and all of its HtmlElement descendants, depth-first."""
    yield element
    for sub_element in element:
        if isinstance(sub_element, HtmlElement):
            yield from iter_node(sub_element)


def img_div_check(tree):
    """
    A div containing exactly one image and fewer than four descendant
    elements should be kept: return False for such divs, True otherwise.
    """
    if len(tree.xpath(".//img")) == 1 and len(tree.xpath(".//*")) < 4:
        return False
    return True


def text_len(s):
    """Approximate text length: whitespace-separated words plus CJK and
    Arabic characters, which are counted individually since they are not
    space-delimited."""
    s = re.sub(" +", " ", s)
    s = re.sub("[\n\t\r]+", "\n", s)
    english_words = s.split()
    chinese_characters = re.findall(r"[\u4e00-\u9fff]", s)
    japanese_characters = re.findall(r"[\u3040-\u309F\u30A0-\u30FF]", s)
    arabic_characters = re.findall(r"[\u0600-\u06FF]", s)
    return (
        len(english_words)
        + len(chinese_characters)
        + len(japanese_characters)
        + len(arabic_characters)
    )


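# Example: text_len("hello world") == 2, while an unbroken run of Chinese
# characters contributes one "word" for the run plus one count per character.

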
def alias(element):
    """Build a structural fingerprint of an element from its tag, attributes,
    and the tags/attributes of its direct children."""
    if element is None:
        return ""
    tag = element.tag

    if tag in ["html", "body"]:
        return tag
    attribs = [tag]
    for k, v in element.attrib.items():
        if k == Unique_ID:
            continue
        k, v = re.sub(r"\s+", "", k), re.sub(r"\s+", "", v)
        # Strip numeric suffixes so auto-generated values compare equal.
        v = re.sub(r"-\d+", "", v)
        attribs.append(f'[{k}="{v}"]' if v else f"[{k}]")
    result = "".join(attribs)

    # Append a summary of the direct children.
    nth = ""
    for child in element.getchildren():
        if child.tag in ["dt", "dd", "li"]:
            # For list items, record only the number of grandchildren.
            try:
                nth += str(len(child.getchildren()))
            except Exception:
                pass
            continue
        attribs = [child.tag]
        for k, v in child.attrib.items():
            if k == Unique_ID:
                continue
            k, v = re.sub(r"\s+", "", k), re.sub(r"\s+", "", v)
            v = re.sub(r"-\d+", "", v)
            attribs.append(f'[{k}="{v}"]' if v else f"[{k}]")
        nth += "".join(attribs)

    result += f":{nth}"
    return result


def similarity2(s1, s2):
    """Jaccard similarity between the character sets of two strings."""
    if not s1 or not s2:
        return 0
    s1_set = set(s1)
    s2_set = set(s2)
    intersection = s1_set.intersection(s2_set)
    union = s1_set.union(s2_set)
    return len(intersection) / len(union)


def similarity_with_element(element1, element2):
    alias1 = alias(element1)
    alias2 = alias(element2)
    return similarity2(alias1, alias2)


def similarity_with_siblings(element, siblings):
    """Mean alias similarity to the siblings, dropping the lowest score as
    an outlier when more than one sibling is available."""
    scores = [similarity_with_element(element, sibling) for sibling in siblings]
    if not scores:
        return 0

    if len(scores) > 1:
        scores.remove(min(scores))
    return np.mean(scores)


def number_of_a_char(ele, xpath=".//a//text()"):
    """Text length contained in anchor (<a>) descendants."""
    s = "".join(ele.xpath(xpath)).strip()
    return text_len(s)


def number_of_char(ele, xpath=".//text()"):
    """Total text length; the +1 avoids division by zero in density ratios."""
    s = "".join(ele.xpath(xpath)).strip()
    return text_len(s) + 1


def density_of_a_text(ele, pre=0.7):
    """Return True if at least `pre` of the element's text sits inside links."""
    a_char = number_of_a_char(ele)
    t_char = number_of_char(ele)
    return a_char / t_char >= pre


def uniquify_list(l):
    """Deduplicate a list while preserving order."""
    return list(dict.fromkeys(l))


def trim(string):
    """Remove unnecessary spaces within a text string"""
    try:
        return " ".join(string.split()).strip()
    except (AttributeError, TypeError):
        return None


def collect_link_info(links_xpath, favor_precision=False):
    """Collect the total text length, element count, and number of short
    link texts among the given anchor elements."""
    shortelems, mylist = 0, []
    threshold = 10 if not favor_precision else 50
    for subelem in links_xpath:
        subelemtext = trim(subelem.text_content())
        if subelemtext:
            mylist.append(subelemtext)
            if len(subelemtext) < threshold:
                shortelems += 1
    lengths = sum(len(text) for text in mylist)
    return lengths, len(mylist), shortelems, mylist


def link_density_test(element, text, favor_precision=False):
    """Decide whether an element is mostly link boilerplate.

    Returns an (is_dense, link_texts) tuple."""
    links_xpath, mylist = element.findall(".//a"), []
    if links_xpath:
        # Length limits depend on the tag and on whether the element is last
        # among its siblings (trailing elements are judged more leniently).
        if element.tag == "p":
            if favor_precision is False:
                if element.getnext() is None:
                    limitlen, threshold = 60, 0.8
                else:
                    limitlen, threshold = 30, 0.8
            else:
                limitlen, threshold = 200, 0.8
        else:
            if element.getnext() is None:
                limitlen, threshold = 300, 0.8
            else:
                limitlen, threshold = 100, 0.8
        elemlen = len(text)
        if elemlen < limitlen:
            linklen, elemnum, shortelems, mylist = collect_link_info(
                links_xpath, favor_precision
            )
            if elemnum == 0:
                return True, mylist
            if density_of_a_text(element, 0.5):
                if linklen > threshold * elemlen or (
                    elemnum > 1 and shortelems / elemnum > 0.8
                ):
                    return True, mylist
    return False, mylist


def text_strip(text):
    return text.strip() if text else text


def wrap_math(s, display=False):
    """Normalize a LaTeX snippet and wrap it in $...$ or $$...$$."""
    s = re.sub(r"\s+", " ", s)
    s = color_regex.sub("", s)
    s = s.replace("$", "")
    s = s.replace("\n", " ").replace("\\n", "")
    s = s.strip()
    if len(s) == 0:
        return s

    # Align environments are display math already; leave them undelimited.
    if "align" in s:
        return s
    if display:
        return "$$" + s + "$$"
    return "$" + s + "$"


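# Examples: wrap_math("E = mc^2") -> "$E = mc^2$";
# wrap_math("E = mc^2", display=True) -> "$$E = mc^2$$"; snippets containing
# "align" (e.g. \begin{align}...\end{align}) are returned undelimited.

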
def extract_asciimath(s):
    """Translate an AsciiMath expression to LaTeX."""
    parsed = asciimath2tex.translate(s)
    return parsed


# Load the MathML -> LaTeX XSLT stylesheet shipped next to this module.
cur_file = os.path.abspath(__file__)
xsl_path = os.path.join(os.path.dirname(cur_file), "mmltex/mmltex.xsl")

xslt = etree.parse(xsl_path)
transform = etree.XSLT(xslt)


def mml_to_latex(mml_code):
    """Convert a MathML string to LaTeX via the mmltex XSLT stylesheet."""
    # Strip attributes from the opening <math> tag so the MathML namespace
    # can be injected uniformly below.
    mml_code = re.sub(r"<math.*?>", "<math>", mml_code)
    mml_ns = mml_code.replace(
        "<math>", '<math xmlns="http://www.w3.org/1998/Math/MathML">'
    )

    # Repair HTML-escaped and mismatched quotes left over from extraction.
    mml_ns = mml_ns.replace("&quot;", '"')
    mml_ns = mml_ns.replace("'\\\"", '"').replace("\\\"'", '"')

    # Fix attribute values that open with a double quote but close with a
    # single one, e.g. "value' -> "value".
    pattern = r'"([^"]+?)\''
    mml_ns = re.sub(pattern, r'"\1"', mml_ns)

    mml_dom = etree.fromstring(mml_ns)
    mmldom = transform(mml_dom)
    latex_code = str(mmldom)
    return latex_code
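
# A minimal usage sketch (hypothetical input); the stylesheet turns MathML
# into delimited LaTeX, so something like:
#   mml_to_latex("<math><mi>x</mi></math>")  # -> roughly "$x$"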