| from __future__ import annotations | |
| import json | |
| import shutil | |
| from collections.abc import Iterable | |
| from datetime import UTC, datetime, timedelta | |
| from pathlib import Path | |
| from typing import Any, Protocol | |
| from slop_farmer.config import NewContributorReportOptions, PipelineOptions, resolve_github_token | |
| from slop_farmer.data.dataset_card import build_hf_dataset_card | |
| from slop_farmer.data.github_api import GitHubClient | |
| from slop_farmer.data.links import build_pr_duplicate_candidate_rows, build_text_link_rows | |
| from slop_farmer.data.normalize import ( | |
| issue_url_to_number, | |
| normalize_comment, | |
| normalize_issue, | |
| normalize_pr_diff, | |
| normalize_pr_file, | |
| normalize_pull_request, | |
| normalize_review, | |
| normalize_review_comment, | |
| normalize_timeline_event, | |
| ) | |
| from slop_farmer.data.parquet_io import ( | |
| read_json, | |
| read_parquet_rows, | |
| write_json, | |
| write_parquet, | |
| write_text, | |
| ) | |
| from slop_farmer.reports.new_contributor_report import run_new_contributor_report | |
| # Navigation: | |
| # - protocol + small time/log/view helpers | |
| # - checkpoint/state helpers for resumable crawls | |
| # - incremental merge helpers | |
| # - run_pipeline(): fetch -> hydrate -> derive links -> merge/write -> publish | |
| class GitHubClientLike(Protocol): | |
| def iter_repo_issues( | |
| self, owner: str, repo: str, since: str | None, limit: int | None | |
| ) -> Iterable[dict[str, Any]]: ... | |
| def iter_issue_comments_for_number( | |
| self, owner: str, repo: str, number: int, since: str | None, limit: int | None = None | |
| ) -> Iterable[dict[str, Any]]: ... | |
| def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]: ... | |
| def iter_pull_reviews( | |
| self, owner: str, repo: str, number: int, limit: int | None = None | |
| ) -> Iterable[dict[str, Any]]: ... | |
| def iter_pull_review_comments( | |
| self, owner: str, repo: str, number: int, limit: int | None = None | |
| ) -> Iterable[dict[str, Any]]: ... | |
| def iter_pull_files( | |
| self, owner: str, repo: str, number: int, limit: int | None = None | |
| ) -> Iterable[dict[str, Any]]: ... | |
| def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str: ... | |
| def iter_issue_timeline( | |
| self, owner: str, repo: str, number: int, limit: int | None = None | |
| ) -> Iterable[dict[str, Any]]: ... | |
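# run_pipeline() only requires this protocol, so tests can supply a small in-memory
# fake instead of the real GitHubClient. A minimal sketch (payload shapes and values
# are illustrative, not a real fixture; the normalizers may need richer dicts):
#
#     class FakeClient:
#         def iter_repo_issues(self, owner, repo, since, limit):
#             yield {"number": 1, "created_at": "2024-01-01T00:00:00Z", "comments": 0}
#         def iter_issue_comments_for_number(self, owner, repo, number, since, limit=None):
#             return iter(())
#         def get_pull_request(self, owner, repo, number):
#             return {}
#         def get_pull_request_diff(self, owner, repo, number):
#             return ""
#         # iter_pull_reviews / iter_pull_review_comments / iter_pull_files /
#         # iter_issue_timeline can likewise return empty iterators for a smoke test.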
| def _iso_now() -> str: | |
| return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") | |
| def _snapshot_id() -> str: | |
| return datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ") | |
| def _log(message: str) -> None: | |
| stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ") | |
| print(f"[{stamp}] {message}", flush=True) | |
| def _remaining_limit(limit: int | None, used: int) -> int | None: | |
| if limit is None: | |
| return None | |
| return max(limit - used, 0) | |
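# Example: _remaining_limit(100, 80) == 20, _remaining_limit(100, 100) == 0, and
# _remaining_limit(None, 80) is None. The fetch loops below treat 0 as "budget
# exhausted, stop paginating" and None as "no cap".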
| def _created_after_cutoff( | |
| item: dict[str, Any], max_age_days: int | None, reference_time: datetime | |
| ) -> bool: | |
| if max_age_days is None: | |
| return True | |
| created_at = item.get("created_at") | |
| if not created_at: | |
| return False | |
| try: | |
| created_dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00")) | |
| except ValueError: | |
| return False | |
| return created_dt >= reference_time - timedelta(days=max_age_days) | |
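# Example (illustrative dates): with max_age_days=30 and a reference time of
# 2024-06-30T00:00:00Z, an item created 2024-06-15T00:00:00Z passes and one created
# 2024-05-01T00:00:00Z does not; items with a missing or unparseable created_at are
# excluded whenever an age cap is set, and max_age_days=None disables the cap entirely.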
| def _reference_time_for_age_caps(crawl_started_at: str) -> datetime: | |
| try: | |
| return datetime.fromisoformat(crawl_started_at.replace("Z", "+00:00")) | |
| except ValueError: | |
| return datetime.now(tz=UTC) | |
| def _dataset_card( | |
| repo: str, snapshot_id: str, manifest: dict[str, Any], *, include_new_contributors: bool = False | |
| ) -> str: | |
| notes = ["new contributor reviewer artifacts are included"] if include_new_contributors else [] | |
| del manifest | |
| return build_hf_dataset_card( | |
| repo, | |
| snapshot_id, | |
| include_new_contributors=include_new_contributors, | |
| notes=notes, | |
| ) | |
| def _viewer_comment_rows( | |
| comments: list[dict[str, Any]], | |
| pull_requests: list[dict[str, Any]], | |
| ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: | |
| pr_numbers = {int(row["number"]) for row in pull_requests if row.get("number") is not None} | |
| issue_comments: list[dict[str, Any]] = [] | |
| pr_comments: list[dict[str, Any]] = [] | |
| for row in comments: | |
| parent_number = row.get("parent_number") | |
| parent_kind = row.get("parent_kind") | |
| if parent_kind == "pull_request" or parent_number in pr_numbers: | |
| pr_comments.append(row) | |
| else: | |
| issue_comments.append(row) | |
| return issue_comments, pr_comments | |
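# Used when the snapshot is written: the unified comments table is split into
# issue_comments.parquet and pr_comments.parquet viewer tables, classifying by
# parent_kind and falling back to the set of known pull request numbers.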
| PRIMARY_KEYS: dict[str, tuple[str, ...]] = { | |
| "issues": ("github_id",), | |
| "pull_requests": ("github_id",), | |
| "comments": ("github_id",), | |
| "reviews": ("github_id",), | |
| "review_comments": ("github_id",), | |
| "pr_files": ("repo", "pull_request_number", "filename"), | |
| "pr_diffs": ("repo", "pull_request_number"), | |
| "links": ( | |
| "repo", | |
| "source_type", | |
| "source_number", | |
| "source_github_id", | |
| "target_owner", | |
| "target_repo", | |
| "target_number", | |
| "link_type", | |
| "link_origin", | |
| ), | |
| "events": ( | |
| "repo", | |
| "parent_kind", | |
| "parent_number", | |
| "event", | |
| "created_at", | |
| "actor_login", | |
| "source_issue_number", | |
| "source_issue_url", | |
| "commit_id", | |
| "label_name", | |
| ), | |
| } | |
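# These key tuples drive incremental merging: _merge_rows() serializes them per row
# (see _row_key) so that delta rows replace previous-snapshot rows sharing the same key.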
| CHECKPOINT_VERSION = 1 | |
| CHECKPOINT_PR_INTERVAL = 5 | |
| CHECKPOINT_ISSUE_TIMELINE_INTERVAL = 25 | |
| CHECKPOINT_TABLE_NAMES = ( | |
| "issues", | |
| "comments", | |
| "pull_requests", | |
| "reviews", | |
| "review_comments", | |
| "pr_files", | |
| "pr_diffs", | |
| "events", | |
| ) | |
| # Checkpoint/state helpers | |
| def _state_dir(output_dir: Path) -> Path: | |
| return output_dir / "state" | |
| def _in_progress_path(output_dir: Path) -> Path: | |
| return _state_dir(output_dir) / "in_progress.json" | |
| def _watermark_path(output_dir: Path) -> Path: | |
| return _state_dir(output_dir) / "watermark.json" | |
| def _latest_snapshot_pointer_path(output_dir: Path) -> Path: | |
| return output_dir / "snapshots" / "latest.json" | |
| def _checkpoint_dir(snapshot_dir: Path) -> Path: | |
| return snapshot_dir / "_checkpoint" | |
| def _checkpoint_progress_path(snapshot_dir: Path) -> Path: | |
| return _checkpoint_dir(snapshot_dir) / "progress.json" | |
| def _checkpoint_table_path(snapshot_dir: Path, table_name: str) -> Path: | |
| return _checkpoint_dir(snapshot_dir) / f"{table_name}.parquet" | |
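# On-disk layout used by the resume machinery (paths follow the helpers above):
#   <output_dir>/state/in_progress.json         last progress payload for an unfinished run
#   <output_dir>/state/watermark.json           incremental "since" watermark per repo
#   <output_dir>/snapshots/latest.json          pointer to the last completed snapshot
#   <snapshot_dir>/_checkpoint/progress.json    same payload as in_progress.json
#   <snapshot_dir>/_checkpoint/<table>.parquet  partially fetched rows for each table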
| def _checkpoint_options(options: PipelineOptions) -> dict[str, Any]: | |
| return { | |
| "max_issues": options.max_issues, | |
| "max_prs": options.max_prs, | |
| "max_issue_comments": options.max_issue_comments, | |
| "max_reviews_per_pr": options.max_reviews_per_pr, | |
| "max_review_comments_per_pr": options.max_review_comments_per_pr, | |
| "issue_max_age_days": options.issue_max_age_days, | |
| "pr_max_age_days": options.pr_max_age_days, | |
| "fetch_timeline": options.fetch_timeline, | |
| } | |
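# Only these option values are embedded in a checkpoint. _load_in_progress_checkpoint()
# refuses to resume when they differ from the current run, so restarting with different
# caps or age windows triggers a fresh crawl rather than mixing inconsistent partial data.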
| def _load_in_progress_checkpoint(options: PipelineOptions, repo_slug: str) -> dict[str, Any] | None: | |
| if not options.resume or options.since: | |
| return None | |
| path = _in_progress_path(options.output_dir) | |
| if not path.exists(): | |
| return None | |
| payload = read_json(path) | |
| if not isinstance(payload, dict): | |
| return None | |
| if payload.get("version") != CHECKPOINT_VERSION or payload.get("repo") != repo_slug: | |
| return None | |
| if payload.get("options") != _checkpoint_options(options): | |
| _log(f"Ignoring incompatible in-progress checkpoint: {path}") | |
| return None | |
| snapshot_dir_raw = payload.get("snapshot_dir") | |
| if not isinstance(snapshot_dir_raw, str) or not snapshot_dir_raw: | |
| return None | |
| payload["snapshot_dir"] = Path(snapshot_dir_raw) | |
| previous_snapshot_dir_raw = payload.get("previous_snapshot_dir") | |
| payload["previous_snapshot_dir"] = ( | |
| Path(previous_snapshot_dir_raw) | |
| if isinstance(previous_snapshot_dir_raw, str) and previous_snapshot_dir_raw | |
| else None | |
| ) | |
| return payload | |
| def _load_checkpoint_rows(snapshot_dir: Path, table_name: str) -> list[dict[str, Any]]: | |
| return read_parquet_rows(_checkpoint_table_path(snapshot_dir, table_name)) | |
| def _write_checkpoint( | |
| *, | |
| options: PipelineOptions, | |
| repo_slug: str, | |
| snapshot_id: str, | |
| snapshot_dir: Path, | |
| effective_since: str | None, | |
| crawl_started_at: str, | |
| extracted_at: str, | |
| previous_snapshot_dir: Path | None, | |
| merge_with_previous: bool, | |
| phase: str, | |
| comments_done: bool, | |
| issue_rows: list[dict[str, Any]], | |
| comment_rows: list[dict[str, Any]], | |
| pr_rows: list[dict[str, Any]], | |
| review_rows: list[dict[str, Any]], | |
| review_comment_rows: list[dict[str, Any]], | |
| pr_file_rows: list[dict[str, Any]], | |
| pr_diff_rows: list[dict[str, Any]], | |
| timeline_rows: list[dict[str, Any]], | |
| completed_issue_timeline_numbers: set[int], | |
| ) -> None: | |
| checkpoint_tables = { | |
| "issues": issue_rows, | |
| "comments": comment_rows, | |
| "pull_requests": pr_rows, | |
| "reviews": review_rows, | |
| "review_comments": review_comment_rows, | |
| "pr_files": pr_file_rows, | |
| "pr_diffs": pr_diff_rows, | |
| "events": timeline_rows, | |
| } | |
| for table_name, rows in checkpoint_tables.items(): | |
| write_parquet(rows, _checkpoint_table_path(snapshot_dir, table_name), table_name) | |
| progress = { | |
| "version": CHECKPOINT_VERSION, | |
| "repo": repo_slug, | |
| "snapshot_id": snapshot_id, | |
| "snapshot_dir": str(snapshot_dir), | |
| "effective_since": effective_since, | |
| "crawl_started_at": crawl_started_at, | |
| "extracted_at": extracted_at, | |
| "phase": phase, | |
| "comments_done": comments_done, | |
| "merge_with_previous": merge_with_previous, | |
| "previous_snapshot_dir": str(previous_snapshot_dir) if previous_snapshot_dir else None, | |
| "completed_pr_numbers": sorted( | |
| int(row["number"]) for row in pr_rows if row.get("number") is not None | |
| ), | |
| "completed_issue_timeline_numbers": sorted(completed_issue_timeline_numbers), | |
| "options": _checkpoint_options(options), | |
| "counts": {table_name: len(rows) for table_name, rows in checkpoint_tables.items()}, | |
| } | |
| write_json(progress, _checkpoint_progress_path(snapshot_dir)) | |
| write_json(progress, _in_progress_path(options.output_dir)) | |
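# The progress payload is written to two places: the snapshot's _checkpoint directory
# and <output_dir>/state/in_progress.json, so an interrupted run can be rediscovered
# from the stable state directory (the payload itself records snapshot_dir).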
| def _clear_checkpoint(output_dir: Path, snapshot_dir: Path) -> None: | |
| checkpoint_state = _in_progress_path(output_dir) | |
| if checkpoint_state.exists(): | |
| checkpoint_state.unlink() | |
| checkpoint_dir = _checkpoint_dir(snapshot_dir) | |
| if checkpoint_dir.exists(): | |
| shutil.rmtree(checkpoint_dir) | |
| def _load_watermark(output_dir: Path) -> dict[str, Any] | None: | |
| path = _watermark_path(output_dir) | |
| if not path.exists(): | |
| return None | |
| return read_json(path) | |
| def _load_latest_snapshot_pointer(output_dir: Path) -> dict[str, Any] | None: | |
| path = _latest_snapshot_pointer_path(output_dir) | |
| if not path.exists(): | |
| return None | |
| return read_json(path) | |
| def _resolve_effective_since( | |
| options: PipelineOptions, repo_slug: str | |
| ) -> tuple[str | None, dict[str, Any] | None]: | |
| if options.since: | |
| return options.since, None | |
| if not options.resume: | |
| return None, None | |
| watermark = _load_watermark(options.output_dir) | |
| if not watermark or watermark.get("repo") != repo_slug: | |
| return None, watermark | |
| return watermark.get("next_since"), watermark | |
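# Window precedence: an explicit options.since always wins, resume=False forces a full
# crawl, and otherwise the per-repo watermark's next_since (when present) is used.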
| def _previous_snapshot_dir(output_dir: Path, repo_slug: str) -> Path | None: | |
| latest = _load_latest_snapshot_pointer(output_dir) | |
| if not latest: | |
| return None | |
| latest_repo = latest.get("repo") | |
| if latest_repo and latest_repo != repo_slug: | |
| return None | |
| snapshot_dir = latest.get("snapshot_dir") | |
| if not snapshot_dir: | |
| return None | |
| path = Path(snapshot_dir) | |
| return path if path.exists() else None | |
| # Incremental merge helpers | |
| def _row_key(row: dict[str, Any], key_fields: tuple[str, ...]) -> str: | |
| return json.dumps([row.get(field) for field in key_fields], sort_keys=False, default=str) | |
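# Example: _row_key({"repo": "o/r", "pull_request_number": 7, "filename": "a.py"},
# PRIMARY_KEYS["pr_files"]) returns '["o/r", 7, "a.py"]'; missing fields serialize as
# null, and non-JSON values fall back to str() via default=str.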
| def _merge_rows( | |
| table_name: str, previous_rows: list[dict[str, Any]], delta_rows: list[dict[str, Any]] | |
| ) -> list[dict[str, Any]]: | |
| key_fields = PRIMARY_KEYS[table_name] | |
| if table_name == "pr_files": | |
| refreshed_prs = { | |
| (row.get("repo"), row.get("pull_request_number")) | |
| for row in delta_rows | |
| if row.get("pull_request_number") is not None | |
| } | |
| previous_rows = [ | |
| row | |
| for row in previous_rows | |
| if (row.get("repo"), row.get("pull_request_number")) not in refreshed_prs | |
| ] | |
| merged: dict[str, dict[str, Any]] = {} | |
| for row in previous_rows: | |
| merged[_row_key(row, key_fields)] = row | |
| for row in delta_rows: | |
| merged[_row_key(row, key_fields)] = row | |
| rows = list(merged.values()) | |
| sort_fields = [ | |
| field | |
| for field in ("number", "pull_request_number", "parent_number", "github_id", "created_at") | |
| if rows and field in rows[0] | |
| ] | |
| if sort_fields: | |
| rows.sort( | |
| key=lambda row: tuple( | |
| "" if row.get(field) is None else str(row.get(field)) for field in sort_fields | |
| ) | |
| ) | |
| return rows | |
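# Merge semantics, with illustrative rows: delta rows win on key collisions, and for
# "pr_files" every previous row belonging to a refreshed PR is dropped first, so files
# removed from the PR do not survive the merge.
#
#     previous = [{"repo": "o/r", "pull_request_number": 7, "filename": "old.py"}]
#     delta    = [{"repo": "o/r", "pull_request_number": 7, "filename": "new.py"}]
#     _merge_rows("pr_files", previous, delta)
#     # -> [{"repo": "o/r", "pull_request_number": 7, "filename": "new.py"}]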
| def _load_previous_rows(snapshot_dir: Path | None, table_name: str) -> list[dict[str, Any]]: | |
| if snapshot_dir is None: | |
| return [] | |
| return read_parquet_rows(snapshot_dir / f"{table_name}.parquet") | |
| # Pipeline orchestration | |
| def run_pipeline(options: PipelineOptions, client: GitHubClientLike | None = None) -> Path: | |
| # Resume or initialize one snapshot run. | |
| repo_slug = options.repo.slug | |
| checkpoint = _load_in_progress_checkpoint(options, repo_slug) | |
| watermark: dict[str, Any] | None = None | |
| if checkpoint: | |
| effective_since = checkpoint.get("effective_since") | |
| previous_snapshot_dir = checkpoint.get("previous_snapshot_dir") | |
| merge_with_previous = bool(checkpoint.get("merge_with_previous")) | |
| crawl_started_at = str(checkpoint.get("crawl_started_at") or _iso_now()) | |
| snapshot_id = str(checkpoint.get("snapshot_id") or _snapshot_id()) | |
| extracted_at = str(checkpoint.get("extracted_at") or _iso_now()) | |
| snapshot_dir = checkpoint["snapshot_dir"] | |
| else: | |
| effective_since, watermark = _resolve_effective_since(options, repo_slug) | |
| previous_snapshot_dir = _previous_snapshot_dir(options.output_dir, repo_slug) | |
| merge_with_previous = previous_snapshot_dir is not None and effective_since is not None | |
| crawl_started_at = _iso_now() | |
| snapshot_id = _snapshot_id() | |
| extracted_at = _iso_now() | |
| snapshot_dir = options.output_dir / "snapshots" / snapshot_id | |
| snapshot_dir.mkdir(parents=True, exist_ok=True) | |
| if client is None: | |
| token = resolve_github_token() | |
| client = GitHubClient( | |
| token=token, | |
| timeout=options.http_timeout, | |
| max_retries=options.http_max_retries, | |
| log=_log, | |
| ) | |
| if checkpoint: | |
| _log(f"Resuming snapshot {snapshot_id} for {repo_slug}") | |
| _log(f"Recovered in-progress checkpoint: {_in_progress_path(options.output_dir)}") | |
| else: | |
| _log(f"Starting snapshot {snapshot_id} for {repo_slug}") | |
| _log(f"Output directory: {snapshot_dir}") | |
| if options.since: | |
| _log(f"Using explicit since watermark: {effective_since}") | |
| elif checkpoint: | |
| _log(f"Resuming in-progress crawl window from {effective_since}") | |
| elif effective_since: | |
| source_snapshot = watermark.get("last_successful_snapshot_id") if watermark else None | |
| _log(f"Resuming from local watermark {effective_since} from snapshot {source_snapshot}") | |
| else: | |
| _log("No watermark active; running full snapshot") | |
| if merge_with_previous: | |
| _log(f"Merging delta into previous snapshot: {previous_snapshot_dir}") | |
| # Load any checkpointed tables before resuming remote work. | |
| issue_rows = _load_checkpoint_rows(snapshot_dir, "issues") if checkpoint else [] | |
| comment_rows = _load_checkpoint_rows(snapshot_dir, "comments") if checkpoint else [] | |
| pr_rows = _load_checkpoint_rows(snapshot_dir, "pull_requests") if checkpoint else [] | |
| review_rows = _load_checkpoint_rows(snapshot_dir, "reviews") if checkpoint else [] | |
| review_comment_rows = ( | |
| _load_checkpoint_rows(snapshot_dir, "review_comments") if checkpoint else [] | |
| ) | |
| pr_file_rows = _load_checkpoint_rows(snapshot_dir, "pr_files") if checkpoint else [] | |
| pr_diff_rows = _load_checkpoint_rows(snapshot_dir, "pr_diffs") if checkpoint else [] | |
| timeline_rows = _load_checkpoint_rows(snapshot_dir, "events") if checkpoint else [] | |
| completed_issue_timeline_numbers = { | |
| int(number) | |
| for number in (checkpoint or {}).get("completed_issue_timeline_numbers", []) | |
| if number is not None | |
| } | |
| comments_done = bool((checkpoint or {}).get("comments_done")) | |
| if not checkpoint: | |
| _write_checkpoint( | |
| options=options, | |
| repo_slug=repo_slug, | |
| snapshot_id=snapshot_id, | |
| snapshot_dir=snapshot_dir, | |
| effective_since=effective_since, | |
| crawl_started_at=crawl_started_at, | |
| extracted_at=extracted_at, | |
| previous_snapshot_dir=previous_snapshot_dir, | |
| merge_with_previous=merge_with_previous, | |
| phase="starting", | |
| comments_done=False, | |
| issue_rows=issue_rows, | |
| comment_rows=comment_rows, | |
| pr_rows=pr_rows, | |
| review_rows=review_rows, | |
| review_comment_rows=review_comment_rows, | |
| pr_file_rows=pr_file_rows, | |
| pr_diff_rows=pr_diff_rows, | |
| timeline_rows=timeline_rows, | |
| completed_issue_timeline_numbers=completed_issue_timeline_numbers, | |
| ) | |
| # Fetch lightweight issue/PR stubs and top-level discussion comments first. | |
| _log("Fetching issue and pull request stubs from GitHub") | |
| issue_stubs = list( | |
| client.iter_repo_issues( | |
| options.repo.owner, | |
| options.repo.name, | |
| since=effective_since, | |
| limit=options.max_issues, | |
| ) | |
| ) | |
| reference_time = _reference_time_for_age_caps(crawl_started_at) | |
| issues = [ | |
| item | |
| for item in issue_stubs | |
| if "pull_request" not in item | |
| and _created_after_cutoff(item, options.issue_max_age_days, reference_time) | |
| ] | |
| pr_stubs = [ | |
| item | |
| for item in issue_stubs | |
| if "pull_request" in item | |
| and _created_after_cutoff(item, options.pr_max_age_days, reference_time) | |
| ] | |
| if options.max_prs is not None: | |
| pr_stubs = pr_stubs[: options.max_prs] | |
| _log( | |
| f"Fetched {len(issue_stubs)} stubs total: {len(issues)} issues and {len(pr_stubs)} pull requests selected" | |
| ) | |
| if options.issue_max_age_days is not None: | |
| _log(f"Issue import age cap: last {options.issue_max_age_days} days by created_at") | |
| if options.pr_max_age_days is not None: | |
| _log(f"PR import age cap: last {options.pr_max_age_days} days by created_at") | |
| issue_number_to_kind = { | |
| item["number"]: ("pull_request" if "pull_request" in item else "issue") | |
| for item in issue_stubs | |
| } | |
| issue_rows = [normalize_issue(repo_slug, item, snapshot_id, extracted_at) for item in issues] | |
| if comments_done: | |
| _log(f"Reusing {len(comment_rows)} checkpointed discussion comments") | |
| else: | |
| comment_rows = [] | |
| comment_threads_seen = 0 | |
| for item in issue_stubs: | |
| if not item.get("comments"): | |
| continue | |
| remaining = _remaining_limit(options.max_issue_comments, len(comment_rows)) | |
| if remaining == 0: | |
| break | |
| comment_threads_seen += 1 | |
| if comment_threads_seen == 1 or comment_threads_seen % 25 == 0: | |
| _log( | |
| f"Collecting discussion comments: {len(comment_rows)} gathered so far across {comment_threads_seen} threads" | |
| ) | |
| for comment in client.iter_issue_comments_for_number( | |
| options.repo.owner, | |
| options.repo.name, | |
| int(item["number"]), | |
| since=effective_since, | |
| limit=remaining, | |
| ): | |
| parent_number = issue_url_to_number(comment.get("issue_url")) | |
| kind = issue_number_to_kind.get(parent_number, "issue_or_pr") | |
| comment_rows.append( | |
| normalize_comment( | |
| repo_slug, comment, kind, parent_number, snapshot_id, extracted_at | |
| ) | |
| ) | |
| remaining = _remaining_limit(options.max_issue_comments, len(comment_rows)) | |
| if remaining == 0: | |
| break | |
| comments_done = True | |
| _write_checkpoint( | |
| options=options, | |
| repo_slug=repo_slug, | |
| snapshot_id=snapshot_id, | |
| snapshot_dir=snapshot_dir, | |
| effective_since=effective_since, | |
| crawl_started_at=crawl_started_at, | |
| extracted_at=extracted_at, | |
| previous_snapshot_dir=previous_snapshot_dir, | |
| merge_with_previous=merge_with_previous, | |
| phase="comments_complete", | |
| comments_done=comments_done, | |
| issue_rows=issue_rows, | |
| comment_rows=comment_rows, | |
| pr_rows=pr_rows, | |
| review_rows=review_rows, | |
| review_comment_rows=review_comment_rows, | |
| pr_file_rows=pr_file_rows, | |
| pr_diff_rows=pr_diff_rows, | |
| timeline_rows=timeline_rows, | |
| completed_issue_timeline_numbers=completed_issue_timeline_numbers, | |
| ) | |
| _log(f"Collected {len(comment_rows)} discussion comments") | |
| # Hydrate PR-owned detail tables: reviews, review comments, files, diffs, timelines. | |
| completed_pr_numbers = {int(row["number"]) for row in pr_rows if row.get("number") is not None} | |
| if completed_pr_numbers: | |
| _log(f"Reusing hydrated data for {len(completed_pr_numbers)} pull requests from checkpoint") | |
| total_prs = len(pr_stubs) | |
| for pr_stub in pr_stubs: | |
| number = int(pr_stub["number"]) | |
| if number in completed_pr_numbers: | |
| continue | |
| current_pr_index = len(completed_pr_numbers) + 1 | |
| if current_pr_index == 1 or current_pr_index % 10 == 0 or current_pr_index == total_prs: | |
| _log(f"Hydrating pull requests: {current_pr_index}/{total_prs} (current #{number})") | |
| pr_detail = client.get_pull_request(options.repo.owner, options.repo.name, number) | |
| pr_rows.append( | |
| normalize_pull_request(repo_slug, pr_stub, pr_detail, snapshot_id, extracted_at) | |
| ) | |
| for review in client.iter_pull_reviews( | |
| options.repo.owner, | |
| options.repo.name, | |
| number, | |
| limit=options.max_reviews_per_pr, | |
| ): | |
| review_rows.append( | |
| normalize_review(repo_slug, number, review, snapshot_id, extracted_at) | |
| ) | |
| for review_comment in client.iter_pull_review_comments( | |
| options.repo.owner, | |
| options.repo.name, | |
| number, | |
| limit=options.max_review_comments_per_pr, | |
| ): | |
| review_comment_rows.append( | |
| normalize_review_comment( | |
| repo_slug, number, review_comment, snapshot_id, extracted_at | |
| ) | |
| ) | |
| for pr_file in client.iter_pull_files(options.repo.owner, options.repo.name, number): | |
| pr_file_rows.append( | |
| normalize_pr_file(repo_slug, number, pr_file, snapshot_id, extracted_at) | |
| ) | |
| pr_diff_rows.append( | |
| normalize_pr_diff( | |
| repo_slug, | |
| number, | |
| pr_stub.get("html_url"), | |
| pr_stub.get("url"), | |
| client.get_pull_request_diff(options.repo.owner, options.repo.name, number), | |
| snapshot_id, | |
| extracted_at, | |
| ) | |
| ) | |
| if options.fetch_timeline: | |
| for event in client.iter_issue_timeline(options.repo.owner, options.repo.name, number): | |
| timeline_rows.append( | |
| normalize_timeline_event( | |
| repo_slug, number, "pull_request", event, snapshot_id, extracted_at | |
| ) | |
| ) | |
| completed_pr_numbers.add(number) | |
| if ( | |
| len(completed_pr_numbers) % CHECKPOINT_PR_INTERVAL == 0 | |
| or len(completed_pr_numbers) == total_prs | |
| ): | |
| _log(f"Checkpointing pull request hydration at {len(completed_pr_numbers)}/{total_prs}") | |
| _write_checkpoint( | |
| options=options, | |
| repo_slug=repo_slug, | |
| snapshot_id=snapshot_id, | |
| snapshot_dir=snapshot_dir, | |
| effective_since=effective_since, | |
| crawl_started_at=crawl_started_at, | |
| extracted_at=extracted_at, | |
| previous_snapshot_dir=previous_snapshot_dir, | |
| merge_with_previous=merge_with_previous, | |
| phase="hydrating_pull_requests", | |
| comments_done=comments_done, | |
| issue_rows=issue_rows, | |
| comment_rows=comment_rows, | |
| pr_rows=pr_rows, | |
| review_rows=review_rows, | |
| review_comment_rows=review_comment_rows, | |
| pr_file_rows=pr_file_rows, | |
| pr_diff_rows=pr_diff_rows, | |
| timeline_rows=timeline_rows, | |
| completed_issue_timeline_numbers=completed_issue_timeline_numbers, | |
| ) | |
| _log( | |
| f"Hydrated {len(pr_rows)} pull requests, {len(review_rows)} reviews, " | |
| f"{len(review_comment_rows)} review comments, {len(pr_file_rows)} PR files, " | |
| f"and {len(pr_diff_rows)} PR diffs" | |
| ) | |
| # Fetch issue timelines after PR hydration so checkpoints can resume either phase. | |
| if options.fetch_timeline: | |
| _log(f"Fetching timeline events for {len(issues)} issues") | |
| if completed_issue_timeline_numbers: | |
| _log(f"Reusing timeline checkpoints for {len(completed_issue_timeline_numbers)} issues") | |
| for issue in issues: | |
| number = int(issue["number"]) | |
| if number in completed_issue_timeline_numbers: | |
| continue | |
| for event in client.iter_issue_timeline(options.repo.owner, options.repo.name, number): | |
| timeline_rows.append( | |
| normalize_timeline_event( | |
| repo_slug, number, "issue", event, snapshot_id, extracted_at | |
| ) | |
| ) | |
| completed_issue_timeline_numbers.add(number) | |
| if ( | |
| len(completed_issue_timeline_numbers) % CHECKPOINT_ISSUE_TIMELINE_INTERVAL == 0 | |
| or len(completed_issue_timeline_numbers) == len(issues) | |
| ): | |
| _log( | |
| f"Checkpointing issue timelines at {len(completed_issue_timeline_numbers)}/{len(issues)} issues" | |
| ) | |
| _write_checkpoint( | |
| options=options, | |
| repo_slug=repo_slug, | |
| snapshot_id=snapshot_id, | |
| snapshot_dir=snapshot_dir, | |
| effective_since=effective_since, | |
| crawl_started_at=crawl_started_at, | |
| extracted_at=extracted_at, | |
| previous_snapshot_dir=previous_snapshot_dir, | |
| merge_with_previous=merge_with_previous, | |
| phase="fetching_issue_timelines", | |
| comments_done=comments_done, | |
| issue_rows=issue_rows, | |
| comment_rows=comment_rows, | |
| pr_rows=pr_rows, | |
| review_rows=review_rows, | |
| review_comment_rows=review_comment_rows, | |
| pr_file_rows=pr_file_rows, | |
| pr_diff_rows=pr_diff_rows, | |
| timeline_rows=timeline_rows, | |
| completed_issue_timeline_numbers=completed_issue_timeline_numbers, | |
| ) | |
| _log(f"Collected {len(timeline_rows)} timeline events") | |
| # Derive link rows, then optionally merge this delta into the previous full snapshot. | |
| _log("Building derived link rows") | |
| link_rows: list[dict[str, Any]] = [] | |
| for issue_row in issue_rows: | |
| link_rows.extend( | |
| build_text_link_rows( | |
| repo=repo_slug, | |
| owner=options.repo.owner, | |
| repo_name=options.repo.name, | |
| source_type="issue", | |
| source_number=issue_row["number"], | |
| source_id=issue_row["github_id"], | |
| body=issue_row["body"], | |
| snapshot_id=snapshot_id, | |
| extracted_at=extracted_at, | |
| ) | |
| ) | |
| for pr_row in pr_rows: | |
| link_rows.extend( | |
| build_text_link_rows( | |
| repo=repo_slug, | |
| owner=options.repo.owner, | |
| repo_name=options.repo.name, | |
| source_type="pull_request", | |
| source_number=pr_row["number"], | |
| source_id=pr_row["github_id"], | |
| body=pr_row["body"], | |
| snapshot_id=snapshot_id, | |
| extracted_at=extracted_at, | |
| ) | |
| ) | |
| for comment_row in comment_rows: | |
| parent_number = comment_row.get("parent_number") | |
| if parent_number is None: | |
| continue | |
| link_rows.extend( | |
| build_text_link_rows( | |
| repo=repo_slug, | |
| owner=options.repo.owner, | |
| repo_name=options.repo.name, | |
| source_type="comment", | |
| source_number=parent_number, | |
| source_id=comment_row["github_id"], | |
| body=comment_row["body"], | |
| snapshot_id=snapshot_id, | |
| extracted_at=extracted_at, | |
| ) | |
| ) | |
| for review_row in review_rows: | |
| link_rows.extend( | |
| build_text_link_rows( | |
| repo=repo_slug, | |
| owner=options.repo.owner, | |
| repo_name=options.repo.name, | |
| source_type="review", | |
| source_number=review_row["pull_request_number"], | |
| source_id=review_row["github_id"], | |
| body=review_row["body"], | |
| snapshot_id=snapshot_id, | |
| extracted_at=extracted_at, | |
| ) | |
| ) | |
| for review_comment_row in review_comment_rows: | |
| link_rows.extend( | |
| build_text_link_rows( | |
| repo=repo_slug, | |
| owner=options.repo.owner, | |
| repo_name=options.repo.name, | |
| source_type="review_comment", | |
| source_number=review_comment_row["pull_request_number"], | |
| source_id=review_comment_row["github_id"], | |
| body=review_comment_row["body"], | |
| snapshot_id=snapshot_id, | |
| extracted_at=extracted_at, | |
| ) | |
| ) | |
| link_rows.extend( | |
| build_pr_duplicate_candidate_rows( | |
| repo=repo_slug, | |
| pull_requests=pr_rows, | |
| link_rows=link_rows, | |
| snapshot_id=snapshot_id, | |
| extracted_at=extracted_at, | |
| ) | |
| ) | |
| for event in timeline_rows: | |
| if event.get("source_issue_number"): | |
| link_rows.append( | |
| { | |
| "repo": repo_slug, | |
| "source_type": event["parent_kind"], | |
| "source_number": event["parent_number"], | |
| "source_github_id": None, | |
| "target_owner": options.repo.owner, | |
| "target_repo": options.repo.name, | |
| "target_number": event["source_issue_number"], | |
| "link_type": f"timeline:{event['event']}", | |
| "link_origin": "timeline", | |
| "snapshot_id": snapshot_id, | |
| "extracted_at": extracted_at, | |
| } | |
| ) | |
| _log(f"Built {len(link_rows)} link rows") | |
| delta_tables = { | |
| "issues": issue_rows, | |
| "pull_requests": pr_rows, | |
| "comments": comment_rows, | |
| "reviews": review_rows, | |
| "review_comments": review_comment_rows, | |
| "pr_files": pr_file_rows, | |
| "pr_diffs": pr_diff_rows, | |
| "links": link_rows, | |
| "events": timeline_rows, | |
| } | |
| final_tables = dict(delta_tables) | |
| if merge_with_previous: | |
| _log("Loading previous snapshot tables for merge") | |
| for table_name, delta_rows in delta_tables.items(): | |
| previous_rows = _load_previous_rows(previous_snapshot_dir, table_name) | |
| final_tables[table_name] = _merge_rows(table_name, previous_rows, delta_rows) | |
| _log("Merged incremental delta into cumulative snapshot") | |
| manifest = { | |
| "repo": repo_slug, | |
| "snapshot_id": snapshot_id, | |
| "crawl_started_at": crawl_started_at, | |
| "extracted_at": extracted_at, | |
| "watermark": { | |
| "effective_since": effective_since, | |
| "next_since": crawl_started_at, | |
| "resume_enabled": options.resume, | |
| "resumed_from_checkpoint": bool(checkpoint), | |
| "merge_with_previous": merge_with_previous, | |
| "previous_snapshot_dir": str(previous_snapshot_dir) if previous_snapshot_dir else None, | |
| }, | |
| "options": { | |
| "since": options.since, | |
| "effective_since": effective_since, | |
| "http_timeout": options.http_timeout, | |
| "http_max_retries": options.http_max_retries, | |
| "max_issues": options.max_issues, | |
| "max_prs": options.max_prs, | |
| "max_issue_comments": options.max_issue_comments, | |
| "max_reviews_per_pr": options.max_reviews_per_pr, | |
| "max_review_comments_per_pr": options.max_review_comments_per_pr, | |
| "issue_max_age_days": options.issue_max_age_days, | |
| "pr_max_age_days": options.pr_max_age_days, | |
| "fetch_timeline": options.fetch_timeline, | |
| "new_contributor_report": options.new_contributor_report, | |
| "new_contributor_window_days": options.new_contributor_window_days, | |
| "new_contributor_max_authors": options.new_contributor_max_authors, | |
| }, | |
| "delta_counts": { | |
| "issue_stubs": len(issue_stubs), | |
| "issues": len(issue_rows), | |
| "pull_requests": len(pr_rows), | |
| "comments": len(comment_rows), | |
| "reviews": len(review_rows), | |
| "review_comments": len(review_comment_rows), | |
| "pr_files": len(pr_file_rows), | |
| "pr_diffs": len(pr_diff_rows), | |
| "timeline_events": len(timeline_rows), | |
| "links": len(link_rows), | |
| }, | |
| "counts": { | |
| "issues": len(final_tables["issues"]), | |
| "pull_requests": len(final_tables["pull_requests"]), | |
| "comments": len(final_tables["comments"]), | |
| "reviews": len(final_tables["reviews"]), | |
| "review_comments": len(final_tables["review_comments"]), | |
| "pr_files": len(final_tables["pr_files"]), | |
| "pr_diffs": len(final_tables["pr_diffs"]), | |
| "timeline_events": len(final_tables["events"]), | |
| "links": len(final_tables["links"]), | |
| }, | |
| } | |
| # Write the final snapshot, derived viewer tables, manifest, and optional publish artifacts. | |
| _log("Writing Parquet snapshot files") | |
| write_parquet(final_tables["issues"], snapshot_dir / "issues.parquet", "issues") | |
| write_parquet( | |
| final_tables["pull_requests"], snapshot_dir / "pull_requests.parquet", "pull_requests" | |
| ) | |
| write_parquet(final_tables["comments"], snapshot_dir / "comments.parquet", "comments") | |
| issue_comment_rows, pr_comment_rows = _viewer_comment_rows( | |
| final_tables["comments"], | |
| final_tables["pull_requests"], | |
| ) | |
| write_parquet(issue_comment_rows, snapshot_dir / "issue_comments.parquet", "comments") | |
| write_parquet(pr_comment_rows, snapshot_dir / "pr_comments.parquet", "comments") | |
| write_parquet(final_tables["reviews"], snapshot_dir / "reviews.parquet", "reviews") | |
| write_parquet(final_tables["pr_files"], snapshot_dir / "pr_files.parquet", "pr_files") | |
| write_parquet(final_tables["pr_diffs"], snapshot_dir / "pr_diffs.parquet", "pr_diffs") | |
| write_parquet( | |
| final_tables["review_comments"], snapshot_dir / "review_comments.parquet", "review_comments" | |
| ) | |
| write_parquet(final_tables["links"], snapshot_dir / "links.parquet", "links") | |
| write_parquet(final_tables["events"], snapshot_dir / "events.parquet", "events") | |
| write_json(manifest, snapshot_dir / "manifest.json") | |
| generated_new_contributor_report = False | |
| if options.new_contributor_report: | |
| _log("Generating new contributor dataset/report artifacts") | |
| run_new_contributor_report( | |
| NewContributorReportOptions( | |
| snapshot_dir=snapshot_dir, | |
| output_dir=options.output_dir, | |
| output=None, | |
| json_output=None, | |
| hf_repo_id=None, | |
| hf_revision=None, | |
| hf_materialize_dir=None, | |
| window_days=options.new_contributor_window_days, | |
| max_authors=options.new_contributor_max_authors, | |
| ) | |
| ) | |
| generated_new_contributor_report = True | |
| new_contributor_rows = read_parquet_rows(snapshot_dir / "new_contributors.parquet") | |
| manifest["counts"]["new_contributors"] = len(new_contributor_rows) | |
| manifest["artifacts"] = { | |
| "new_contributors_parquet": "new_contributors.parquet", | |
| "new_contributors_json": "new-contributors-report.json", | |
| "new_contributors_markdown": "new-contributors-report.md", | |
| } | |
| write_json(manifest, snapshot_dir / "manifest.json") | |
| write_text( | |
| _dataset_card( | |
| repo_slug, | |
| snapshot_id, | |
| manifest, | |
| include_new_contributors=generated_new_contributor_report, | |
| ), | |
| snapshot_dir / "README.md", | |
| ) | |
| _log("Wrote manifest and dataset card") | |
| latest = _latest_snapshot_pointer_path(options.output_dir) | |
| write_json( | |
| { | |
| "repo": repo_slug, | |
| "latest_snapshot_id": snapshot_id, | |
| "snapshot_dir": str(snapshot_dir), | |
| "manifest_path": str(snapshot_dir / "manifest.json"), | |
| "next_since": crawl_started_at, | |
| }, | |
| latest, | |
| ) | |
| _log(f"Updated latest snapshot pointer: {latest}") | |
| watermark_payload = { | |
| "repo": repo_slug, | |
| "last_successful_snapshot_id": snapshot_id, | |
| "snapshot_dir": str(snapshot_dir), | |
| "effective_since": effective_since, | |
| "next_since": crawl_started_at, | |
| "updated_at": extracted_at, | |
| } | |
| write_json(watermark_payload, _watermark_path(options.output_dir)) | |
| _log(f"Updated watermark state: {_watermark_path(options.output_dir)}") | |
| _clear_checkpoint(options.output_dir, snapshot_dir) | |
| _log(f"Snapshot complete: {snapshot_dir}") | |
| return snapshot_dir | |
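# Minimal invocation sketch. Constructing PipelineOptions below is illustrative -- the
# real dataclass (and any CLI wrapper) may expose a different constructor; only the
# attributes read by run_pipeline() above are known from this module.
#
#     if __name__ == "__main__":
#         options = PipelineOptions(...)        # repo, output_dir, caps, resume, ...
#         snapshot_dir = run_pipeline(options)  # token resolved via resolve_github_token()
#         print(f"Snapshot complete: {snapshot_dir}")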