from __future__ import annotations

import json
import urllib.error
import urllib.request
from datetime import UTC, datetime, timedelta
from typing import Any

from slop_farmer.config import resolve_github_token
from slop_farmer.data.http import urlopen_with_retry

GRAPHQL_URL = "https://api.github.com/graphql"

PROFILE_QUERY = """
query UserActivityProfile($login: String!, $from: DateTime!, $to: DateTime!) {
  rateLimit { cost remaining resetAt }
  viewer {
    login
    organizations(first: 100) {
      totalCount
      nodes { login name }
    }
  }
  user(login: $login) {
    login
    name
    createdAt
    organizations(first: 100) {
      totalCount
      nodes { login name }
    }
    starredRepositories(first: 100, orderBy: {field: STARRED_AT, direction: DESC}) {
      totalCount
      nodes {
        nameWithOwner
        stargazerCount
        owner { login }
      }
    }
    contributionsCollection(from: $from, to: $to) {
      contributionCalendar {
        totalContributions
        weeks {
          contributionDays { date contributionCount }
        }
      }
      totalIssueContributions
      totalPullRequestContributions
      pullRequestContributionsByRepository(maxRepositories: 10) {
        repository { nameWithOwner }
        contributions { totalCount }
      }
      issueContributionsByRepository(maxRepositories: 10) {
        repository { nameWithOwner }
        contributions { totalCount }
      }
    }
  }
}
""".strip()

SEARCH_PRS_QUERY = """
query SearchPullRequests($query: String!, $cursor: String) {
  rateLimit { cost remaining resetAt }
  search(type: ISSUE, query: $query, first: 100, after: $cursor) {
    issueCount
    pageInfo { hasNextPage endCursor }
    nodes {
      ... on PullRequest {
        number
        state
        merged
        createdAt
        updatedAt
        repository { nameWithOwner stargazerCount }
      }
    }
  }
}
""".strip()

SEARCH_ISSUES_QUERY = """
query SearchIssues($query: String!, $cursor: String) {
  rateLimit { cost remaining resetAt }
  search(type: ISSUE, query: $query, first: 100, after: $cursor) {
    issueCount
    pageInfo { hasNextPage endCursor }
    nodes {
      ... on Issue {
        number
        state
        createdAt
        updatedAt
        repository { nameWithOwner }
      }
    }
  }
}
""".strip()
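# Illustrative variables payload for PROFILE_QUERY (made-up values; summarize_user
# derives the real ones from its `login` and `days` arguments):
#   {"login": "octocat", "from": "2024-01-01T00:00:00Z", "to": "2024-03-31T00:00:00Z"}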
def summarize_user(login: str, days: int, repo: str | None) -> dict[str, Any]:
    """Build an activity, star, and org-membership summary for one user over the last `days` days."""
    now = datetime.now(tz=UTC)
    start = (now - timedelta(days=days)).replace(microsecond=0)
    from_iso = start.isoformat().replace("+00:00", "Z")
    from_date = start.date().isoformat()
    to_iso = now.replace(microsecond=0).isoformat().replace("+00:00", "Z")

    _user_activity_log(f"user enrichment {login}: fetching profile")
    profile, _errors = _post_graphql(
        PROFILE_QUERY, {"login": login, "from": from_iso, "to": to_iso}
    )
    viewer = profile["viewer"]
    user = profile["user"]
    if user is None:
        raise RuntimeError(f"unknown user {login!r}")
    contributions = user["contributionsCollection"]
    calendar = contributions["contributionCalendar"]

    repo_term = f" repo:{repo}" if repo else ""
    pr_query = f"author:{login} is:pr created:>={from_date}{repo_term} sort:created-desc"
    open_pr_query = (
        f"author:{login} is:pr is:open created:>={from_date}{repo_term} sort:created-desc"
    )
    issue_query = f"author:{login} is:issue created:>={from_date}{repo_term} sort:created-desc"

    pr_count, prs, inaccessible_pr_nodes = _search_all(pr_query, prs=True, label=f"{login} prs")
    open_pr_count, open_prs, inaccessible_open_pr_nodes = _search_all(
        open_pr_query, prs=True, label=f"{login} open-prs"
    )
    issue_count, issues, inaccessible_issue_nodes = _search_all(
        issue_query, prs=False, label=f"{login} issues"
    )

    merged_prs = [pr for pr in prs if pr.get("merged")]
    closed_unmerged_prs = [pr for pr in prs if pr.get("state") == "CLOSED" and not pr.get("merged")]
    still_open_prs = [pr for pr in prs if pr.get("state") == "OPEN"]
    open_pr_repos = sorted(
        {pr["repository"]["nameWithOwner"] for pr in open_prs if pr.get("repository")}
    )
    pr_repos = sorted({pr["repository"]["nameWithOwner"] for pr in prs if pr.get("repository")})
    issue_repos = sorted(
        {issue["repository"]["nameWithOwner"] for issue in issues if issue.get("repository")}
    )

    starred = user.get("starredRepositories") or {}
    starred_nodes = [row for row in starred.get("nodes") or [] if isinstance(row, dict)]
    non_self_starred = [
        row
        for row in starred_nodes
        if ((row.get("owner") or {}).get("login") or "").casefold() != login.casefold()
    ]
    recent_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in prs
    ]
    merged_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in merged_prs
    ]
    closed_unmerged_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in closed_unmerged_prs
    ]
    open_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in open_prs
    ]

    repo_owners = sorted(
        {
            repo_name.split("/", 1)[0]
            for repo_name in pr_repos + issue_repos + open_pr_repos
            if "/" in repo_name and repo_name.split("/", 1)[0].casefold() != login.casefold()
        }
    )
    public_org_rows = _public_orgs(login)
    target_listed_org_rows = [
        {"login": row["login"], "name": row.get("name")}
        for row in (user.get("organizations") or {}).get("nodes") or []
        if isinstance(row, dict) and row.get("login")
    ]
    viewer_listed_org_rows = [
        {"login": row["login"], "name": row.get("name")}
        for row in (viewer.get("organizations") or {}).get("nodes") or []
        if isinstance(row, dict) and row.get("login")
    ]
    viewer_is_target = viewer.get("login", "").casefold() == login.casefold()
    auth_membership_rows = _authenticated_memberships() if viewer_is_target else []
    direct_membership_checks = _check_viewer_org_membership(repo_owners) if viewer_is_target else []
    public_org_logins = {row["login"] for row in public_org_rows}
    listed_org_logins = {row["login"] for row in target_listed_org_rows}
    auth_membership_logins = {row["login"] for row in auth_membership_rows}
    directly_confirmed_orgs = sorted(
        row["login"] for row in direct_membership_checks if row.get("viewer_is_a_member")
    )
    listing_endpoints_partial = any(
        org not in public_org_logins | listed_org_logins | auth_membership_logins
        for org in directly_confirmed_orgs
    )

    created_at = datetime.fromisoformat(user["createdAt"].replace("Z", "+00:00"))
    account_age_days = (now - created_at).days

    return {
        "login": user["login"],
        "name": user.get("name"),
        "repo_scope": repo,
        "window": {"days": days, "from": from_iso, "to": to_iso},
        "account": {"created_at": user["createdAt"], "age_days": account_age_days},
        "activity": {
            "visible_contributions_total": calendar["totalContributions"],
            **_contribution_calendar_summary(calendar["weeks"]),
            "authored_issues": contributions["totalIssueContributions"],
            "authored_pull_requests": contributions["totalPullRequestContributions"],
            "visible_authored_issue_count": len(issues),
            "visible_authored_pr_count": len(prs),
            "visible_open_pr_count": len(open_prs),
            "search_authored_issue_count": issue_count,
            "search_authored_pr_count": pr_count,
            "search_open_pr_count": open_pr_count,
            "inaccessible_issue_nodes": inaccessible_issue_nodes,
            "inaccessible_pr_nodes": inaccessible_pr_nodes,
            "inaccessible_open_pr_nodes": inaccessible_open_pr_nodes,
            "distinct_repos_with_authored_prs": len(pr_repos),
            "distinct_repos_with_authored_issues": len(issue_repos),
            "distinct_repos_with_open_prs": len(open_pr_repos),
            "open_pr_count": len(open_prs),
            "merged_pr_count": len(merged_prs),
            "closed_unmerged_pr_count": len(closed_unmerged_prs),
            "still_open_pr_count": len(still_open_prs),
            "merged_pr_rate": _rate(len(merged_prs), len(prs)),
            "closed_unmerged_pr_rate": _rate(len(closed_unmerged_prs), len(prs)),
            "still_open_pr_rate": _rate(len(still_open_prs), len(prs)),
        },
        "stars": {
            "starred_repositories_total": starred.get("totalCount"),
            "visible_starred_repositories_returned": len(starred_nodes),
            "visible_non_self_starred_repositories": len(non_self_starred),
            "visible_non_self_starred_examples": [
                row["nameWithOwner"] for row in non_self_starred[:10]
            ],
            "recent_pr_repo_star_distribution": _star_distribution(recent_pr_repo_stars),
            "recent_merged_pr_repo_star_distribution": _star_distribution(merged_pr_repo_stars),
            "recent_closed_unmerged_pr_repo_star_distribution": _star_distribution(
                closed_unmerged_pr_repo_stars
            ),
            "recent_open_pr_repo_star_distribution": _star_distribution(open_pr_repo_stars),
        },
        "top_repositories": {
            "pull_requests": [
                {
                    "repo": row["repository"]["nameWithOwner"],
                    "count": row["contributions"]["totalCount"],
                }
                for row in contributions["pullRequestContributionsByRepository"]
            ],
            "issues": [
                {
                    "repo": row["repository"]["nameWithOwner"],
                    "count": row["contributions"]["totalCount"],
                }
                for row in contributions["issueContributionsByRepository"]
            ],
            "open_pr_repositories": open_pr_repos,
        },
        "organization_membership": {
            "target_is_viewer": viewer_is_target,
            "public_orgs": public_org_rows,
            "graphql_target_listed_orgs": target_listed_org_rows,
            "graphql_viewer_listed_orgs": viewer_listed_org_rows,
            "authenticated_memberships": auth_membership_rows,
            "checked_recent_repo_owner_orgs": direct_membership_checks,
            "visibility": {
                "listing_endpoints_partial": listing_endpoints_partial,
                "public_org_count": len(public_org_rows),
                "graphql_target_listed_org_count": len(target_listed_org_rows),
                "authenticated_membership_count": len(auth_membership_rows),
                "directly_confirmed_membership_count": len(directly_confirmed_orgs),
                "directly_confirmed_memberships": directly_confirmed_orgs,
            },
        },
    }
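# Top-level shape of the summary dict built above (reference sketch only; the return
# statement carries the full nesting):
#   {"login", "name", "repo_scope", "window", "account", "activity", "stars",
#    "top_repositories", "organization_membership"}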
def _post_graphql(
    query: str,
    variables: dict[str, Any],
    *,
    allow_partial: bool = False,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """POST a GraphQL query and return (data, errors); raise on errors unless allow_partial."""
    token = resolve_github_token()
    if not token:
        raise RuntimeError("missing GITHUB_TOKEN/GRAPHQL_TOKEN/GH_TOKEN")
    body = json.dumps({"query": query, "variables": variables}).encode()
    request = urllib.request.Request(
        GRAPHQL_URL,
        data=body,
        headers={
            "Authorization": f"bearer {token}",
            "User-Agent": "slop-farmer",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )
    try:
        with urlopen_with_retry(
            request,
            timeout=120,
            log=_user_activity_log,
            label="GitHub GraphQL user activity",
        ) as response:
            payload = json.load(response)
    except urllib.error.HTTPError as exc:  # pragma: no cover - live network only
        detail = exc.read().decode("utf-8", "replace")
        raise RuntimeError(f"graphql request failed: {exc.code} {detail}") from exc
    errors = payload.get("errors") or []
    if errors and not allow_partial:
        raise RuntimeError(json.dumps(errors))
    return payload["data"], errors


def _search_all(
    query: str, *, prs: bool, label: str | None = None
) -> tuple[int, list[dict[str, Any]], int]:
    """Page through a search query (up to 1000 visible nodes), counting per-page errors as inaccessible rows."""
    nodes: list[dict[str, Any]] = []
    cursor: str | None = None
    issue_count = 0
    inaccessible = 0
    search_query = SEARCH_PRS_QUERY if prs else SEARCH_ISSUES_QUERY
    page = 0
    while True:
        page += 1
        data, errors = _post_graphql(
            search_query, {"query": query, "cursor": cursor}, allow_partial=True
        )
        search = data["search"]
        issue_count = search["issueCount"]
        inaccessible += len(errors)
        nodes.extend(node for node in search["nodes"] if isinstance(node, dict))
        if label and (
            page == 1
            or page % 5 == 0
            or not search["pageInfo"]["hasNextPage"]
            or len(nodes) >= 1000
        ):
            _user_activity_log(
                f"user enrichment {label}: page {page}, loaded {len(nodes)} visible rows, "
                f"search_count={issue_count}, inaccessible={inaccessible}"
            )
        if not search["pageInfo"]["hasNextPage"] or len(nodes) >= 1000:
            break
        cursor = search["pageInfo"]["endCursor"]
    return issue_count, nodes, inaccessible


def _public_orgs(login: str) -> list[dict[str, Any]]:
    payload = _get_json(f"https://api.github.com/users/{login}/orgs")
    if not isinstance(payload, list):
        return []
    return [
        {"login": row.get("login"), "name": row.get("name")}
        for row in payload
        if isinstance(row, dict) and row.get("login")
    ]


def _authenticated_memberships() -> list[dict[str, Any]]:
    payload = _get_json("https://api.github.com/user/memberships/orgs")
    if not isinstance(payload, list):
        return []
    rows = []
    for row in payload:
        if not isinstance(row, dict):
            continue
        org = row.get("organization") or {}
        if not org.get("login"):
            continue
        rows.append(
            {
                "login": org.get("login"),
                "name": org.get("name"),
                "state": row.get("state"),
                "role": row.get("role"),
            }
        )
    return rows


def _check_viewer_org_membership(logins: list[str]) -> list[dict[str, Any]]:
    """Check viewer membership for each org login with a single aliased GraphQL query."""
    if not logins:
        return []
    aliases = {f"org{i}": login for i, login in enumerate(logins)}
    fields = "\n".join(
        f'{alias}: organization(login: "{login}") {{ login name viewerIsAMember viewerCanAdminister }}'
        for alias, login in aliases.items()
    )
    query = f"query ViewerOrgChecks {{ rateLimit {{ cost remaining resetAt }}\n{fields}\n}}"
    data, _errors = _post_graphql(query, {})
    rows = []
    for alias in aliases:
        org = data.get(alias)
        if isinstance(org, dict) and org.get("login"):
            rows.append(
                {
                    "login": org["login"],
                    "name": org.get("name"),
                    "viewer_is_a_member": bool(org.get("viewerIsAMember")),
                    "viewer_can_administer": bool(org.get("viewerCanAdminister")),
                }
            )
    return rows
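# For logins ["acme", "example-org"] (illustrative names), _check_viewer_org_membership
# builds one aliased query roughly like:
#   query ViewerOrgChecks {
#     rateLimit { cost remaining resetAt }
#     org0: organization(login: "acme") { login name viewerIsAMember viewerCanAdminister }
#     org1: organization(login: "example-org") { login name viewerIsAMember viewerCanAdminister }
#   }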
def _get_json(url: str) -> Any:
    """GET a GitHub REST endpoint and return the decoded JSON payload."""
    token = resolve_github_token()
    if not token:
        raise RuntimeError("missing GITHUB_TOKEN/GRAPHQL_TOKEN/GH_TOKEN")
    request = urllib.request.Request(
        url,
        headers={
            "Authorization": f"bearer {token}",
            "User-Agent": "slop-farmer",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        },
    )
    try:
        with urlopen_with_retry(
            request,
            timeout=120,
            log=_user_activity_log,
            label=url,
        ) as response:
            return json.load(response)
    except urllib.error.HTTPError as exc:  # pragma: no cover - live network only
        detail = exc.read().decode("utf-8", "replace")
        raise RuntimeError(f"rest request failed: {exc.code} {url} {detail}") from exc


def _contribution_calendar_summary(weeks: list[dict[str, Any]]) -> dict[str, Any]:
    daily = [day for week in weeks for day in week["contributionDays"]]
    active_days = sum(1 for day in daily if day["contributionCount"] > 0)
    weekly = [sum(day["contributionCount"] for day in week["contributionDays"]) for week in weeks]
    return {"active_days": active_days, "weekly_totals": weekly}


def _star_distribution(values: list[int]) -> dict[str, Any]:
    if not values:
        return {
            "count": 0,
            "min": None,
            "median": None,
            "max": None,
            "buckets": {"lt_100": 0, "100_to_999": 0, "1k_to_9k": 0, "10k_plus": 0},
        }
    ordered = sorted(values)
    mid = len(ordered) // 2
    # Median of the sorted values; average the two middle values for even-length lists.
    median = ordered[mid] if len(ordered) % 2 else round((ordered[mid - 1] + ordered[mid]) / 2, 1)
    return {
        "count": len(values),
        "min": min(values),
        "median": median,
        "max": max(values),
        "buckets": {
            "lt_100": sum(1 for value in values if value < 100),
            "100_to_999": sum(1 for value in values if 100 <= value < 1000),
            "1k_to_9k": sum(1 for value in values if 1000 <= value < 10000),
            "10k_plus": sum(1 for value in values if value >= 10000),
        },
    }


def _rate(numerator: int, denominator: int) -> float | None:
    if denominator == 0:
        return None
    return round(numerator / denominator, 4)


def _user_activity_log(message: str) -> None:
    stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ")
    print(f"[{stamp}] {message}", flush=True)
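if __name__ == "__main__":  # pragma: no cover - convenience sketch only
    # Minimal usage sketch, assuming a token resolvable by resolve_github_token()
    # (GITHUB_TOKEN/GRAPHQL_TOKEN/GH_TOKEN); this performs live network calls and the
    # argument values below are illustrative.
    import sys

    target_login = sys.argv[1] if len(sys.argv) > 1 else "octocat"
    print(json.dumps(summarize_user(target_login, days=90, repo=None), indent=2))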