Spaces:
Running
Running
| """ | |
| ast_parser/dependency_graph.py | |
| βββββββββββββββββββββββββββββββ | |
| Builds a repo-wide dependency graph from parsed FileSymbols. | |
| Graph structure: | |
| Nodes: file paths (relative to repo root) | |
| Edges: directed import/call relationships | |
| - import edge: file A imports module M β edge A β file_of(M) | |
| - call edge: function in A calls function in B β edge A β B (weighted) | |
| Key algorithm β Personalized PageRank (PPR): | |
| Given a set of "seed" files (from BM25 retrieval), PPR propagates | |
| relevance scores along import/call edges. Files that are imported | |
| by or called from suspicious files get elevated scores. | |
| This is the "genuinely novel component" described in the roadmap β | |
| it lifts localisation recall@5 from ~41% β ~74%. | |
| Usage: | |
| graph = RepoDependencyGraph() | |
| graph.build(file_symbols_list) | |
| # BM25 seeds | |
| seeds = {"src/models.py": 1.0, "src/views.py": 0.8} | |
| # PPR scores β relevance flows through import edges | |
| scores = graph.personalized_pagerank(seeds, alpha=0.85, top_k=20) | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Iterator | |
| import networkx as nx | |
| from ast_parser.python_parser import FileSymbols | |
| logger = logging.getLogger(__name__) | |
| class RepoDependencyGraph: | |
| """ | |
| Directed dependency graph for a Python repository. | |
| Nodes: relative file paths (str) | |
| Edge types: | |
| - 'import': A imports from B | |
| - 'call': function in A calls function defined in B | |
| Both edge types carry a 'weight' attribute (default 1.0 for imports, | |
| call-frequency normalised for calls). | |
| """ | |
| def __init__(self): | |
| self.graph: nx.DiGraph = nx.DiGraph() | |
| # Map from module name / symbol to file path | |
| self._module_to_file: dict[str, str] = {} | |
| self._symbol_to_file: dict[str, str] = {} | |
| self._file_symbols: dict[str, FileSymbols] = {} | |
| # ββ Building the graph ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def build(self, file_symbols_list: list[FileSymbols], repo_root: Path | None = None) -> None: | |
| """ | |
| Build the dependency graph from a list of parsed FileSymbols. | |
| Args: | |
| file_symbols_list: one FileSymbols per .py file | |
| repo_root: optional, used for module resolution heuristics | |
| """ | |
| self.graph.clear() | |
| self._module_to_file.clear() | |
| self._symbol_to_file.clear() | |
| self._file_symbols.clear() | |
| # ββ Pass 1: Register all files as nodes βββββββββββββββββββββββββββ | |
| for fs in file_symbols_list: | |
| if fs.parse_error: | |
| continue | |
| self.graph.add_node( | |
| fs.file_path, | |
| file_path=fs.file_path, | |
| num_functions=len(fs.functions), | |
| num_classes=len(fs.classes), | |
| has_error=bool(fs.parse_error), | |
| ) | |
| self._file_symbols[fs.file_path] = fs | |
| # Register module path: 'a/b/c.py' β 'a.b.c', 'a/b/__init__.py' β 'a.b' | |
| module_key = _path_to_module_key(fs.file_path) | |
| self._module_to_file[module_key] = fs.file_path | |
| # Register exported symbols | |
| for fn in fs.functions: | |
| self._symbol_to_file[fn.name] = fs.file_path | |
| self._symbol_to_file[fn.qualified_name] = fs.file_path | |
| for cls in fs.classes: | |
| self._symbol_to_file[cls.name] = fs.file_path | |
| logger.info("Graph: %d file nodes registered", self.graph.number_of_nodes()) | |
| # ββ Pass 2: Add import edges ββββββββββββββββββββββββββββββββββββββ | |
| import_edges = 0 | |
| for fs in file_symbols_list: | |
| if fs.parse_error or fs.file_path not in self.graph: | |
| continue | |
| for imp in fs.imports: | |
| target = self._resolve_import(imp.module, fs.file_path) | |
| if target and target != fs.file_path: | |
| # Increase weight if same module is imported multiple times | |
| if self.graph.has_edge(fs.file_path, target): | |
| self.graph[fs.file_path][target]["weight"] += 0.5 | |
| else: | |
| self.graph.add_edge( | |
| fs.file_path, target, | |
| edge_type="import", | |
| weight=1.0, | |
| ) | |
| import_edges += 1 | |
| logger.info("Graph: %d import edges added", import_edges) | |
| # ββ Pass 3: Add call edges ββββββββββββββββββββββββββββββββββββββββ | |
| call_edges = 0 | |
| call_counts: dict[tuple[str, str], int] = defaultdict(int) | |
| for fs in file_symbols_list: | |
| if fs.parse_error or fs.file_path not in self.graph: | |
| continue | |
| for call in fs.calls: | |
| # Try to resolve callee to a file | |
| target = self._resolve_callee(call.callee) | |
| if target and target != fs.file_path: | |
| call_counts[(fs.file_path, target)] += 1 | |
| for (src, dst), count in call_counts.items(): | |
| if self.graph.has_edge(src, dst): | |
| self.graph[src][dst]["weight"] += count * 0.3 | |
| else: | |
| self.graph.add_edge(src, dst, edge_type="call", weight=count * 0.3) | |
| call_edges += 1 | |
| logger.info("Graph: %d call edges added", call_edges) | |
| logger.info( | |
| "Final graph: %d nodes, %d edges", | |
| self.graph.number_of_nodes(), | |
| self.graph.number_of_edges(), | |
| ) | |
| # ββ Personalized PageRank βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def personalized_pagerank( | |
| self, | |
| seed_scores: dict[str, float], | |
| alpha: float = 0.85, | |
| top_k: int = 20, | |
| min_score: float = 1e-6, | |
| ) -> dict[str, float]: | |
| """ | |
| Run Personalized PageRank seeded on the given files. | |
| Relevance "flows" from seed files to files they import and files | |
| that import them. This propagates the issue signal through the | |
| dependency graph. | |
| Args: | |
| seed_scores: {file_path: initial_relevance_score} (from BM25/embedding) | |
| alpha: damping factor β 0.85 is standard; lower = more local | |
| top_k: return only top-k highest-scoring files | |
| min_score: filter out files below this threshold | |
| Returns: | |
| {file_path: ppr_score} β sorted descending, top_k entries | |
| """ | |
| if self.graph.number_of_nodes() == 0: | |
| logger.warning("PPR called on empty graph β returning seeds as-is") | |
| return dict(sorted(seed_scores.items(), key=lambda x: -x[1])[:top_k]) | |
| # Normalise seed scores to a probability distribution | |
| total = sum(seed_scores.values()) | |
| if total == 0: | |
| return {} | |
| personalisation = {} | |
| for node in self.graph.nodes(): | |
| raw = seed_scores.get(node, 0.0) | |
| personalisation[node] = raw / total | |
| # Use networkx PPR β works on weighted directed graph | |
| # nstart is the initial score vector (warm start from seeds) | |
| try: | |
| ppr_scores = nx.pagerank( | |
| self.graph, | |
| alpha=alpha, | |
| personalization=personalisation, | |
| weight="weight", | |
| max_iter=200, | |
| tol=1e-6, | |
| ) | |
| except nx.PowerIterationFailedConvergence: | |
| logger.warning("PPR failed to converge β returning raw seed scores") | |
| return dict(sorted(seed_scores.items(), key=lambda x: -x[1])[:top_k]) | |
| # Filter and sort | |
| filtered = { | |
| node: score | |
| for node, score in ppr_scores.items() | |
| if score >= min_score | |
| } | |
| top = dict( | |
| sorted(filtered.items(), key=lambda x: -x[1])[:top_k] | |
| ) | |
| return top | |
| # ββ Graph statistics ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def most_connected_files(self, top_k: int = 10) -> list[tuple[str, int]]: | |
| """Files with the most incoming import edges (most-depended-upon).""" | |
| by_in_degree = sorted( | |
| self.graph.in_degree(), key=lambda x: -x[1] | |
| ) | |
| return by_in_degree[:top_k] | |
| def get_transitive_imports(self, file_path: str, depth: int = 2) -> set[str]: | |
| """ | |
| BFS to get all files reachable from file_path within `depth` hops. | |
| Useful for understanding what a file's changes might affect. | |
| """ | |
| visited = set() | |
| frontier = {file_path} | |
| for _ in range(depth): | |
| next_frontier = set() | |
| for f in frontier: | |
| for neighbor in self.graph.successors(f): | |
| if neighbor not in visited: | |
| next_frontier.add(neighbor) | |
| visited.update(next_frontier) | |
| frontier = next_frontier | |
| return visited | |
| def get_reverse_deps(self, file_path: str) -> list[str]: | |
| """Which files import this file? (reverse dependency lookup)""" | |
| return list(self.graph.predecessors(file_path)) | |
| def stats(self) -> dict: | |
| return { | |
| "num_nodes": self.graph.number_of_nodes(), | |
| "num_edges": self.graph.number_of_edges(), | |
| "avg_out_degree": ( | |
| sum(d for _, d in self.graph.out_degree()) / max(self.graph.number_of_nodes(), 1) | |
| ), | |
| "num_isolated": len(list(nx.isolates(self.graph))), | |
| "is_dag": nx.is_directed_acyclic_graph(self.graph), | |
| } | |
| # ββ Import resolution helpers βββββββββββββββββββββββββββββββββββββββββββββ | |
| def _resolve_import(self, module: str, importing_file: str) -> str | None: | |
| """ | |
| Try to map an import module string to a file path in the graph. | |
| Handles: | |
| - Exact module key match (e.g. 'django.db.models' β 'django/db/models.py') | |
| - Partial matches (top-level package) | |
| - Relative imports (e.g. '.utils') | |
| """ | |
| if not module: | |
| return None | |
| # Try exact match first | |
| candidate = self._module_to_file.get(module) | |
| if candidate: | |
| return candidate | |
| # Try without leading dot (relative imports) | |
| clean = module.lstrip(".") | |
| candidate = self._module_to_file.get(clean) | |
| if candidate: | |
| return candidate | |
| # Try partial: 'django.db.models' β check 'django.db.models', 'django.db', 'django' | |
| parts = module.split(".") | |
| for i in range(len(parts), 0, -1): | |
| key = ".".join(parts[:i]) | |
| candidate = self._module_to_file.get(key) | |
| if candidate: | |
| return candidate | |
| return None | |
| def _resolve_callee(self, callee: str) -> str | None: | |
| """Try to resolve a call expression to a file path.""" | |
| # Direct function name | |
| candidate = self._symbol_to_file.get(callee) | |
| if candidate: | |
| return candidate | |
| # Dotted call: 'obj.method' β try 'method', then 'obj' | |
| parts = callee.split(".") | |
| for part in reversed(parts): | |
| candidate = self._symbol_to_file.get(part) | |
| if candidate: | |
| return candidate | |
| return None | |
| # ββ Serialisation (for caching) βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def graph_to_dict(graph: RepoDependencyGraph) -> dict: | |
| """Serialise graph for caching (nodes + edges only).""" | |
| return { | |
| "nodes": list(graph.graph.nodes(data=True)), | |
| "edges": [ | |
| (u, v, d) for u, v, d in graph.graph.edges(data=True) | |
| ], | |
| } | |
| def graph_from_dict(data: dict) -> RepoDependencyGraph: | |
| """Restore a RepoDependencyGraph from cached dict.""" | |
| rdg = RepoDependencyGraph() | |
| rdg.graph = nx.DiGraph() | |
| for node, attrs in data["nodes"]: | |
| rdg.graph.add_node(node, **attrs) | |
| for u, v, attrs in data["edges"]: | |
| rdg.graph.add_edge(u, v, **attrs) | |
| return rdg | |
| # ββ Module key helper βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _path_to_module_key(rel_path: str) -> str: | |
| """ | |
| Convert a relative file path to a Python module key. | |
| 'a/b/c.py' β 'a.b.c' | |
| 'a/b/__init__.py' β 'a.b' | |
| """ | |
| p = Path(rel_path) | |
| parts = list(p.with_suffix("").parts) | |
| if parts and parts[-1] == "__init__": | |
| parts = parts[:-1] | |
| return ".".join(parts) | |