from __future__ import annotations

import json
import shutil
from collections.abc import Iterable
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any, Protocol

from slop_farmer.config import NewContributorReportOptions, PipelineOptions, resolve_github_token
from slop_farmer.data.dataset_card import build_hf_dataset_card
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.links import build_pr_duplicate_candidate_rows, build_text_link_rows
from slop_farmer.data.normalize import (
    issue_url_to_number,
    normalize_comment,
    normalize_issue,
    normalize_pr_diff,
    normalize_pr_file,
    normalize_pull_request,
    normalize_review,
    normalize_review_comment,
    normalize_timeline_event,
)
from slop_farmer.data.parquet_io import (
    read_json,
    read_parquet_rows,
    write_json,
    write_parquet,
    write_text,
)
from slop_farmer.reports.new_contributor_report import run_new_contributor_report

# Navigation:
# - protocol + small time/log/view helpers
# - checkpoint/state helpers for resumable crawls
# - incremental merge helpers
# - run_pipeline(): fetch -> hydrate -> derive links -> merge/write -> publish


class GitHubClientLike(Protocol):
    def iter_repo_issues(
        self, owner: str, repo: str, since: str | None, limit: int | None
    ) -> Iterable[dict[str, Any]]: ...

    def iter_issue_comments_for_number(
        self, owner: str, repo: str, number: int, since: str | None, limit: int | None = None
    ) -> Iterable[dict[str, Any]]: ...

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]: ...

    def iter_pull_reviews(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]: ...

    def iter_pull_review_comments(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]: ...

    def iter_pull_files(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]: ...

    def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str: ...

    def iter_issue_timeline(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]: ...
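
# Illustrative only: a minimal in-memory stub satisfying GitHubClientLike,
# the kind of object a test can pass as run_pipeline(options, client=...).
# Protocol typing is structural, so no inheritance is needed. The empty
# payloads are placeholders, not real GitHub API responses; this sketch is
# not part of the pipeline itself.
class _StubGitHubClient:
    def iter_repo_issues(
        self, owner: str, repo: str, since: str | None, limit: int | None
    ) -> Iterable[dict[str, Any]]:
        return iter(())

    def iter_issue_comments_for_number(
        self, owner: str, repo: str, number: int, since: str | None, limit: int | None = None
    ) -> Iterable[dict[str, Any]]:
        return iter(())

    def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]:
        return {"number": number}

    def iter_pull_reviews(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]:
        return iter(())

    def iter_pull_review_comments(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]:
        return iter(())

    def iter_pull_files(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]:
        return iter(())

    def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str:
        return ""

    def iter_issue_timeline(
        self, owner: str, repo: str, number: int, limit: int | None = None
    ) -> Iterable[dict[str, Any]]:
        return iter(())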
def _iso_now() -> str:
    return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def _snapshot_id() -> str:
    return datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ")


def _log(message: str) -> None:
    stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ")
    print(f"[{stamp}] {message}", flush=True)


def _remaining_limit(limit: int | None, used: int) -> int | None:
    if limit is None:
        return None
    return max(limit - used, 0)


def _created_after_cutoff(
    item: dict[str, Any], max_age_days: int | None, reference_time: datetime
) -> bool:
    if max_age_days is None:
        return True
    created_at = item.get("created_at")
    if not created_at:
        return False
    try:
        created_dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00"))
    except ValueError:
        return False
    return created_dt >= reference_time - timedelta(days=max_age_days)


def _reference_time_for_age_caps(crawl_started_at: str) -> datetime:
    try:
        return datetime.fromisoformat(crawl_started_at.replace("Z", "+00:00"))
    except ValueError:
        return datetime.now(tz=UTC)


def _dataset_card(
    repo: str, snapshot_id: str, manifest: dict[str, Any], *, include_new_contributors: bool = False
) -> str:
    notes = ["new contributor reviewer artifacts are included"] if include_new_contributors else []
    del manifest
    return build_hf_dataset_card(
        repo,
        snapshot_id,
        include_new_contributors=include_new_contributors,
        notes=notes,
    )


def _viewer_comment_rows(
    comments: list[dict[str, Any]],
    pull_requests: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    pr_numbers = {int(row["number"]) for row in pull_requests if row.get("number") is not None}
    issue_comments: list[dict[str, Any]] = []
    pr_comments: list[dict[str, Any]] = []
    for row in comments:
        parent_number = row.get("parent_number")
        parent_kind = row.get("parent_kind")
        if parent_kind == "pull_request" or parent_number in pr_numbers:
            pr_comments.append(row)
        else:
            issue_comments.append(row)
    return issue_comments, pr_comments


PRIMARY_KEYS: dict[str, tuple[str, ...]] = {
    "issues": ("github_id",),
    "pull_requests": ("github_id",),
    "comments": ("github_id",),
    "reviews": ("github_id",),
    "review_comments": ("github_id",),
    "pr_files": ("repo", "pull_request_number", "filename"),
    "pr_diffs": ("repo", "pull_request_number"),
    "links": (
        "repo",
        "source_type",
        "source_number",
        "source_github_id",
        "target_owner",
        "target_repo",
        "target_number",
        "link_type",
        "link_origin",
    ),
    "events": (
        "repo",
        "parent_kind",
        "parent_number",
        "event",
        "created_at",
        "actor_login",
        "source_issue_number",
        "source_issue_url",
        "commit_id",
        "label_name",
    ),
}

CHECKPOINT_VERSION = 1
CHECKPOINT_PR_INTERVAL = 5
CHECKPOINT_ISSUE_TIMELINE_INTERVAL = 25
CHECKPOINT_TABLE_NAMES = (
    "issues",
    "comments",
    "pull_requests",
    "reviews",
    "review_comments",
    "pr_files",
    "pr_diffs",
    "events",
)


# Checkpoint/state helpers
def _state_dir(output_dir: Path) -> Path:
    return output_dir / "state"


def _in_progress_path(output_dir: Path) -> Path:
    return _state_dir(output_dir) / "in_progress.json"


def _watermark_path(output_dir: Path) -> Path:
    return _state_dir(output_dir) / "watermark.json"


def _latest_snapshot_pointer_path(output_dir: Path) -> Path:
    return output_dir / "snapshots" / "latest.json"


def _checkpoint_dir(snapshot_dir: Path) -> Path:
    return snapshot_dir / "_checkpoint"


def _checkpoint_progress_path(snapshot_dir: Path) -> Path:
    return _checkpoint_dir(snapshot_dir) / "progress.json"


def _checkpoint_table_path(snapshot_dir: Path, table_name: str) -> Path:
    return _checkpoint_dir(snapshot_dir) / f"{table_name}.parquet"


def _checkpoint_options(options: PipelineOptions) -> dict[str, Any]:
    return {
        "max_issues": options.max_issues,
        "max_prs": options.max_prs,
        "max_issue_comments": options.max_issue_comments,
        "max_reviews_per_pr": options.max_reviews_per_pr,
        "max_review_comments_per_pr": options.max_review_comments_per_pr,
        "issue_max_age_days": options.issue_max_age_days,
        "pr_max_age_days": options.pr_max_age_days,
        "fetch_timeline": options.fetch_timeline,
    }


def _load_in_progress_checkpoint(options: PipelineOptions, repo_slug: str) -> dict[str, Any] | None:
    if not options.resume or options.since:
        return None
    path = _in_progress_path(options.output_dir)
    if not path.exists():
        return None
    payload = read_json(path)
    if not isinstance(payload, dict):
        return None
    if payload.get("version") != CHECKPOINT_VERSION or payload.get("repo") != repo_slug:
        return None
    if payload.get("options") != _checkpoint_options(options):
        _log(f"Ignoring incompatible in-progress checkpoint: {path}")
        return None
    snapshot_dir_raw = payload.get("snapshot_dir")
    if not isinstance(snapshot_dir_raw, str) or not snapshot_dir_raw:
        return None
    payload["snapshot_dir"] = Path(snapshot_dir_raw)
    previous_snapshot_dir_raw = payload.get("previous_snapshot_dir")
    payload["previous_snapshot_dir"] = (
        Path(previous_snapshot_dir_raw)
        if isinstance(previous_snapshot_dir_raw, str) and previous_snapshot_dir_raw
        else None
    )
    return payload


def _load_checkpoint_rows(snapshot_dir: Path, table_name: str) -> list[dict[str, Any]]:
    return read_parquet_rows(_checkpoint_table_path(snapshot_dir, table_name))


def _write_checkpoint(
    *,
    options: PipelineOptions,
    repo_slug: str,
    snapshot_id: str,
    snapshot_dir: Path,
    effective_since: str | None,
    crawl_started_at: str,
    extracted_at: str,
    previous_snapshot_dir: Path | None,
    merge_with_previous: bool,
    phase: str,
    comments_done: bool,
    issue_rows: list[dict[str, Any]],
    comment_rows: list[dict[str, Any]],
    pr_rows: list[dict[str, Any]],
    review_rows: list[dict[str, Any]],
    review_comment_rows: list[dict[str, Any]],
    pr_file_rows: list[dict[str, Any]],
    pr_diff_rows: list[dict[str, Any]],
    timeline_rows: list[dict[str, Any]],
    completed_issue_timeline_numbers: set[int],
) -> None:
    checkpoint_tables = {
        "issues": issue_rows,
        "comments": comment_rows,
        "pull_requests": pr_rows,
        "reviews": review_rows,
        "review_comments": review_comment_rows,
        "pr_files": pr_file_rows,
        "pr_diffs": pr_diff_rows,
        "events": timeline_rows,
    }
    for table_name, rows in checkpoint_tables.items():
        write_parquet(rows, _checkpoint_table_path(snapshot_dir, table_name), table_name)
    progress = {
        "version": CHECKPOINT_VERSION,
        "repo": repo_slug,
        "snapshot_id": snapshot_id,
        "snapshot_dir": str(snapshot_dir),
        "effective_since": effective_since,
        "crawl_started_at": crawl_started_at,
        "extracted_at": extracted_at,
        "phase": phase,
        "comments_done": comments_done,
        "merge_with_previous": merge_with_previous,
        "previous_snapshot_dir": str(previous_snapshot_dir) if previous_snapshot_dir else None,
        "completed_pr_numbers": sorted(
            int(row["number"]) for row in pr_rows if row.get("number") is not None
        ),
        "completed_issue_timeline_numbers": sorted(completed_issue_timeline_numbers),
        "options": _checkpoint_options(options),
        "counts": {table_name: len(rows) for table_name, rows in checkpoint_tables.items()},
    }
    write_json(progress, _checkpoint_progress_path(snapshot_dir))
    write_json(progress, _in_progress_path(options.output_dir))
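
# Illustrative shape of the progress.json payload written above (values are
# made up for this sketch; the keys mirror the `progress` dict exactly):
#   {"version": 1, "repo": "octo/widgets", "snapshot_id": "20240101T000000Z",
#    "phase": "hydrating_pull_requests", "comments_done": true,
#    "completed_pr_numbers": [101, 104], "counts": {"issues": 12, ...}, ...}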
def _clear_checkpoint(output_dir: Path, snapshot_dir: Path) -> None:
    checkpoint_state = _in_progress_path(output_dir)
    if checkpoint_state.exists():
        checkpoint_state.unlink()
    checkpoint_dir = _checkpoint_dir(snapshot_dir)
    if checkpoint_dir.exists():
        shutil.rmtree(checkpoint_dir)


def _load_watermark(output_dir: Path) -> dict[str, Any] | None:
    path = _watermark_path(output_dir)
    if not path.exists():
        return None
    return read_json(path)


def _load_latest_snapshot_pointer(output_dir: Path) -> dict[str, Any] | None:
    path = _latest_snapshot_pointer_path(output_dir)
    if not path.exists():
        return None
    return read_json(path)


def _resolve_effective_since(
    options: PipelineOptions, repo_slug: str
) -> tuple[str | None, dict[str, Any] | None]:
    if options.since:
        return options.since, None
    if not options.resume:
        return None, None
    watermark = _load_watermark(options.output_dir)
    if not watermark or watermark.get("repo") != repo_slug:
        return None, watermark
    return watermark.get("next_since"), watermark


def _previous_snapshot_dir(output_dir: Path, repo_slug: str) -> Path | None:
    latest = _load_latest_snapshot_pointer(output_dir)
    if not latest:
        return None
    latest_repo = latest.get("repo")
    if latest_repo and latest_repo != repo_slug:
        return None
    snapshot_dir = latest.get("snapshot_dir")
    if not snapshot_dir:
        return None
    path = Path(snapshot_dir)
    return path if path.exists() else None


# Incremental merge helpers
def _row_key(row: dict[str, Any], key_fields: tuple[str, ...]) -> str:
    return json.dumps([row.get(field) for field in key_fields], sort_keys=False, default=str)


def _merge_rows(
    table_name: str, previous_rows: list[dict[str, Any]], delta_rows: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    key_fields = PRIMARY_KEYS[table_name]
    if table_name == "pr_files":
        refreshed_prs = {
            (row.get("repo"), row.get("pull_request_number"))
            for row in delta_rows
            if row.get("pull_request_number") is not None
        }
        previous_rows = [
            row
            for row in previous_rows
            if (row.get("repo"), row.get("pull_request_number")) not in refreshed_prs
        ]
    merged: dict[str, dict[str, Any]] = {}
    for row in previous_rows:
        merged[_row_key(row, key_fields)] = row
    for row in delta_rows:
        merged[_row_key(row, key_fields)] = row
    rows = list(merged.values())
    sort_fields = [
        field
        for field in ("number", "pull_request_number", "parent_number", "github_id", "created_at")
        if rows and field in rows[0]
    ]
    if sort_fields:
        rows.sort(
            key=lambda row: tuple(
                "" if row.get(field) is None else str(row.get(field)) for field in sort_fields
            )
        )
    return rows


def _load_previous_rows(snapshot_dir: Path | None, table_name: str) -> list[dict[str, Any]]:
    if snapshot_dir is None:
        return []
    return read_parquet_rows(snapshot_dir / f"{table_name}.parquet")
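
# Illustrative upsert semantics for _merge_rows (values made up): delta rows
# replace previous rows that share a primary key, so re-crawled records win.
#   previous = [{"github_id": 1, "state": "open"}]
#   delta = [{"github_id": 1, "state": "closed"}, {"github_id": 2, "state": "open"}]
#   _merge_rows("issues", previous, delta)
#   -> both rows, with github_id 1 now "closed"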
# Pipeline orchestration
def run_pipeline(options: PipelineOptions, client: GitHubClientLike | None = None) -> Path:
    # Resume or initialize one snapshot run.
    repo_slug = options.repo.slug
    checkpoint = _load_in_progress_checkpoint(options, repo_slug)
    watermark: dict[str, Any] | None = None
    if checkpoint:
        effective_since = checkpoint.get("effective_since")
        previous_snapshot_dir = checkpoint.get("previous_snapshot_dir")
        merge_with_previous = bool(checkpoint.get("merge_with_previous"))
        crawl_started_at = str(checkpoint.get("crawl_started_at") or _iso_now())
        snapshot_id = str(checkpoint.get("snapshot_id") or _snapshot_id())
        extracted_at = str(checkpoint.get("extracted_at") or _iso_now())
        snapshot_dir = checkpoint["snapshot_dir"]
    else:
        effective_since, watermark = _resolve_effective_since(options, repo_slug)
        previous_snapshot_dir = _previous_snapshot_dir(options.output_dir, repo_slug)
        merge_with_previous = previous_snapshot_dir is not None and effective_since is not None
        crawl_started_at = _iso_now()
        snapshot_id = _snapshot_id()
        extracted_at = _iso_now()
        snapshot_dir = options.output_dir / "snapshots" / snapshot_id
    snapshot_dir.mkdir(parents=True, exist_ok=True)

    if client is None:
        token = resolve_github_token()
        client = GitHubClient(
            token=token,
            timeout=options.http_timeout,
            max_retries=options.http_max_retries,
            log=_log,
        )

    if checkpoint:
        _log(f"Resuming snapshot {snapshot_id} for {repo_slug}")
        _log(f"Recovered in-progress checkpoint: {_in_progress_path(options.output_dir)}")
    else:
        _log(f"Starting snapshot {snapshot_id} for {repo_slug}")
    _log(f"Output directory: {snapshot_dir}")
    if options.since:
        _log(f"Using explicit since watermark: {effective_since}")
    elif checkpoint:
        _log(f"Resuming in-progress crawl window from {effective_since}")
    elif effective_since:
        source_snapshot = watermark.get("last_successful_snapshot_id") if watermark else None
        _log(f"Resuming from local watermark {effective_since} from snapshot {source_snapshot}")
    else:
        _log("No watermark active; running full snapshot")
    if merge_with_previous:
        _log(f"Merging delta into previous snapshot: {previous_snapshot_dir}")

    # Load any checkpointed tables before resuming remote work.
    issue_rows = _load_checkpoint_rows(snapshot_dir, "issues") if checkpoint else []
    comment_rows = _load_checkpoint_rows(snapshot_dir, "comments") if checkpoint else []
    pr_rows = _load_checkpoint_rows(snapshot_dir, "pull_requests") if checkpoint else []
    review_rows = _load_checkpoint_rows(snapshot_dir, "reviews") if checkpoint else []
    review_comment_rows = (
        _load_checkpoint_rows(snapshot_dir, "review_comments") if checkpoint else []
    )
    pr_file_rows = _load_checkpoint_rows(snapshot_dir, "pr_files") if checkpoint else []
    pr_diff_rows = _load_checkpoint_rows(snapshot_dir, "pr_diffs") if checkpoint else []
    timeline_rows = _load_checkpoint_rows(snapshot_dir, "events") if checkpoint else []
    completed_issue_timeline_numbers = {
        int(number)
        for number in (checkpoint or {}).get("completed_issue_timeline_numbers", [])
        if number is not None
    }
    comments_done = bool((checkpoint or {}).get("comments_done"))

    if not checkpoint:
        _write_checkpoint(
            options=options,
            repo_slug=repo_slug,
            snapshot_id=snapshot_id,
            snapshot_dir=snapshot_dir,
            effective_since=effective_since,
            crawl_started_at=crawl_started_at,
            extracted_at=extracted_at,
            previous_snapshot_dir=previous_snapshot_dir,
            merge_with_previous=merge_with_previous,
            phase="starting",
            comments_done=False,
            issue_rows=issue_rows,
            comment_rows=comment_rows,
            pr_rows=pr_rows,
            review_rows=review_rows,
            review_comment_rows=review_comment_rows,
            pr_file_rows=pr_file_rows,
            pr_diff_rows=pr_diff_rows,
            timeline_rows=timeline_rows,
            completed_issue_timeline_numbers=completed_issue_timeline_numbers,
        )

    # Fetch lightweight issue/PR stubs and top-level discussion comments first.
    _log("Fetching issue and pull request stubs from GitHub")
    issue_stubs = list(
        client.iter_repo_issues(
            options.repo.owner,
            options.repo.name,
            since=effective_since,
            limit=options.max_issues,
        )
    )
    reference_time = _reference_time_for_age_caps(crawl_started_at)
    issues = [
        item
        for item in issue_stubs
        if "pull_request" not in item
        and _created_after_cutoff(item, options.issue_max_age_days, reference_time)
    ]
    pr_stubs = [
        item
        for item in issue_stubs
        if "pull_request" in item
        and _created_after_cutoff(item, options.pr_max_age_days, reference_time)
    ]
    if options.max_prs is not None:
        pr_stubs = pr_stubs[: options.max_prs]
    _log(
        f"Fetched {len(issue_stubs)} stubs total: "
        f"{len(issues)} issues and {len(pr_stubs)} pull requests selected"
    )
    if options.issue_max_age_days is not None:
        _log(f"Issue import age cap: last {options.issue_max_age_days} days by created_at")
    if options.pr_max_age_days is not None:
        _log(f"PR import age cap: last {options.pr_max_age_days} days by created_at")
    issue_number_to_kind = {
        item["number"]: ("pull_request" if "pull_request" in item else "issue")
        for item in issue_stubs
    }

    issue_rows = [normalize_issue(repo_slug, item, snapshot_id, extracted_at) for item in issues]

    if comments_done:
        _log(f"Reusing {len(comment_rows)} checkpointed discussion comments")
    else:
        comment_rows = []
        comment_threads_seen = 0
        for item in issue_stubs:
            if not item.get("comments"):
                continue
            remaining = _remaining_limit(options.max_issue_comments, len(comment_rows))
            if remaining == 0:
                break
            comment_threads_seen += 1
            if comment_threads_seen == 1 or comment_threads_seen % 25 == 0:
                _log(
                    f"Collecting discussion comments: {len(comment_rows)} gathered so far "
                    f"across {comment_threads_seen} threads"
                )
            for comment in client.iter_issue_comments_for_number(
                options.repo.owner,
                options.repo.name,
                int(item["number"]),
                since=effective_since,
                limit=remaining,
            ):
                parent_number = issue_url_to_number(comment.get("issue_url"))
                kind = issue_number_to_kind.get(parent_number, "issue_or_pr")
                comment_rows.append(
                    normalize_comment(
                        repo_slug, comment, kind, parent_number, snapshot_id, extracted_at
                    )
                )
                remaining = _remaining_limit(options.max_issue_comments, len(comment_rows))
                if remaining == 0:
                    break
        comments_done = True
        _write_checkpoint(
            options=options,
            repo_slug=repo_slug,
            snapshot_id=snapshot_id,
            snapshot_dir=snapshot_dir,
            effective_since=effective_since,
            crawl_started_at=crawl_started_at,
            extracted_at=extracted_at,
            previous_snapshot_dir=previous_snapshot_dir,
            merge_with_previous=merge_with_previous,
            phase="comments_complete",
            comments_done=comments_done,
            issue_rows=issue_rows,
            comment_rows=comment_rows,
            pr_rows=pr_rows,
            review_rows=review_rows,
            review_comment_rows=review_comment_rows,
            pr_file_rows=pr_file_rows,
            pr_diff_rows=pr_diff_rows,
            timeline_rows=timeline_rows,
            completed_issue_timeline_numbers=completed_issue_timeline_numbers,
        )
    _log(f"Collected {len(comment_rows)} discussion comments")

    # Hydrate PR-owned detail tables: reviews, review comments, files, diffs, timelines.
    completed_pr_numbers = {int(row["number"]) for row in pr_rows if row.get("number") is not None}
    if completed_pr_numbers:
        _log(f"Reusing hydrated data for {len(completed_pr_numbers)} pull requests from checkpoint")
    total_prs = len(pr_stubs)
    for pr_stub in pr_stubs:
        number = int(pr_stub["number"])
        if number in completed_pr_numbers:
            continue
        current_pr_index = len(completed_pr_numbers) + 1
        if current_pr_index == 1 or current_pr_index % 10 == 0 or current_pr_index == total_prs:
            _log(f"Hydrating pull requests: {current_pr_index}/{total_prs} (current #{number})")
        pr_detail = client.get_pull_request(options.repo.owner, options.repo.name, number)
        pr_rows.append(
            normalize_pull_request(repo_slug, pr_stub, pr_detail, snapshot_id, extracted_at)
        )
        for review in client.iter_pull_reviews(
            options.repo.owner,
            options.repo.name,
            number,
            limit=options.max_reviews_per_pr,
        ):
            review_rows.append(
                normalize_review(repo_slug, number, review, snapshot_id, extracted_at)
            )
        for review_comment in client.iter_pull_review_comments(
            options.repo.owner,
            options.repo.name,
            number,
            limit=options.max_review_comments_per_pr,
        ):
            review_comment_rows.append(
                normalize_review_comment(
                    repo_slug, number, review_comment, snapshot_id, extracted_at
                )
            )
        for pr_file in client.iter_pull_files(options.repo.owner, options.repo.name, number):
            pr_file_rows.append(
                normalize_pr_file(repo_slug, number, pr_file, snapshot_id, extracted_at)
            )
        pr_diff_rows.append(
            normalize_pr_diff(
                repo_slug,
                number,
                pr_stub.get("html_url"),
                pr_stub.get("url"),
                client.get_pull_request_diff(options.repo.owner, options.repo.name, number),
                snapshot_id,
                extracted_at,
            )
        )
        if options.fetch_timeline:
            for event in client.iter_issue_timeline(options.repo.owner, options.repo.name, number):
                timeline_rows.append(
                    normalize_timeline_event(
                        repo_slug, number, "pull_request", event, snapshot_id, extracted_at
                    )
                )
        completed_pr_numbers.add(number)
        if (
            len(completed_pr_numbers) % CHECKPOINT_PR_INTERVAL == 0
            or len(completed_pr_numbers) == total_prs
        ):
            _log(f"Checkpointing pull request hydration at {len(completed_pr_numbers)}/{total_prs}")
            _write_checkpoint(
                options=options,
                repo_slug=repo_slug,
                snapshot_id=snapshot_id,
                snapshot_dir=snapshot_dir,
                effective_since=effective_since,
                crawl_started_at=crawl_started_at,
                extracted_at=extracted_at,
                previous_snapshot_dir=previous_snapshot_dir,
                merge_with_previous=merge_with_previous,
                phase="hydrating_pull_requests",
                comments_done=comments_done,
                issue_rows=issue_rows,
                comment_rows=comment_rows,
                pr_rows=pr_rows,
                review_rows=review_rows,
                review_comment_rows=review_comment_rows,
                pr_file_rows=pr_file_rows,
                pr_diff_rows=pr_diff_rows,
                timeline_rows=timeline_rows,
                completed_issue_timeline_numbers=completed_issue_timeline_numbers,
            )
    _log(
        f"Hydrated {len(pr_rows)} pull requests, {len(review_rows)} reviews, "
        f"{len(review_comment_rows)} review comments, {len(pr_file_rows)} PR files, "
        f"and {len(pr_diff_rows)} PR diffs"
    )

    # Fetch issue timelines after PR hydration so checkpoints can resume either phase.
    if options.fetch_timeline:
        _log(f"Fetching timeline events for {len(issues)} issues")
        if completed_issue_timeline_numbers:
            _log(f"Reusing timeline checkpoints for {len(completed_issue_timeline_numbers)} issues")
        for issue in issues:
            number = int(issue["number"])
            if number in completed_issue_timeline_numbers:
                continue
            for event in client.iter_issue_timeline(options.repo.owner, options.repo.name, number):
                timeline_rows.append(
                    normalize_timeline_event(
                        repo_slug, number, "issue", event, snapshot_id, extracted_at
                    )
                )
            completed_issue_timeline_numbers.add(number)
            if (
                len(completed_issue_timeline_numbers) % CHECKPOINT_ISSUE_TIMELINE_INTERVAL == 0
                or len(completed_issue_timeline_numbers) == len(issues)
            ):
                _log(
                    f"Checkpointing issue timelines at "
                    f"{len(completed_issue_timeline_numbers)}/{len(issues)} issues"
                )
                _write_checkpoint(
                    options=options,
                    repo_slug=repo_slug,
                    snapshot_id=snapshot_id,
                    snapshot_dir=snapshot_dir,
                    effective_since=effective_since,
                    crawl_started_at=crawl_started_at,
                    extracted_at=extracted_at,
                    previous_snapshot_dir=previous_snapshot_dir,
                    merge_with_previous=merge_with_previous,
                    phase="fetching_issue_timelines",
                    comments_done=comments_done,
                    issue_rows=issue_rows,
                    comment_rows=comment_rows,
                    pr_rows=pr_rows,
                    review_rows=review_rows,
                    review_comment_rows=review_comment_rows,
                    pr_file_rows=pr_file_rows,
                    pr_diff_rows=pr_diff_rows,
                    timeline_rows=timeline_rows,
                    completed_issue_timeline_numbers=completed_issue_timeline_numbers,
                )
        _log(f"Collected {len(timeline_rows)} timeline events")

    # Derive link rows, then optionally merge this delta into the previous full snapshot.
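    # Illustrative timeline-derived link row (field values made up; the real
    # keys are the "links" entry in PRIMARY_KEYS plus snapshot metadata):
    #   {"repo": "octo/widgets", "source_type": "pull_request", "source_number": 12,
    #    "target_owner": "octo", "target_repo": "widgets", "target_number": 7,
    #    "link_type": "timeline:cross-referenced", "link_origin": "timeline", ...}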
_log("Building derived link rows") link_rows: list[dict[str, Any]] = [] for issue_row in issue_rows: link_rows.extend( build_text_link_rows( repo=repo_slug, owner=options.repo.owner, repo_name=options.repo.name, source_type="issue", source_number=issue_row["number"], source_id=issue_row["github_id"], body=issue_row["body"], snapshot_id=snapshot_id, extracted_at=extracted_at, ) ) for pr_row in pr_rows: link_rows.extend( build_text_link_rows( repo=repo_slug, owner=options.repo.owner, repo_name=options.repo.name, source_type="pull_request", source_number=pr_row["number"], source_id=pr_row["github_id"], body=pr_row["body"], snapshot_id=snapshot_id, extracted_at=extracted_at, ) ) for comment_row in comment_rows: parent_number = comment_row.get("parent_number") if parent_number is None: continue link_rows.extend( build_text_link_rows( repo=repo_slug, owner=options.repo.owner, repo_name=options.repo.name, source_type="comment", source_number=parent_number, source_id=comment_row["github_id"], body=comment_row["body"], snapshot_id=snapshot_id, extracted_at=extracted_at, ) ) for review_row in review_rows: link_rows.extend( build_text_link_rows( repo=repo_slug, owner=options.repo.owner, repo_name=options.repo.name, source_type="review", source_number=review_row["pull_request_number"], source_id=review_row["github_id"], body=review_row["body"], snapshot_id=snapshot_id, extracted_at=extracted_at, ) ) for review_comment_row in review_comment_rows: link_rows.extend( build_text_link_rows( repo=repo_slug, owner=options.repo.owner, repo_name=options.repo.name, source_type="review_comment", source_number=review_comment_row["pull_request_number"], source_id=review_comment_row["github_id"], body=review_comment_row["body"], snapshot_id=snapshot_id, extracted_at=extracted_at, ) ) link_rows.extend( build_pr_duplicate_candidate_rows( repo=repo_slug, pull_requests=pr_rows, link_rows=link_rows, snapshot_id=snapshot_id, extracted_at=extracted_at, ) ) for event in timeline_rows: if event.get("source_issue_number"): link_rows.append( { "repo": repo_slug, "source_type": event["parent_kind"], "source_number": event["parent_number"], "source_github_id": None, "target_owner": options.repo.owner, "target_repo": options.repo.name, "target_number": event["source_issue_number"], "link_type": f"timeline:{event['event']}", "link_origin": "timeline", "snapshot_id": snapshot_id, "extracted_at": extracted_at, } ) _log(f"Built {len(link_rows)} link rows") delta_tables = { "issues": issue_rows, "pull_requests": pr_rows, "comments": comment_rows, "reviews": review_rows, "review_comments": review_comment_rows, "pr_files": pr_file_rows, "pr_diffs": pr_diff_rows, "links": link_rows, "events": timeline_rows, } final_tables = dict(delta_tables) if merge_with_previous: _log("Loading previous snapshot tables for merge") for table_name, delta_rows in delta_tables.items(): previous_rows = _load_previous_rows(previous_snapshot_dir, table_name) final_tables[table_name] = _merge_rows(table_name, previous_rows, delta_rows) _log("Merged incremental delta into cumulative snapshot") manifest = { "repo": repo_slug, "snapshot_id": snapshot_id, "crawl_started_at": crawl_started_at, "extracted_at": extracted_at, "watermark": { "effective_since": effective_since, "next_since": crawl_started_at, "resume_enabled": options.resume, "resumed_from_checkpoint": bool(checkpoint), "merge_with_previous": merge_with_previous, "previous_snapshot_dir": str(previous_snapshot_dir) if previous_snapshot_dir else None, }, "options": { "since": options.since, 
"effective_since": effective_since, "http_timeout": options.http_timeout, "http_max_retries": options.http_max_retries, "max_issues": options.max_issues, "max_prs": options.max_prs, "max_issue_comments": options.max_issue_comments, "max_reviews_per_pr": options.max_reviews_per_pr, "max_review_comments_per_pr": options.max_review_comments_per_pr, "issue_max_age_days": options.issue_max_age_days, "pr_max_age_days": options.pr_max_age_days, "fetch_timeline": options.fetch_timeline, "new_contributor_report": options.new_contributor_report, "new_contributor_window_days": options.new_contributor_window_days, "new_contributor_max_authors": options.new_contributor_max_authors, }, "delta_counts": { "issue_stubs": len(issue_stubs), "issues": len(issue_rows), "pull_requests": len(pr_rows), "comments": len(comment_rows), "reviews": len(review_rows), "review_comments": len(review_comment_rows), "pr_files": len(pr_file_rows), "pr_diffs": len(pr_diff_rows), "timeline_events": len(timeline_rows), "links": len(link_rows), }, "counts": { "issues": len(final_tables["issues"]), "pull_requests": len(final_tables["pull_requests"]), "comments": len(final_tables["comments"]), "reviews": len(final_tables["reviews"]), "review_comments": len(final_tables["review_comments"]), "pr_files": len(final_tables["pr_files"]), "pr_diffs": len(final_tables["pr_diffs"]), "timeline_events": len(final_tables["events"]), "links": len(final_tables["links"]), }, } # Write the final snapshot, derived viewer tables, manifest, and optional publish artifacts. _log("Writing Parquet snapshot files") write_parquet(final_tables["issues"], snapshot_dir / "issues.parquet", "issues") write_parquet( final_tables["pull_requests"], snapshot_dir / "pull_requests.parquet", "pull_requests" ) write_parquet(final_tables["comments"], snapshot_dir / "comments.parquet", "comments") issue_comment_rows, pr_comment_rows = _viewer_comment_rows( final_tables["comments"], final_tables["pull_requests"], ) write_parquet(issue_comment_rows, snapshot_dir / "issue_comments.parquet", "comments") write_parquet(pr_comment_rows, snapshot_dir / "pr_comments.parquet", "comments") write_parquet(final_tables["reviews"], snapshot_dir / "reviews.parquet", "reviews") write_parquet(final_tables["pr_files"], snapshot_dir / "pr_files.parquet", "pr_files") write_parquet(final_tables["pr_diffs"], snapshot_dir / "pr_diffs.parquet", "pr_diffs") write_parquet( final_tables["review_comments"], snapshot_dir / "review_comments.parquet", "review_comments" ) write_parquet(final_tables["links"], snapshot_dir / "links.parquet", "links") write_parquet(final_tables["events"], snapshot_dir / "events.parquet", "events") write_json(manifest, snapshot_dir / "manifest.json") generated_new_contributor_report = False if options.new_contributor_report: _log("Generating new contributor dataset/report artifacts") run_new_contributor_report( NewContributorReportOptions( snapshot_dir=snapshot_dir, output_dir=options.output_dir, output=None, json_output=None, hf_repo_id=None, hf_revision=None, hf_materialize_dir=None, window_days=options.new_contributor_window_days, max_authors=options.new_contributor_max_authors, ) ) generated_new_contributor_report = True new_contributor_rows = read_parquet_rows(snapshot_dir / "new_contributors.parquet") manifest["counts"]["new_contributors"] = len(new_contributor_rows) manifest["artifacts"] = { "new_contributors_parquet": "new_contributors.parquet", "new_contributors_json": "new-contributors-report.json", "new_contributors_markdown": "new-contributors-report.md", } 
write_json(manifest, snapshot_dir / "manifest.json") write_text( _dataset_card( repo_slug, snapshot_id, manifest, include_new_contributors=generated_new_contributor_report, ), snapshot_dir / "README.md", ) _log("Wrote manifest and dataset card") latest = _latest_snapshot_pointer_path(options.output_dir) write_json( { "repo": repo_slug, "latest_snapshot_id": snapshot_id, "snapshot_dir": str(snapshot_dir), "manifest_path": str(snapshot_dir / "manifest.json"), "next_since": crawl_started_at, }, latest, ) _log(f"Updated latest snapshot pointer: {latest}") watermark_payload = { "repo": repo_slug, "last_successful_snapshot_id": snapshot_id, "snapshot_dir": str(snapshot_dir), "effective_since": effective_since, "next_since": crawl_started_at, "updated_at": extracted_at, } write_json(watermark_payload, _watermark_path(options.output_dir)) _log(f"Updated watermark state: {_watermark_path(options.output_dir)}") _clear_checkpoint(options.output_dir, snapshot_dir) _log(f"Snapshot complete: {snapshot_dir}") return snapshot_dir
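
# Illustrative usage (hedged): how a caller might invoke this module. The
# PipelineOptions construction is elided because its full constructor lives
# in slop_farmer.config; only the fields read by this module are known here.
#
#   options = ...  # a PipelineOptions for "owner/name" with resume=True
#   snapshot_dir = run_pipeline(options)  # real GitHubClient from token
#   snapshot_dir = run_pipeline(options, client=_StubGitHubClient())  # dry run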