""" Stage C: 代码文件级统计(复用analysis.py的逻辑) 对前15000仓库进行代码文件分析 """ import os import json import sys from pathlib import Path from collections import defaultdict, Counter from tqdm import tqdm import statistics import math from multiprocessing import Pool, cpu_count import pandas as pd # 导入analysis.py的函数 sys.path.insert(0, str(Path(__file__).parent.parent)) from analysis import ( detect_language, count_comments, count_tokens, count_functions_and_parameters, analyze_code ) def _default_repo_stats(): """Factory function for defaultdict (must be top-level for pickle)""" return { 'total_files': 0, 'total_lines': 0, 'total_code_lines': 0, 'total_comment_lines': 0, 'total_tokens': 0, 'total_functions': 0, 'total_parameters': 0, 'languages': Counter(), 'file_sizes': [], } class CodeFileStats: def __init__(self, repos_dir, output_dir, top_n=15000, max_file_size_mb=2): self.repos_dir = Path(repos_dir) self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.top_n = top_n self.max_file_size_bytes = max_file_size_mb * 1024 * 1024 # 跳过目录 self.skip_dirs = { '.git', 'node_modules', 'vendor', 'dist', 'build', '__pycache__', '.pytest_cache', '.ipynb_checkpoints', 'venv', 'env', '.venv', 'target', '.idea', '.vscode', '.mypy_cache', '.tox' } # 代码文件扩展名(基于analysis.py) self.code_extensions = { '.py', '.java', '.c', '.h', '.hh', '.hpp', '.cpp', '.cc', '.cxx', '.c++', '.f', '.f90', '.f95', '.F', '.r', '.m', '.sh', '.bash', '.rs', '.go', '.ipynb' # Notebook单独处理 } self.file_stats = [] self.repo_stats = defaultdict(_default_repo_stats) def parse_notebook(self, file_path): """解析Jupyter Notebook,提取代码cells""" try: with open(file_path, 'r', encoding='utf-8', errors='replace') as f: nb = json.load(f) code_cells = [] for cell in nb.get('cells', []): if cell.get('cell_type') == 'code': source = cell.get('source', []) if isinstance(source, list): code = ''.join(source) else: code = str(source) if code.strip(): code_cells.append(code) return '\n'.join(code_cells) except: return None def analyze_file(self, file_path, repo_name): """分析单个代码文件""" try: # 检查文件大小 file_size = file_path.stat().st_size if file_size > self.max_file_size_bytes: return None # 读取文件 if file_path.suffix.lower() == '.ipynb': code = self.parse_notebook(file_path) if not code: return None lang = 'jupyter' else: try: with open(file_path, 'r', encoding='utf-8', errors='replace') as f: code = f.read() except: return None # 使用analysis.py的analyze_code函数 result = analyze_code(code, str(file_path)) result['repo_name'] = repo_name result['file_path'] = str(file_path.relative_to(self.repos_dir / repo_name)) result['file_size_bytes'] = file_size # 派生指标 if result['total_lines'] > 0: result['comment_ratio'] = result['comment_lines'] / result['total_lines'] else: result['comment_ratio'] = 0 if result['total_lines'] > 0: result['code_density'] = result['code_lines'] / result['total_lines'] else: result['code_density'] = 0 if result['code_lines'] > 0: result['avg_tokens_per_line'] = result['tokens'] / result['code_lines'] else: result['avg_tokens_per_line'] = 0 if result['functions'] > 0: result['avg_params_per_func'] = result['parameters'] / result['functions'] else: result['avg_params_per_func'] = 0 # 对于notebook,保持language为jupyter if file_path.suffix.lower() == '.ipynb': result['language'] = 'jupyter' return result except Exception as e: return None def scan_repo(self, repo_path): """扫描单个仓库的所有代码文件""" repo_name = repo_path.name repo_files = [] for root, dirs, files in os.walk(repo_path): # 跳过不需要的目录 dirs[:] = [d for d in dirs if d not in 
    def scan_repo(self, repo_path):
        """Scan every code file in a single repository."""
        repo_name = repo_path.name
        repo_files = []

        for root, dirs, files in os.walk(repo_path):
            # Prune unwanted directories in place so os.walk skips them.
            dirs[:] = [d for d in dirs if d not in self.skip_dirs]

            for file in files:
                file_path = Path(root) / file
                ext = file_path.suffix.lower()

                # Only process code files (extensionless files are passed
                # through so analyze_code can detect their language).
                if ext in self.code_extensions or ext == '':
                    result = self.analyze_file(file_path, repo_name)
                    if result:
                        repo_files.append(result)

        return repo_files

    def scan_all_repos(self, num_workers=None):
        """Scan all repositories (multiprocess version)."""
        if num_workers is None:
            # Cap the worker count to avoid memory pressure.
            num_workers = min(cpu_count(), 32)

        # Collect repository directories and keep the top N.
        all_repos = sorted([d for d in self.repos_dir.iterdir() if d.is_dir()])
        selected_repos = all_repos[:self.top_n]

        print(f"Scanning {len(selected_repos)} repos for code files using {num_workers} workers...")

        # Small chunksize so the progress bar updates in real time;
        # imap_unordered yields results as soon as each repo finishes.
        chunksize = 1
        with Pool(processes=num_workers) as pool:
            results = list(tqdm(
                pool.imap_unordered(self.scan_repo, selected_repos, chunksize=chunksize),
                total=len(selected_repos),
                desc="Scanning repos"
            ))

        # Flatten the per-repo result lists.
        for repo_files in results:
            self.file_stats.extend(repo_files)

        print(f"Found {len(self.file_stats)} code files")

    def aggregate_repo_stats(self):
        """Aggregate file-level metrics into repo-level statistics."""
        for file_stat in self.file_stats:
            repo = file_stat['repo_name']
            self.repo_stats[repo]['total_files'] += 1
            self.repo_stats[repo]['total_lines'] += file_stat['total_lines']
            self.repo_stats[repo]['total_code_lines'] += file_stat['code_lines']
            self.repo_stats[repo]['total_comment_lines'] += file_stat['comment_lines']
            self.repo_stats[repo]['total_tokens'] += file_stat['tokens']
            self.repo_stats[repo]['total_functions'] += file_stat['functions']
            self.repo_stats[repo]['total_parameters'] += file_stat['parameters']
            self.repo_stats[repo]['languages'][file_stat['language']] += 1
            self.repo_stats[repo]['file_sizes'].append(file_stat['file_size_bytes'])

        # Convert to a serializable list of flat dicts.
        repo_stats_list = []
        for repo, stats in self.repo_stats.items():
            total_files = stats['total_files']
            stats_dict = {
                'repo_name': repo,
                'full_name': repo.replace('___', '/'),
                'total_files': total_files,
                'total_lines': stats['total_lines'],
                'total_code_lines': stats['total_code_lines'],
                'total_comment_lines': stats['total_comment_lines'],
                'total_tokens': stats['total_tokens'],
                'total_functions': stats['total_functions'],
                'total_parameters': stats['total_parameters'],
                'language_count': len(stats['languages']),
                'primary_language': stats['languages'].most_common(1)[0][0] if stats['languages'] else 'unknown',
                'primary_language_files': stats['languages'].most_common(1)[0][1] if stats['languages'] else 0,
            }

            # Derived metrics.
            if stats['total_lines'] > 0:
                stats_dict['comment_ratio'] = stats['total_comment_lines'] / stats['total_lines']
            else:
                stats_dict['comment_ratio'] = 0

            if stats['total_functions'] > 0:
                stats_dict['avg_func_length'] = stats['total_code_lines'] / stats['total_functions']
                stats_dict['avg_params_per_func'] = stats['total_parameters'] / stats['total_functions']
            else:
                stats_dict['avg_func_length'] = 0
                stats_dict['avg_params_per_func'] = 0

            # Language diversity (Shannon entropy over per-language file counts).
            if stats['languages']:
                total_lang_files = sum(stats['languages'].values())
                entropy = 0
                for count in stats['languages'].values():
                    p = count / total_lang_files
                    if p > 0:
                        entropy -= p * math.log2(p)
                stats_dict['language_entropy'] = entropy
            else:
                stats_dict['language_entropy'] = 0

            # File size statistics.
            if stats['file_sizes']:
                stats_dict['avg_file_size_kb'] = statistics.mean(stats['file_sizes']) / 1024
                stats_dict['max_file_size_mb'] = max(stats['file_sizes']) / (1024 * 1024)

            # Share of files in the primary language.
            if stats['languages']:
                primary_lang_count = stats['languages'].most_common(1)[0][1]
                stats_dict['primary_language_ratio'] = primary_lang_count / total_files
            else:
                stats_dict['primary_language_ratio'] = 0

            repo_stats_list.append(stats_dict)

        return repo_stats_list
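    # Worked example for the language_entropy metric computed above:
    # a repo with files {python: 3, c: 1} has p = (0.75, 0.25), so
    #     H = -(0.75 * log2(0.75) + 0.25 * log2(0.25)) ≈ 0.81 bits.
    # A single-language repo gives H = 0, and k languages used in equal
    # proportion give the maximum H = log2(k).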
    def save_results(self):
        """Save results to CSV/JSON."""
        # File-level metrics: with more than 10000 files, keep a sample of
        # the extremes (5000 largest + 5000 smallest by size).
        file_df = pd.DataFrame(self.file_stats)
        if len(file_df) > 10000:
            file_df_large = file_df.nlargest(5000, 'file_size_bytes')
            file_df_small = file_df.nsmallest(5000, 'file_size_bytes')
            file_df_sample = pd.concat([file_df_large, file_df_small]).drop_duplicates()
        else:
            file_df_sample = file_df
        file_df_sample.to_csv(self.output_dir / 'file_level_metrics_sampled.csv', index=False)

        # Repo-level metrics.
        repo_stats_list = self.aggregate_repo_stats()
        repo_df = pd.DataFrame(repo_stats_list)
        repo_df.to_csv(self.output_dir / 'repo_level_metrics_top15000.csv', index=False)

        # Corpus-wide summary.
        summary = {
            'total_files': len(self.file_stats),
            'total_repos': len(self.repo_stats),
            'avg_files_per_repo': len(self.file_stats) / len(self.repo_stats) if self.repo_stats else 0,
        }

        # Per-language file counts (top 20).
        lang_counter = Counter(f['language'] for f in self.file_stats)
        summary['files_by_language'] = dict(lang_counter.most_common(20))

        if repo_stats_list:
            summary['repo_stats'] = {
                'avg_total_lines': statistics.mean([r['total_lines'] for r in repo_stats_list]),
                'avg_code_lines': statistics.mean([r['total_code_lines'] for r in repo_stats_list]),
                'avg_comment_lines': statistics.mean([r['total_comment_lines'] for r in repo_stats_list]),
                'avg_tokens': statistics.mean([r['total_tokens'] for r in repo_stats_list]),
                'avg_functions': statistics.mean([r['total_functions'] for r in repo_stats_list]),
            }

        with open(self.output_dir / 'code_stats_summary.json', 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

    def run(self, num_workers=None):
        """Run the full pipeline."""
        print("Stage C: Analyzing code files...")
        self.scan_all_repos(num_workers=num_workers)

        # Aggregation happens inside save_results via aggregate_repo_stats().
        print("Aggregating repo-level stats...")
        print("Saving results...")
        self.save_results()

        print(f"Code file stats complete! Results saved to {self.output_dir}")


if __name__ == "__main__":
    repos_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"
    output_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/reporting/code_stats"

    stats = CodeFileStats(repos_dir, output_dir, top_n=15000)
    stats.run()
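
# Outputs written to output_dir by save_results():
#   file_level_metrics_sampled.csv   - per-file metrics (sampled to the 5000
#                                      largest + 5000 smallest files when more
#                                      than 10000 files were analyzed)
#   repo_level_metrics_top15000.csv  - aggregated per-repo metrics
#   code_stats_summary.json          - corpus-wide summary
# Note: running this script requires analysis.py to be importable from the
# parent directory (see the sys.path.insert at the top of the file).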