Scipaths / src /step_01_fetch /fetch_metadata.py
Eric Chamoun
Initial SciPaths Space release
0a55f0f
from pathlib import Path
import argparse
import json
import os
import random
import re
import tarfile
import time
import arxiv
import requests
from config import ACL_IDS_PATH
from process_tex_source import preprocess_tex, extract_introduction_and_related
from semanticscholar_client import get_paper, get_paper_links, search_by_title
def load_ids(path: Path):
    """Read the UTF-8 encoded JSON file at ``path`` and return the decoded value."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def ensure_dir(path: Path):
    """Create ``path`` and any missing parent directories; an existing directory is fine."""
    os.makedirs(path, exist_ok=True)
# time.monotonic() reading taken at the end of the most recent throttled arXiv
# call; read and updated by _arxiv_min_interval_sleep().
_ARXIV_LAST_TS = 0.0
def _cleanup_partial_source_dir(source_dir: Path) -> None:
    """Remove leftover source archives from ``source_dir`` (best effort).

    Deletes every direct child of ``source_dir`` matching ``*.tar.gz``,
    ``*.tgz`` or ``*.tar``. A file that cannot be removed is reported with a
    warning and otherwise ignored; nothing is raised for filesystem errors.
    """
    for pattern in ("*.tar.gz", "*.tgz", "*.tar"):
        for archive in source_dir.glob(pattern):
            try:
                # missing_ok: the file may vanish between globbing and unlinking.
                archive.unlink(missing_ok=True)
            except OSError as exc:
                print(f"[WARN] Could not remove stale archive {archive}: {exc}")
def _download_arxiv_source_with_retries(paper, source_dir: Path, arxiv_id: str) -> Path | None:
max_retries = int(os.getenv("ARXIV_SOURCE_MAX_RETRIES", "4"))
base_sleep = float(os.getenv("ARXIV_SOURCE_BASE_SLEEP", "2.0"))
max_sleep = float(os.getenv("ARXIV_MAX_BACKOFF", "60"))
last_exc = None
for attempt in range(max_retries):
_cleanup_partial_source_dir(source_dir)
try:
_arxiv_min_interval_sleep()
tar_path = Path(paper.download_source(dirpath=str(source_dir)))
if not tar_path.exists():
raise FileNotFoundError(f"download_source returned {tar_path}, but the file does not exist")
if tar_path.stat().st_size < 1024:
raise IOError(f"downloaded source archive is unexpectedly small ({tar_path.stat().st_size} bytes)")
return tar_path
except Exception as exc:
last_exc = exc
sleep = min(base_sleep * (2**attempt), max_sleep) + random.uniform(0.0, 0.5)
print(f"[WARN] Failed to download source for {arxiv_id} on attempt {attempt + 1}/{max_retries}: {exc}")
if attempt + 1 < max_retries:
print(f"[INFO] Retrying source download in {sleep:.2f}s")
time.sleep(sleep)
print(f"[WARN] Source download failed for {arxiv_id} after {max_retries} attempts: {last_exc}")
return None
def _arxiv_min_interval_sleep() -> None:
    """Block until at least ``ARXIV_MIN_INTERVAL`` seconds (default 1.0) have
    passed since the previous call, then record the current monotonic time."""
    global _ARXIV_LAST_TS
    min_interval = float(os.getenv("ARXIV_MIN_INTERVAL", "1.0"))
    since_last = time.monotonic() - _ARXIV_LAST_TS
    remaining = min_interval - since_last
    if remaining > 0:
        time.sleep(remaining)
    _ARXIV_LAST_TS = time.monotonic()
def _extract_tar_safely(tar_path: Path, dest_dir: Path) -> None:
    """Unpack ``tar_path`` into ``dest_dir`` without letting members escape it."""
    with tarfile.open(tar_path) as tar:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters are available: "data" refuses absolute paths,
            # ".." traversal, links leaving the destination and device files.
            tar.extractall(path=dest_dir, filter="data")
            return
        # Interpreter without extraction filters: vet the members by hand.
        root = dest_dir.resolve()
        vetted = []
        for member in tar.getmembers():
            if not (member.isfile() or member.isdir()):
                # Skip links and special files, which could point outside dest_dir.
                continue
            target = (root / member.name).resolve()
            if target != root and root not in target.parents:
                print(f"[WARN] Skipping archive member outside target dir: {member.name}")
                continue
            vetted.append(member)
        tar.extractall(path=dest_dir, members=vetted)


def download_arxiv_tex(arxiv_id: str, base_dir: Path) -> Path | None:
    """
    Download LaTeX source from arXiv and return the path to a merged TeX file.

    - arxiv_id: e.g. "2410.22815"
    - base_dir: paper directory where source should be unpacked

    The archive is unpacked into ``base_dir / f"tex_{arxiv_id}"``. Lookup
    retries are configured through ``ARXIV_MAX_RETRIES``, ``ARXIV_BASE_SLEEP``
    and ``ARXIV_MAX_BACKOFF``.

    Returns ``None`` (after printing a warning) when the paper is not found,
    the lookup or download keeps failing, the archive cannot be extracted, or
    ``preprocess_tex`` does not yield an existing file.
    """
    source_dir = base_dir / f"tex_{arxiv_id}"
    source_dir.mkdir(parents=True, exist_ok=True)
    search = arxiv.Search(id_list=[arxiv_id])
    max_retries = int(os.getenv("ARXIV_MAX_RETRIES", "6"))
    base_sleep = float(os.getenv("ARXIV_BASE_SLEEP", "2.0"))
    max_sleep = float(os.getenv("ARXIV_MAX_BACKOFF", "60"))
    paper = None
    for attempt in range(max_retries):
        try:
            _arxiv_min_interval_sleep()
            paper = next(search.results())
            break
        except StopIteration:
            print(f"[WARN] No arXiv paper found for ID {arxiv_id}")
            return None
        except arxiv.HTTPError as exc:
            if not (getattr(exc, "status", None) == 429 or "429" in str(exc)):
                # Only rate limiting (429) is retried; other HTTP errors are final.
                print(f"[WARN] arXiv HTTP error for {arxiv_id}: {exc}")
                return None
            failure = "arXiv 429"
        except Exception as exc:
            failure = f"arXiv error {exc}"
        # Only retryable failures reach this point.
        if attempt + 1 >= max_retries:
            # No attempts left, so a backoff sleep would only delay giving up.
            print(f"[WARN] {failure}")
            break
        sleep = min(base_sleep * (2**attempt), max_sleep) + random.uniform(0.0, 0.5)
        print(f"[WARN] {failure} → retrying in {sleep:.2f}s")
        time.sleep(sleep)
    if paper is None:
        print(f"[ERROR] Giving up after {max_retries} attempts for arXiv ID {arxiv_id}")
        return None
    tar_path = _download_arxiv_source_with_retries(paper, source_dir, arxiv_id)
    if tar_path is None:
        return None
    try:
        # The archive comes from the network, so it is extracted defensively.
        _extract_tar_safely(tar_path, source_dir)
        os.remove(tar_path)
    except Exception as exc:
        print(f"[WARN] Failed to extract source for {arxiv_id}: {exc}")
        return None
    processed_tex = preprocess_tex(source_dir)
    if processed_tex:
        extract_introduction_and_related(processed_tex)
    if not processed_tex or not processed_tex.exists():
        print(f"[WARN] Could not produce merged TeX for {arxiv_id}")
        return None
    print(f"[INFO] Processed LaTeX for {arxiv_id} at {processed_tex}")
    return processed_tex
def _extract_arxiv_id_from_text(text: str) -> str | None:
if not text:
return None
match = re.search(r"\b(\d{4}\.\d{4,5}(?:v\d+)?)\b", text)
if match:
return match.group(1)
match = re.search(r"arxiv[:\s/]*(\d{4}\.\d{4,5}(?:v\d+)?)", text, re.IGNORECASE)
if match:
return match.group(1)
return None
def _safe_write_json(path: Path, payload) -> None:
    """Serialize ``payload`` as indented JSON and write it to ``path`` atomically.

    The text is first written to a sibling ``<name>.tmp`` file and then renamed
    over ``path``, so an interrupted run never leaves a truncated file behind.
    Other code in this module treats the existence of an output file as "already
    done", which makes a half-written file harmful.

    Raises whatever ``json.dumps`` or the filesystem raises; the temporary file
    is removed on failure.
    """
    serialized = json.dumps(payload, indent=2)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        tmp_path.write_text(serialized, encoding="utf-8")
        tmp_path.replace(path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
def _safe_write_text(path: Path, text: str) -> None:
    """Write ``text`` to ``path`` as UTF-8, atomically.

    The text goes to a sibling ``<name>.tmp`` file that is then renamed over
    ``path``, so readers never observe a partially written file. The temporary
    file is removed if the write or rename fails, and the error is re-raised.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        tmp_path.write_text(text, encoding="utf-8")
        tmp_path.replace(path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
def _query_openreview_for_paper(openreview_id: str) -> dict | None:
"""Query OpenReview using a real OpenReview note/forum id."""
if not openreview_id:
return None
try_urls = [
f"https://api.openreview.net/notes?forum={openreview_id}",
f"https://api2.openreview.net/notes?forum={openreview_id}",
f"https://api.openreview.net/notes?id={openreview_id}",
f"https://api2.openreview.net/notes?id={openreview_id}",
]
for url in try_urls:
try:
response = requests.get(url, timeout=20)
if response.status_code != 200:
continue
payload = response.json()
except Exception:
continue
notes = None
if isinstance(payload, dict) and isinstance(payload.get("notes"), list):
notes = payload["notes"]
elif isinstance(payload, dict) and payload.get("content"):
notes = [payload]
elif isinstance(payload, list):
notes = payload
if not notes:
continue
note = notes[0]
content = note.get("content") if isinstance(note, dict) else None
title = None
arxiv_id = None
pdf_url = None
if isinstance(content, dict):
raw_title = content.get("title") or content.get("paperTitle")
title = raw_title.get("value") if isinstance(raw_title, dict) else raw_title
raw_pdf = content.get("pdf")
pdf_url = raw_pdf.get("value") if isinstance(raw_pdf, dict) else raw_pdf
for value in content.values():
if isinstance(value, dict):
value = value.get("value")
if isinstance(value, list):
value = " ".join(str(item) for item in value)
if isinstance(value, str):
arxiv_id = _extract_arxiv_id_from_text(value)
if arxiv_id:
break
if not title and isinstance(note, dict):
title = note.get("title") or note.get("forumTitle")
if not arxiv_id and isinstance(note, dict):
for value in note.values():
if isinstance(value, str):
arxiv_id = _extract_arxiv_id_from_text(value)
if arxiv_id:
break
return {
"title": title,
"arxiv_id": arxiv_id,
"pdf_url": pdf_url,
"openreview_id": openreview_id,
"source_url": url,
}
return None
def _treat_as_openreview(paper: dict) -> bool:
    """Tell whether ``paper`` should be resolved through OpenReview.

    True when ``id_type`` is ``openreview`` (any case), when a non-empty
    ``openreview_id`` is present, or when the id starts with ``neurips-`` or
    ``icml-`` (any case).
    """
    if str(paper.get("id_type", "")).lower() == "openreview":
        return True
    if paper.get("openreview_id"):
        return True
    lowered_id = str(paper.get("id", "")).lower()
    return lowered_id.startswith(("neurips-", "icml-"))
def _fetch_s2_by_title(title: str, acl_id: str) -> tuple[int, dict | None]:
if not title:
print(f"[WARN] no title available for {acl_id} → skipping.")
return 0, None
hit = search_by_title(title)
if not hit:
print(f"[WARN] no S2 match for {acl_id} ({title}) → skipping.")
return 0, None
s2_id = hit["paperId"]
print(f"[DEBUG] title search matched semantic scholar paperId={s2_id}")
return get_paper(s2_id, id_type="SemanticScholar")
def _best_arxiv_id(*values: str) -> str | None:
for value in values:
arxiv_id = _extract_arxiv_id_from_text(value or "")
if arxiv_id:
return arxiv_id
return None
def _write_openreview_snapshot(paper_dir: Path, payload: dict) -> None:
    """Save ``payload`` as ``openreview_metadata.json`` in ``paper_dir``; a falsy payload is skipped."""
    if not payload:
        return
    snapshot_path = paper_dir / "openreview_metadata.json"
    _safe_write_json(snapshot_path, payload)
def _write_metadata_outputs(paper_dir: Path, acl_id: str, data: dict) -> None:
    """Persist a fetched paper record and everything derived from it.

    Writes, inside ``paper_dir``:
    - ``paper_metadata.json``: ``data`` wrapped in a one-element list;
    - the arXiv TeX source (via ``download_arxiv_tex``) when ``externalIds``
      carries a non-empty ``ArXiv`` id;
    - ``sections/abstract.txt`` and ``sections/tldr.txt`` when present;
    - ``references_metadata.json`` / ``citations_metadata.json`` when the
      corresponding ``get_paper_links`` call reports status 200;
    - ``no_arxiv.txt`` when there is no usable arXiv id.

    Without a ``paperId`` the function stops after the sections, so neither the
    link files nor ``no_arxiv.txt`` are written. ``acl_id`` is only used in log
    messages.
    """
    meta_path = paper_dir / "paper_metadata.json"
    _safe_write_json(meta_path, [data])
    print(f"[DEBUG] wrote metadata to {meta_path}")
    external_ids = data.get("externalIds") or {}
    arxiv_id = external_ids.get("ArXiv")
    if arxiv_id:
        download_arxiv_tex(arxiv_id=arxiv_id, base_dir=paper_dir)
    sections_dir = paper_dir / "sections"
    sections_dir.mkdir(exist_ok=True)
    abstract = data.get("abstract")
    if abstract:
        _safe_write_text(sections_dir / "abstract.txt", abstract)
    tldr_obj = data.get("tldr")
    if isinstance(tldr_obj, dict) and tldr_obj.get("text"):
        _safe_write_text(sections_dir / "tldr.txt", tldr_obj["text"])
    semantic_id = data.get("paperId")
    if not semantic_id:
        print(f"[WARN] no semantic_id for {acl_id} → skip refs/cites.")
        return
    # "or 0" also covers counts that are present but null in the record.
    citation_count = data.get("citationCount") or 0
    reference_count = data.get("referenceCount") or 0
    ref_status, refs = get_paper_links(semantic_id, "references", reference_count)
    if ref_status == 200:
        _safe_write_json(paper_dir / "references_metadata.json", refs)
    cit_status, cits = get_paper_links(semantic_id, "citations", citation_count)
    if cit_status == 200:
        _safe_write_json(paper_dir / "citations_metadata.json", cits)
    # Same test as the download decision above, so a null or empty ArXiv entry
    # is marked as "no arXiv" instead of getting neither a download nor a marker.
    if not arxiv_id:
        _safe_write_text(paper_dir / "no_arxiv.txt", "no arxiv for this paper")
def fetch_one_acl_id(paper: dict, base_dir: Path):
    """Fetch and store metadata for one paper entry under ``base_dir / <id>``.

    ``paper`` must contain ``id`` and may contain ``title``, ``id_type``
    (default ``"ACL"``), ``openreview_id`` and ``pdf_url``. Nothing is fetched
    when ``paper_metadata.json`` already exists in the paper directory.

    Lookup order:
    1. OpenReview-style entries (see ``_treat_as_openreview``): query
       OpenReview, then fetch by the arXiv id it yields or, failing that,
       search by the OpenReview title; as a fallback search by the input title.
    2. Other entries: fetch directly with ``get_paper(id, id_type)``.
    3. If there is still no record and no title search has been tried, search
       by title.

    Outputs are written by ``_write_metadata_outputs`` only when the final
    status is 200 and a record was returned. Returns ``None`` in every case.
    """
    # Coerce once so a numeric id from the JSON list works as a path component.
    acl_id = str(paper["id"])
    title = (paper.get("title") or "").strip()
    id_type = paper.get("id_type", "ACL")
    openreview_id = paper.get("openreview_id", "")
    input_pdf_url = paper.get("pdf_url", "")
    s2_key = os.getenv("SEMANTIC_SCHOLAR_API_KEY", "")
    print(
        f"[DEBUG] fetch_one_acl_id: id={acl_id} id_type={id_type} "
        f"title_len={len(title)} s2_key_present={'yes' if bool(s2_key) else 'no'} "
        f"s2_key_len={len(s2_key)}"
    )
    paper_dir = base_dir / acl_id
    ensure_dir(paper_dir)
    meta_path = paper_dir / "paper_metadata.json"
    if meta_path.exists():
        return
    status, data = 0, None
    fetch_label = f"{id_type}:{acl_id}"
    is_openreview = _treat_as_openreview(paper)
    openreview_meta = None
    attempted_title_search = False
    if is_openreview:
        try:
            openreview_meta = _query_openreview_for_paper(openreview_id or acl_id)
        except Exception as exc:
            print(f"[WARN] OpenReview lookup failed for {acl_id}: {exc}")
            openreview_meta = None
        if openreview_meta:
            _write_openreview_snapshot(paper_dir, openreview_meta)
            or_title = (openreview_meta.get("title") or title or "").strip()
            arxiv_id = (
                _best_arxiv_id(
                    openreview_meta.get("arxiv_id", ""),
                    openreview_meta.get("pdf_url", ""),
                    input_pdf_url,
                )
                or ""
            )
            if arxiv_id:
                print(f"[DEBUG] OpenReview -> found ArXiv {arxiv_id} for {acl_id}")
                status, data = get_paper(arxiv_id, id_type="ArXiv")
                fetch_label = f"ArXiv:{arxiv_id}"
                title = or_title or title
            elif or_title:
                print(f"[DEBUG] OpenReview -> no arXiv for {acl_id}, title-searching")
                status, data = _fetch_s2_by_title(or_title, acl_id)
                fetch_label = f"title:{or_title[:80]}"
                title = or_title
                attempted_title_search = True
            else:
                print(f"[WARN] OpenReview metadata for {acl_id} had neither title nor arXiv")
        else:
            print(f"[WARN] no OpenReview metadata for {acl_id} (openreview_id={openreview_id or acl_id})")
        # OpenReview path produced nothing: fall back to a title search once.
        if data is None and title and not attempted_title_search:
            print(f"[DEBUG] OpenReview fallback -> title-searching extracted title for {acl_id}")
            status, data = _fetch_s2_by_title(title, acl_id)
            fetch_label = f"title:{title[:80]}"
            attempted_title_search = True
    if data is None and not is_openreview:
        status, data = get_paper(acl_id, id_type=id_type)
        fetch_label = f"{id_type}:{acl_id}"
    if data is None and not attempted_title_search:
        print(
            f"[WARN] direct fetch failed for {fetch_label} "
            f"(status={status}) → trying title search with title_len={len(title)}"
        )
        status, data = _fetch_s2_by_title(title, acl_id)
    if status != 200 or data is None:
        print(f"[WARN] still no data for {acl_id} → skipping.")
        return
    _write_metadata_outputs(paper_dir, acl_id, data)
    print("[SUCCESS]")
def fetch_all_metadata(acl_ids_path: Path, out_dir: Path, start_from: str | None = None, resume: bool = False):
raw = json.loads(acl_ids_path.read_text(encoding="utf-8"))
papers = raw if isinstance(raw[0], dict) else [{"id": x, "title": ""} for x in raw]
start_seen = start_from is None
for paper in papers:
pid = str(paper.get("id", ""))
if not start_seen:
if pid == start_from:
start_seen = True
else:
continue
if resume:
paper_dir = out_dir / pid
if (paper_dir / "paper_metadata.json").exists():
continue
fetch_one_acl_id(paper, out_dir)
return "Meta Data Completed"
def _main() -> None:
    """Parse the command-line options, run the metadata fetch and print the elapsed time."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--ids", type=str, required=True, help="Path to JSON file with paper IDs.")
    cli.add_argument("--outdir", type=str, default="papers", help="Output directory for metadata.")
    cli.add_argument("--start-from", type=str, default=None, help="Start from this paper ID.")
    cli.add_argument("--resume", action="store_true", help="Skip papers that already have paper_metadata.json.")
    options = cli.parse_args()

    ids_path = Path(options.ids).expanduser().resolve()
    output_dir = Path(options.outdir).expanduser().resolve()
    if not ids_path.exists():
        raise FileNotFoundError(f"Could not find {ids_path}")
    print(f"[INFO] Using ID list from {ids_path}")
    print(f"[INFO] Output will be saved to {output_dir}")

    started_at = time.time()
    fetch_all_metadata(
        acl_ids_path=ids_path,
        out_dir=output_dir,
        start_from=options.start_from,
        resume=options.resume,
    )
    print("done in", time.time() - started_at, "s")


if __name__ == "__main__":
    _main()