"""Refresh a GitHub issues/PRs dataset snapshot and push it to the Hugging Face Hub.

Resumes from the last successful watermark (or from partial checkpoints) stored in the
dataset repo, hydrates changed issues and pull requests from the GitHub API, merges the
delta into the previous tables, and uploads the updated parquet snapshot plus reports.
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import tempfile
import time
from collections import defaultdict
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from huggingface_hub import HfApi

from slop_farmer.app_config import command_defaults, extract_cli_config_path
from slop_farmer.config import (
    DatasetRefreshOptions,
    NewContributorReportOptions,
    PrScopeOptions,
    RepoRef,
    resolve_github_token,
)
from slop_farmer.data.dataset_card import build_hf_dataset_card
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.hf_dataset_repo import (
    list_remote_paths,
    load_remote_file,
    load_remote_json_file,
    stable_snapshot_candidates,
)
from slop_farmer.data.links import build_pr_duplicate_candidate_rows, build_text_link_rows
from slop_farmer.data.normalize import (
    issue_url_to_number,
    normalize_comment,
    normalize_issue,
    normalize_pr_diff,
    normalize_pr_file,
    normalize_pull_request,
    normalize_review,
    normalize_review_comment,
    normalize_timeline_event,
)
from slop_farmer.data.parquet_io import (
    SCHEMAS,
    read_parquet_rows,
    write_json,
    write_parquet,
    write_text,
)
from slop_farmer.reports.new_contributor_report import run_new_contributor_report
from slop_farmer.reports.pr_scope import run_pr_scope_report

# Primary-key columns per table, used to de-duplicate rows when merging deltas
# into the previously published tables.
PRIMARY_KEYS: dict[str, tuple[str, ...]] = {
    "issues": ("github_id",),
    "pull_requests": ("github_id",),
    "comments": ("github_id",),
    "reviews": ("github_id",),
    "review_comments": ("github_id",),
    "pr_files": ("repo", "pull_request_number", "filename"),
    "pr_diffs": ("repo", "pull_request_number"),
    "links": (
        "repo",
        "source_type",
        "source_number",
        "source_github_id",
        "target_owner",
        "target_repo",
        "target_number",
        "link_type",
        "link_origin",
    ),
    "events": (
        "repo",
        "parent_kind",
        "parent_number",
        "event",
        "created_at",
        "actor_login",
        "source_issue_number",
        "source_issue_url",
        "commit_id",
        "label_name",
    ),
}

# Remote directory prefixes under which partial-run checkpoints are stored.
CHECKPOINT_PREFIXES = ("_checkpoints", "checkpoints")


def log(message: str) -> None:
    stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ")
    print(f"[{stamp}] {message}", flush=True)


def iso_now() -> str:
    return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def snapshot_id() -> str:
    return datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ")


def row_key(row: dict[str, Any], fields: tuple[str, ...]) -> str:
    return json.dumps([row.get(field) for field in fields], default=str)


def merge_rows(
    table_name: str,
    previous_rows: list[dict[str, Any]],
    delta_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Upsert ``delta_rows`` over ``previous_rows`` keyed by the table's primary key."""
    if table_name == "pr_files":
        # pr_files deltas replace all previous file rows for the refreshed PRs.
        refreshed_prs = {
            (row.get("repo"), row.get("pull_request_number"))
            for row in delta_rows
            if row.get("pull_request_number") is not None
        }
        previous_rows = [
            row
            for row in previous_rows
            if (row.get("repo"), row.get("pull_request_number")) not in refreshed_prs
        ]
    merged: dict[str, dict[str, Any]] = {}
    for row in previous_rows:
        merged[row_key(row, PRIMARY_KEYS[table_name])] = row
    for row in delta_rows:
        merged[row_key(row, PRIMARY_KEYS[table_name])] = row
    return list(merged.values())


def checkpoint_dirs(remote_paths: set[str]) -> list[tuple[str, str]]:
    """Return sorted (snapshot_id, directory) pairs for remote checkpoint directories,
    preferring the ``_checkpoints`` prefix over the legacy ``checkpoints`` prefix."""
    by_snapshot_id: dict[str, str] = {}
    for path in remote_paths:
        parts = path.split("/")
        if len(parts) < 3 or parts[0] not in CHECKPOINT_PREFIXES:
            continue
        snapshot_key = parts[1]
        prefix = parts[0]
        current = by_snapshot_id.get(snapshot_key)
        if current is None or current.startswith("checkpoints/"):
            by_snapshot_id[snapshot_key] = f"{prefix}/{snapshot_key}"
    return [(sid, by_snapshot_id[sid]) for sid in sorted(by_snapshot_id)]


def copy_remote_file_from_candidates(
    api: HfApi,
    repo_id: str,
    local_dir: Path,
    destination: Path,
    candidate_paths: list[str],
) -> bool:
    for candidate in candidate_paths:
        downloaded = load_remote_file(api, repo_id, candidate, local_dir)
        if downloaded is None:
            continue
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(downloaded, destination)
        return True
    return False


def materialize_previous_snapshot_dir(
    *,
    api: Any,
    repo_id: str,
    previous_root: Path,
    stable_snapshot_id: str | None,
    latest_pointer: dict[str, Any] | None,
    previous_tables: dict[str, list[dict[str, Any]]],
) -> Path | None:
    if not stable_snapshot_id:
        return None
    snapshot_dir = (previous_root / "materialized-snapshots" / stable_snapshot_id).resolve()
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    for table_name, rows in previous_tables.items():
        write_parquet(rows, snapshot_dir / f"{table_name}.parquet", table_name)
    for artifact_name in (
        "manifest.json",
        "new_contributors.parquet",
        "new-contributors-report.json",
        "new-contributors-report.md",
    ):
        copy_remote_file_from_candidates(
            api,
            repo_id,
            previous_root,
            snapshot_dir / artifact_name,
            stable_snapshot_candidates(latest_pointer, artifact_name),
        )
    return snapshot_dir


def load_remote_table_from_candidates(
    api: HfApi,
    repo_id: str,
    table_name: str,
    local_dir: Path,
    candidate_paths: list[str],
) -> list[dict[str, Any]]:
    for candidate in candidate_paths:
        downloaded = load_remote_file(api, repo_id, candidate, local_dir)
        if downloaded is not None:
            return read_parquet_rows(downloaded)
    return []


def viewer_comment_rows(
    comments: list[dict[str, Any]],
    pull_requests: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Split comment rows into issue comments and PR comments for the dataset viewer."""
    pr_numbers = {int(row["number"]) for row in pull_requests if row.get("number") is not None}
    issue_comments: list[dict[str, Any]] = []
    pr_comments: list[dict[str, Any]] = []
    for row in comments:
        parent_number = row.get("parent_number")
        parent_kind = row.get("parent_kind")
        if parent_kind == "pull_request" or parent_number in pr_numbers:
            pr_comments.append(row)
        else:
            issue_comments.append(row)
    return issue_comments, pr_comments


def upload_delta_checkpoint(
    *,
    api: HfApi,
    repo_id: str,
    work_dir: Path,
    repo_slug: str,
    sid: str,
    stage: str,
    delta_tables: dict[str, list[dict[str, Any]]],
    progress: dict[str, Any],
) -> None:
    """Write the current delta tables and progress marker to ``_checkpoints/<sid>`` on the Hub."""
    checkpoint_root = work_dir / "checkpoint_upload"
    if checkpoint_root.exists():
        shutil.rmtree(checkpoint_root)
    checkpoint_root.mkdir(parents=True, exist_ok=True)
    for table_name, rows in delta_tables.items():
        write_parquet(rows, checkpoint_root / f"{table_name}.parquet", table_name)
    write_json(
        {"repo": repo_slug, "snapshot_id": sid, **progress}, checkpoint_root / "progress.json"
    )
    write_json(
        {"repo": repo_slug, "snapshot_id": sid, **progress},
        checkpoint_root / "state" / "in_progress.json",
    )
    api.upload_folder(
        folder_path=str(checkpoint_root),
        path_in_repo=f"_checkpoints/{sid}",
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=f"Checkpoint {sid} ({stage})",
    )


def remaining_limit(limit: int | None, used: int) -> int | None:
    if limit is None:
        return None
    return max(limit - used, 0)


def _build_argument_parser(*, config_path: Path | None = None) -> argparse.ArgumentParser:
    defaults = command_defaults("refresh-dataset", config_path=config_path)
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=Path, help="Optional repo config file.")
    parser.add_argument("--repo", default=defaults.get("repo", "huggingface/transformers"))
    parser.add_argument("--hf-repo-id", default=defaults.get("hf-repo-id"))
    parser.add_argument("--max-issues", type=int, default=defaults.get("max-issues"))
    parser.add_argument("--max-prs", type=int, default=defaults.get("max-prs"))
    parser.add_argument(
        "--max-issue-comments",
        type=int,
        default=defaults.get("max-issue-comments"),
    )
    parser.add_argument(
        "--max-reviews-per-pr",
        type=int,
        default=defaults.get("max-reviews-per-pr"),
    )
    parser.add_argument(
        "--max-review-comments-per-pr",
        type=int,
        default=defaults.get("max-review-comments-per-pr"),
    )
    parser.add_argument(
        "--fetch-timeline",
        action="store_true",
        default=bool(defaults.get("fetch-timeline", False)),
    )
    parser.add_argument(
        "--new-contributor-report",
        dest="new_contributor_report",
        action="store_true",
        default=bool(defaults.get("new-contributor-report", True)),
    )
    parser.add_argument(
        "--no-new-contributor-report",
        dest="new_contributor_report",
        action="store_false",
    )
    parser.add_argument(
        "--new-contributor-window-days",
        type=int,
        default=int(defaults.get("new-contributor-window-days", 42)),
    )
    parser.add_argument(
        "--new-contributor-max-authors",
        type=int,
        default=int(defaults.get("new-contributor-max-authors", 25)),
    )
    parser.add_argument("--http-timeout", type=int, default=300)
    parser.add_argument("--http-max-retries", type=int, default=8)
    parser.add_argument("--checkpoint-every-comments", type=int, default=1000)
    parser.add_argument("--checkpoint-every-prs", type=int, default=25)
    parser.add_argument(
        "--private-hf-repo",
        dest="private_hf_repo",
        action="store_true",
        default=bool(defaults.get("private-hf-repo", False)),
    )
    parser.add_argument("--private", dest="private_hf_repo", action="store_true")
    parser.set_defaults(
        cluster_suppression_rules=tuple(defaults.get("cluster-suppression-rules", ()))
    )
    return parser


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    config_path = extract_cli_config_path(argv)
    parser = _build_argument_parser(config_path=config_path)
    args = parser.parse_args(argv)
    if not args.hf_repo_id:
        parser.error("--hf-repo-id is required (or set dataset_id in --config)")
    return args


def run_dataset_refresh(options: DatasetRefreshOptions) -> dict[str, Any]:
    hf_token = os.getenv("HF_TOKEN")
    github_token = resolve_github_token()
    if not github_token:
        raise RuntimeError("GITHUB_TOKEN must be set or resolvable via gh auth/.env")
    repo_slug = options.repo.slug
    owner, repo_name = options.repo.owner, options.repo.name
    sid = snapshot_id()
    crawl_started_at = iso_now()
    extracted_at = iso_now()
    api = HfApi(token=hf_token)
    api.create_repo(
        repo_id=options.hf_repo_id,
        repo_type="dataset",
        private=options.private_hf_repo,
        exist_ok=True,
    )
    with tempfile.TemporaryDirectory(prefix="slop-farmer-job-") as tmp:
        root = Path(tmp)
        previous_root = root / "previous"
        output_root = root / "output"
        previous_root.mkdir(parents=True, exist_ok=True)
        output_root.mkdir(parents=True, exist_ok=True)
        remote_paths = list_remote_paths(api, options.hf_repo_id)
        previous_watermark = load_remote_json_file(
            api, options.hf_repo_id, "state/watermark.json", previous_root
        )
        remote_manifest = load_remote_json_file(
            api, options.hf_repo_id, "manifest.json", previous_root
        )
        latest_pointer = (
            load_remote_json_file(api, options.hf_repo_id, "snapshots/latest.json", previous_root)
            if "snapshots/latest.json" in remote_paths
            else None
        )
        stable_snapshot_id = None
        if previous_watermark:
            stable_snapshot_id = previous_watermark.get("last_successful_snapshot_id")
        elif latest_pointer:
            stable_snapshot_id = latest_pointer.get("latest_snapshot_id")
        elif remote_manifest:
            stable_snapshot_id = remote_manifest.get("snapshot_id")
        log(f"Starting dataset refresh for {repo_slug}")
        log(f"Target dataset repo: {options.hf_repo_id}")

        # Seed each table from the last stable snapshot on the Hub.
        previous_tables = {
            table_name: [] for table_name in SCHEMAS if table_name != "new_contributors"
        }
        for table_name in previous_tables:
            previous_tables[table_name] = load_remote_table_from_candidates(
                api,
                options.hf_repo_id,
                table_name,
                previous_root,
                stable_snapshot_candidates(latest_pointer, f"{table_name}.parquet"),
            )

        # Replay any checkpoints newer than the stable snapshot on top of the previous tables.
        checkpoint_progress: dict[str, Any] | None = None
        best_comment_checkpoint_progress: dict[str, Any] | None = None
        for checkpoint_sid, checkpoint_dir in checkpoint_dirs(remote_paths):
            if stable_snapshot_id is not None and checkpoint_sid <= str(stable_snapshot_id):
                continue
            progress_payload = load_remote_json_file(
                api, options.hf_repo_id, f"{checkpoint_dir}/progress.json", previous_root
            ) or load_remote_json_file(
                api,
                options.hf_repo_id,
                f"{checkpoint_dir}/state/in_progress.json",
                previous_root,
            )
            if progress_payload is not None:
                checkpoint_progress = progress_payload
                if (
                    progress_payload.get("effective_since") is None
                    and (progress_payload.get("counts") or {}).get("comments", 0) > 0
                    and (
                        best_comment_checkpoint_progress is None
                        or (progress_payload.get("counts") or {}).get("comments", 0)
                        > (best_comment_checkpoint_progress.get("counts") or {}).get("comments", 0)
                    )
                ):
                    best_comment_checkpoint_progress = progress_payload
            for table_name in previous_tables:
                checkpoint_rows = load_remote_table_from_candidates(
                    api,
                    options.hf_repo_id,
                    table_name,
                    previous_root,
                    [f"{checkpoint_dir}/{table_name}.parquet"],
                )
                if checkpoint_rows:
                    previous_tables[table_name] = merge_rows(
                        table_name,
                        previous_tables[table_name],
                        checkpoint_rows,
                    )

        # Choose the "since" watermark for this incremental crawl window.
        effective_since = None
        if checkpoint_progress and checkpoint_progress.get("effective_since") is not None:
            effective_since = checkpoint_progress.get("effective_since")
            log(f"Resuming from incomplete checkpoint window starting at {effective_since}")
        elif previous_watermark and previous_watermark.get("next_since") is not None:
            effective_since = previous_watermark.get("next_since")
            log(f"Resuming from remote watermark {effective_since}")
        elif (
            remote_manifest
            and isinstance(remote_manifest.get("watermark"), dict)
            and remote_manifest["watermark"].get("next_since") is not None
        ):
            effective_since = remote_manifest["watermark"].get("next_since")
            log(f"Bootstrapping remote watermark from root manifest {effective_since}")
        else:
            log("No successful watermark found; running full snapshot")

        client = GitHubClient(
            token=github_token,
            timeout=options.http_timeout,
            max_retries=options.http_max_retries,
            log=log,
        )
        previous_snapshot_dir = materialize_previous_snapshot_dir(
            api=api,
            repo_id=options.hf_repo_id,
            previous_root=previous_root,
            stable_snapshot_id=str(stable_snapshot_id) if stable_snapshot_id is not None else None,
            latest_pointer=latest_pointer,
            previous_tables=previous_tables,
        )

        # Sanity-check the GitHub token against the rate-limit endpoint before crawling.
        rate_limit = client.get_json("/rate_limit")
        core = (rate_limit.get("resources") or {}).get("core") or {}
        limit = core.get("limit")
        remaining = core.get("remaining")
        reset_at = core.get("reset")
        log(f"GitHub core rate limit: limit={limit} remaining={remaining} reset={reset_at}")
        if limit is not None and int(limit) <= 60:
            raise RuntimeError("GITHUB_TOKEN appears to be missing, invalid, or not being applied")
        if remaining == 0 and reset_at:
            sleep_for = max(int(reset_at) - int(time.time()), 1)
            log(f"GitHub token exhausted before bootstrap; sleeping {sleep_for}s until reset")
            time.sleep(sleep_for)

        log("Fetching changed issue and pull request stubs from GitHub")
        issue_stubs = list(
            client.iter_repo_issues(owner, repo_name, effective_since, options.max_issues)
        )
        issues = [item for item in issue_stubs if "pull_request" not in item]
        pr_stubs = [item for item in issue_stubs if "pull_request" in item]
        if options.max_prs is not None:
            pr_stubs = pr_stubs[: options.max_prs]
        log(f"Fetched {len(issue_stubs)} changed stubs")
        issue_number_to_kind = {
            item["number"]: ("pull_request" if "pull_request" in item else "issue")
            for item in issue_stubs
        }
        issue_rows = [normalize_issue(repo_slug, item, sid, extracted_at) for item in issues]

        # Collect discussion comments for changed issues/PRs, checkpointing to the Hub
        # periodically so an interrupted run can resume.
        comment_rows: list[dict[str, Any]] = []
        next_comment_checkpoint = options.checkpoint_every_comments
        reuse_checkpoint_comments = (
            stable_snapshot_id is None
            and effective_since is None
            and best_comment_checkpoint_progress is not None
            and bool(previous_tables["comments"])
        )
        if reuse_checkpoint_comments:
            log(
                f"Reusing {len(previous_tables['comments'])} checkpoint comments from prior partial runs"
            )
        else:
            for index, item in enumerate(issue_stubs, start=1):
                if not item.get("comments"):
                    continue
                remaining_comments = remaining_limit(options.max_issue_comments, len(comment_rows))
                if remaining_comments == 0:
                    break
                if index == 1 or index % 25 == 0:
                    log(f"Collecting discussion comments; {len(comment_rows)} collected so far")
                for comment in client.iter_issue_comments_for_number(
                    owner,
                    repo_name,
                    int(item["number"]),
                    effective_since,
                    remaining_comments,
                ):
                    parent_number = issue_url_to_number(comment.get("issue_url"))
                    parent_kind = issue_number_to_kind.get(parent_number, "issue_or_pr")
                    comment_rows.append(
                        normalize_comment(
                            repo_slug,
                            comment,
                            parent_kind,
                            parent_number,
                            sid,
                            extracted_at,
                        )
                    )
                    remaining_comments = remaining_limit(
                        options.max_issue_comments,
                        len(comment_rows),
                    )
                    if (
                        options.checkpoint_every_comments
                        and len(comment_rows) >= next_comment_checkpoint
                    ):
                        log(f"Pushing comment checkpoint to Hub at {len(comment_rows)} comments")
                        upload_delta_checkpoint(
                            api=api,
                            repo_id=options.hf_repo_id,
                            work_dir=root,
                            repo_slug=repo_slug,
                            sid=sid,
                            stage="comments",
                            delta_tables={
                                "issues": issue_rows,
                                "pull_requests": [],
                                "comments": comment_rows,
                                "reviews": [],
                                "review_comments": [],
                                "pr_files": [],
                                "pr_diffs": [],
                                "links": [],
                                "events": [],
                            },
                            progress={
                                "stage": "comments",
                                "effective_since": effective_since,
                                "counts": {
                                    "issues": len(issue_rows),
                                    "comments": len(comment_rows),
                                    "pull_requests": 0,
                                    "reviews": 0,
                                    "review_comments": 0,
                                    "pr_files": 0,
                                    "pr_diffs": 0,
                                    "links": 0,
                                    "events": 0,
                                },
                            },
                        )
                        next_comment_checkpoint += options.checkpoint_every_comments
                    if remaining_comments == 0:
                        break

        # Index previously hydrated PR data by number so unchanged PRs can be reused.
        pr_rows: list[dict[str, Any]] = []
        review_rows: list[dict[str, Any]] = []
        review_comment_rows: list[dict[str, Any]] = []
        pr_file_rows: list[dict[str, Any]] = []
        pr_diff_rows: list[dict[str, Any]] = []
        event_rows: list[dict[str, Any]] = []
        next_pr_checkpoint = options.checkpoint_every_prs
        previous_pr_rows_by_number = {
            int(row["number"]): row
            for row in previous_tables["pull_requests"]
            if row.get("number") is not None
        }
        previous_review_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
        for row in previous_tables["reviews"]:
            if row.get("pull_request_number") is not None:
                previous_review_rows_by_number[int(row["pull_request_number"])].append(row)
        previous_review_comment_rows_by_number: defaultdict[int, list[dict[str, Any]]] = (
            defaultdict(list)
        )
        for row in previous_tables["review_comments"]:
            if row.get("pull_request_number") is not None:
                previous_review_comment_rows_by_number[int(row["pull_request_number"])].append(row)
        previous_pr_file_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
        for row in previous_tables["pr_files"]:
            if row.get("pull_request_number") is not None:
                previous_pr_file_rows_by_number[int(row["pull_request_number"])].append(row)
        previous_pr_diff_rows_by_number = {
            int(row["pull_request_number"]): row
            for row in previous_tables["pr_diffs"]
            if row.get("pull_request_number") is not None
        }
        previous_pr_event_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
        for row in previous_tables["events"]:
            if row.get("parent_kind") == "pull_request" and row.get("parent_number") is not None:
                previous_pr_event_rows_by_number[int(row["parent_number"])].append(row)

        # Reuse hydrated data for PRs whose updated_at has not changed; hydrate the rest.
        hydration_pr_stubs: list[dict[str, Any]] = []
        for pr_stub in pr_stubs:
            number = int(pr_stub["number"])
            previous_pr_row = previous_pr_rows_by_number.get(number)
            if previous_pr_row and previous_pr_row.get("updated_at") == pr_stub.get("updated_at"):
                pr_rows.append(previous_pr_row)
                review_rows.extend(previous_review_rows_by_number[number])
                review_comment_rows.extend(previous_review_comment_rows_by_number[number])
                pr_file_rows.extend(previous_pr_file_rows_by_number[number])
                if number in previous_pr_diff_rows_by_number:
                    pr_diff_rows.append(previous_pr_diff_rows_by_number[number])
                event_rows.extend(previous_pr_event_rows_by_number[number])
                continue
            hydration_pr_stubs.append(pr_stub)
        reused_pr_count = len(pr_rows)
        if reused_pr_count:
            log(f"Reusing hydrated data for {reused_pr_count} pull requests from prior checkpoints")
            if options.checkpoint_every_prs:
                while reused_pr_count >= next_pr_checkpoint:
                    next_pr_checkpoint += options.checkpoint_every_prs

        # Hydrate changed pull requests: details, reviews, review comments, files, diffs,
        # and (optionally) timeline events, checkpointing periodically.
        total_prs = len(pr_stubs)
        remaining_prs = len(hydration_pr_stubs)
        for index, pr_stub in enumerate(hydration_pr_stubs, start=1):
            number = int(pr_stub["number"])
            hydrated_count = reused_pr_count + index
            if index == 1 or hydrated_count % 10 == 0 or index == remaining_prs:
                log(f"Hydrating pull requests: {hydrated_count}/{total_prs}")
            detail = client.get_pull_request(owner, repo_name, number)
            pr_rows.append(normalize_pull_request(repo_slug, pr_stub, detail, sid, extracted_at))
            for review in client.iter_pull_reviews(
                owner, repo_name, number, options.max_reviews_per_pr
            ):
                review_rows.append(normalize_review(repo_slug, number, review, sid, extracted_at))
            for comment in client.iter_pull_review_comments(
                owner,
                repo_name,
                number,
                options.max_review_comments_per_pr,
            ):
                review_comment_rows.append(
                    normalize_review_comment(repo_slug, number, comment, sid, extracted_at)
                )
            for pr_file in client.iter_pull_files(owner, repo_name, number):
                pr_file_rows.append(
                    normalize_pr_file(repo_slug, number, pr_file, sid, extracted_at)
                )
            pr_diff_rows.append(
                normalize_pr_diff(
                    repo_slug,
                    number,
                    pr_stub.get("html_url"),
                    pr_stub.get("url"),
                    client.get_pull_request_diff(owner, repo_name, number),
                    sid,
                    extracted_at,
                )
            )
            if options.fetch_timeline:
                for event in client.iter_issue_timeline(owner, repo_name, number):
                    event_rows.append(
                        normalize_timeline_event(
                            repo_slug,
                            number,
                            "pull_request",
                            event,
                            sid,
                            extracted_at,
                        )
                    )
            if options.checkpoint_every_prs and len(pr_rows) >= next_pr_checkpoint:
                log(f"Pushing PR checkpoint to Hub at {len(pr_rows)} hydrated PRs")
                upload_delta_checkpoint(
                    api=api,
                    repo_id=options.hf_repo_id,
                    work_dir=root,
                    repo_slug=repo_slug,
                    sid=sid,
                    stage="pull_requests",
                    delta_tables={
                        "issues": issue_rows,
                        "pull_requests": pr_rows,
                        "comments": comment_rows,
                        "reviews": review_rows,
                        "review_comments": review_comment_rows,
                        "pr_files": pr_file_rows,
                        "pr_diffs": pr_diff_rows,
                        "links": [],
                        "events": event_rows,
                    },
                    progress={
                        "stage": "pull_requests",
                        "effective_since": effective_since,
                        "counts": {
                            "issues": len(issue_rows),
                            "comments": len(comment_rows),
                            "pull_requests": len(pr_rows),
                            "reviews": len(review_rows),
                            "review_comments": len(review_comment_rows),
                            "pr_files": len(pr_file_rows),
                            "pr_diffs": len(pr_diff_rows),
                            "links": 0,
                            "events": len(event_rows),
                        },
                    },
                )
                next_pr_checkpoint += options.checkpoint_every_prs

        if options.fetch_timeline:
            log(f"Fetching issue timelines for {len(issues)} changed issues")
            for issue in issues:
                for event in client.iter_issue_timeline(owner, repo_name, int(issue["number"])):
                    event_rows.append(
                        normalize_timeline_event(
                            repo_slug,
                            int(issue["number"]),
                            "issue",
                            event,
                            sid,
                            extracted_at,
                        )
                    )

        # Extract cross-reference links from issue/PR/comment bodies and timeline events.
        link_rows: list[dict[str, Any]] = []
        for row in issue_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="issue",
                    source_number=row["number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in pr_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="pull_request",
                    source_number=row["number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in comment_rows or previous_tables["comments"]:
            if row["parent_number"] is None:
                continue
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="comment",
                    source_number=row["parent_number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in review_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="review",
                    source_number=row["pull_request_number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in review_comment_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="review_comment",
                    source_number=row["pull_request_number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        link_rows.extend(
            build_pr_duplicate_candidate_rows(
                repo=repo_slug,
                pull_requests=pr_rows,
                link_rows=link_rows,
                snapshot_id=sid,
                extracted_at=extracted_at,
            )
        )
        for event in event_rows:
            if event.get("source_issue_number"):
                link_rows.append(
                    {
                        "repo": repo_slug,
                        "source_type": event["parent_kind"],
                        "source_number": event["parent_number"],
                        "source_github_id": None,
                        "target_owner": owner,
                        "target_repo": repo_name,
                        "target_number": event["source_issue_number"],
                        "link_type": f"timeline:{event['event']}",
                        "link_origin": "timeline",
                        "snapshot_id": sid,
                        "extracted_at": extracted_at,
                    }
                )

        delta_tables = {
            "issues": issue_rows,
            "pull_requests": pr_rows,
            "comments": comment_rows,
            "reviews": review_rows,
            "review_comments": review_comment_rows,
            "pr_files": pr_file_rows,
            "pr_diffs": pr_diff_rows,
            "links": link_rows,
            "events": event_rows,
        }
        if any(delta_tables.values()):
            log("Pushing final delta checkpoint to Hub before merge upload")
            upload_delta_checkpoint(
                api=api,
                repo_id=options.hf_repo_id,
                work_dir=root,
                repo_slug=repo_slug,
                sid=sid,
                stage="final-delta",
                delta_tables=delta_tables,
                progress={
                    "stage": "final-delta",
                    "effective_since": effective_since,
                    "counts": {name: len(rows) for name, rows in delta_tables.items()},
                },
            )

        # Merge the delta into the previous tables and write the full snapshot locally.
        final_tables = {
            table_name: merge_rows(table_name, previous_tables[table_name], delta_rows)
            for table_name, delta_rows in delta_tables.items()
        }
        manifest: dict[str, Any] = {
            "repo": repo_slug,
            "snapshot_id": sid,
            "crawl_started_at": crawl_started_at,
            "extracted_at": extracted_at,
            "watermark": {
                "effective_since": effective_since,
                "next_since": crawl_started_at,
                "previous_snapshot_dir": (
                    str(previous_snapshot_dir) if previous_snapshot_dir is not None else None
                ),
            },
            "delta_counts": {
                "issue_stubs": len(issue_stubs),
                "issues": len(issue_rows),
                "pull_requests": len(pr_rows),
                "comments": len(comment_rows),
                "reviews": len(review_rows),
                "review_comments": len(review_comment_rows),
                "pr_files": len(pr_file_rows),
                "pr_diffs": len(pr_diff_rows),
                "timeline_events": len(event_rows),
                "links": len(link_rows),
            },
            "counts": {
                "issues": len(final_tables["issues"]),
                "pull_requests": len(final_tables["pull_requests"]),
                "comments": len(final_tables["comments"]),
                "reviews": len(final_tables["reviews"]),
                "review_comments": len(final_tables["review_comments"]),
                "pr_files": len(final_tables["pr_files"]),
                "pr_diffs": len(final_tables["pr_diffs"]),
                "timeline_events": len(final_tables["events"]),
                "links": len(final_tables["links"]),
            },
        }
        log("Writing updated dataset files")
        for table_name, rows in final_tables.items():
            write_parquet(rows, output_root / f"{table_name}.parquet", table_name)
        issue_comment_rows, pr_comment_rows = viewer_comment_rows(
            final_tables["comments"],
            final_tables["pull_requests"],
        )
        write_parquet(issue_comment_rows, output_root / "issue_comments.parquet", "comments")
        write_parquet(pr_comment_rows, output_root / "pr_comments.parquet", "comments")
        archived_snapshot_dir = output_root / "snapshots" / sid
        archived_snapshot_dir.mkdir(parents=True, exist_ok=True)
        write_json(manifest, output_root / "manifest.json")

        # Generate report artifacts, watermark, and snapshot pointers, then upload everything.
        log("Generating PR scope clusters")
        pr_scope_path = run_pr_scope_report(
            PrScopeOptions(
                snapshot_dir=output_root,
                output_dir=output_root,
                output=output_root / "pr-scope-clusters.json",
                hf_repo_id=None,
                hf_revision=None,
                hf_materialize_dir=None,
                cluster_suppression_rules=options.cluster_suppression_rules,
            )
        )
        shutil.copy2(pr_scope_path, archived_snapshot_dir / pr_scope_path.name)
        artifacts: dict[str, str] = {
            "pr_scope_clusters_json": pr_scope_path.name,
            "archived_pr_scope_clusters_json": f"snapshots/{sid}/{pr_scope_path.name}",
        }
        if options.new_contributor_report:
            log("Generating new contributor dataset/report artifacts")
            run_new_contributor_report(
                NewContributorReportOptions(
                    snapshot_dir=output_root,
                    output_dir=output_root,
                    output=None,
                    json_output=None,
                    hf_repo_id=None,
                    hf_revision=None,
                    hf_materialize_dir=None,
                    window_days=options.new_contributor_window_days,
                    max_authors=options.new_contributor_max_authors,
                )
            )
            manifest["counts"]["new_contributors"] = len(
                read_parquet_rows(output_root / "new_contributors.parquet")
            )
            artifacts.update(
                {
                    "new_contributors_parquet": "new_contributors.parquet",
                    "new_contributors_json": "new-contributors-report.json",
                    "new_contributors_markdown": "new-contributors-report.md",
                }
            )
        manifest["artifacts"] = artifacts
        manifest["watermark"].pop("previous_snapshot_dir", None)
        write_json(manifest, output_root / "manifest.json")
        write_text(
            build_hf_dataset_card(
                repo_slug,
                sid,
                include_new_contributors=options.new_contributor_report,
            ),
            output_root / "README.md",
        )
        write_json(
            {
                "repo": repo_slug,
                "last_successful_snapshot_id": sid,
                "effective_since": effective_since,
                "next_since": crawl_started_at,
                "updated_at": extracted_at,
            },
            output_root / "state" / "watermark.json",
        )
        write_json(manifest, archived_snapshot_dir / "manifest.json")
        write_json(
            {
                "repo": repo_slug,
                "latest_snapshot_id": sid,
                "snapshot_dir": f"snapshots/{sid}",
                "manifest_path": "manifest.json",
                "archived_manifest_path": f"snapshots/{sid}/manifest.json",
                "next_since": crawl_started_at,
            },
            output_root / "snapshots" / "latest.json",
        )
        log("Uploading updated dataset to the Hub")
        api.upload_folder(
            folder_path=str(output_root),
            repo_id=options.hf_repo_id,
            repo_type="dataset",
            commit_message=f"Refresh {repo_name} dataset snapshot {sid}",
        )
        log(f"Dataset refresh complete for {options.hf_repo_id}")
        return {
            "repo": repo_slug,
            "dataset_id": options.hf_repo_id,
            "snapshot_id": sid,
            "effective_since": effective_since,
            "counts": manifest["counts"],
        }


def main(argv: list[str] | None = None) -> None:
    args = parse_args(argv)
    result = run_dataset_refresh(
        DatasetRefreshOptions(
            repo=RepoRef.parse(args.repo),
            hf_repo_id=args.hf_repo_id,
            private_hf_repo=args.private_hf_repo,
            max_issues=args.max_issues,
            max_prs=args.max_prs,
            max_issue_comments=args.max_issue_comments,
            max_reviews_per_pr=args.max_reviews_per_pr,
            max_review_comments_per_pr=args.max_review_comments_per_pr,
            fetch_timeline=args.fetch_timeline,
            new_contributor_report=args.new_contributor_report,
            new_contributor_window_days=args.new_contributor_window_days,
            new_contributor_max_authors=args.new_contributor_max_authors,
            http_timeout=args.http_timeout,
            http_max_retries=args.http_max_retries,
            checkpoint_every_comments=args.checkpoint_every_comments,
            checkpoint_every_prs=args.checkpoint_every_prs,
            cluster_suppression_rules=tuple(args.cluster_suppression_rules),
        )
    )
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()