"""
Stage C: 代码文件级统计(复用analysis.py的逻辑)
对前15000仓库进行代码文件分析
"""
import os
import json
import sys
from pathlib import Path
from collections import defaultdict, Counter
from tqdm import tqdm
import statistics
import math
from multiprocessing import Pool, cpu_count
import pandas as pd
# Import functions from analysis.py (located one directory up)
sys.path.insert(0, str(Path(__file__).parent.parent))
from analysis import analyze_code  # the only analysis.py helper used in this module
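# analyze_code is assumed to return a dict with at least these keys, inferred
# from how this module uses it: 'total_lines', 'code_lines', 'comment_lines',
# 'tokens', 'functions', 'parameters', and 'language'.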
def _default_repo_stats():
"""Factory function for defaultdict (must be top-level for pickle)"""
return {
'total_files': 0,
'total_lines': 0,
'total_code_lines': 0,
'total_comment_lines': 0,
'total_tokens': 0,
'total_functions': 0,
'total_parameters': 0,
'languages': Counter(),
'file_sizes': [],
}
class CodeFileStats:
def __init__(self, repos_dir, output_dir, top_n=15000, max_file_size_mb=2):
self.repos_dir = Path(repos_dir)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.top_n = top_n
self.max_file_size_bytes = max_file_size_mb * 1024 * 1024
        # Directories to skip during the scan
self.skip_dirs = {
'.git', 'node_modules', 'vendor', 'dist', 'build', '__pycache__',
'.pytest_cache', '.ipynb_checkpoints', 'venv', 'env', '.venv',
'target', '.idea', '.vscode', '.mypy_cache', '.tox'
}
        # Code file extensions (based on analysis.py)
self.code_extensions = {
'.py', '.java', '.c', '.h', '.hh', '.hpp', '.cpp', '.cc', '.cxx', '.c++',
'.f', '.f90', '.f95', '.F', '.r', '.m', '.sh', '.bash', '.rs', '.go',
            '.ipynb'  # notebooks are parsed separately (see parse_notebook)
}
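        # Extensionless files are scanned as well (see scan_repo); their
        # language label comes from the 'language' field analyze_code returns.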
self.file_stats = []
self.repo_stats = defaultdict(_default_repo_stats)
def parse_notebook(self, file_path):
"""解析Jupyter Notebook,提取代码cells"""
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
nb = json.load(f)
code_cells = []
for cell in nb.get('cells', []):
if cell.get('cell_type') == 'code':
source = cell.get('source', [])
if isinstance(source, list):
code = ''.join(source)
else:
code = str(source)
if code.strip():
code_cells.append(code)
return '\n'.join(code_cells)
        except Exception:  # malformed notebook or unreadable file
            return None
def analyze_file(self, file_path, repo_name):
"""分析单个代码文件"""
try:
            # Skip files over the size limit
file_size = file_path.stat().st_size
if file_size > self.max_file_size_bytes:
return None
            # Read the file (notebooks need their code cells extracted first)
            if file_path.suffix.lower() == '.ipynb':
                code = self.parse_notebook(file_path)
                if not code:
                    return None
else:
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
code = f.read()
                except Exception:
                    return None
            # Delegate the per-file metrics to analyze_code from analysis.py
result = analyze_code(code, str(file_path))
result['repo_name'] = repo_name
result['file_path'] = str(file_path.relative_to(self.repos_dir / repo_name))
result['file_size_bytes'] = file_size
            # Derived metrics (guard against division by zero)
            total_lines = result['total_lines']
            code_lines = result['code_lines']
            result['comment_ratio'] = result['comment_lines'] / total_lines if total_lines > 0 else 0
            result['code_density'] = code_lines / total_lines if total_lines > 0 else 0
            result['avg_tokens_per_line'] = result['tokens'] / code_lines if code_lines > 0 else 0
            result['avg_params_per_func'] = (
                result['parameters'] / result['functions'] if result['functions'] > 0 else 0
            )
            # For notebooks, force the language label to 'jupyter'
if file_path.suffix.lower() == '.ipynb':
result['language'] = 'jupyter'
return result
        except Exception:
            return None
def scan_repo(self, repo_path):
"""扫描单个仓库的所有代码文件"""
repo_name = repo_path.name
repo_files = []
for root, dirs, files in os.walk(repo_path):
            # Prune unwanted directories in place so os.walk never descends into them
            dirs[:] = [d for d in dirs if d not in self.skip_dirs]
for file in files:
file_path = Path(root) / file
ext = file_path.suffix.lower()
                # Process known code extensions, plus extensionless files
                if ext in self.code_extensions or ext == '':
result = self.analyze_file(file_path, repo_name)
if result:
repo_files.append(result)
return repo_files
def scan_all_repos(self, num_workers=None):
"""扫描所有仓库(多进程优化版)"""
if num_workers is None:
            num_workers = min(cpu_count(), 32)  # cap the worker count to avoid memory pressure
        # Collect candidate repository directories
all_repos = sorted([d for d in self.repos_dir.iterdir() if d.is_dir()])
selected_repos = all_repos[:self.top_n]
print(f"Scanning {len(selected_repos)} repos for code files using {num_workers} workers...")
        # A small chunksize keeps the progress bar updating in real time
        chunksize = 1
        # Multiprocess scan (imap_unordered yields results as they complete)
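        # The bound method self.scan_repo is pickled for each task, so every
        # instance attribute must be picklable -- this is why
        # _default_repo_stats is a module-level function instead of a lambda.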
with Pool(processes=num_workers) as pool:
results = list(tqdm(
pool.imap_unordered(self.scan_repo, selected_repos, chunksize=chunksize),
total=len(selected_repos),
desc="Scanning repos"
))
        # Flatten the per-repo result lists
for repo_files in results:
self.file_stats.extend(repo_files)
print(f"Found {len(self.file_stats)} code files")
def aggregate_repo_stats(self):
"""聚合仓库级统计"""
for file_stat in self.file_stats:
repo = file_stat['repo_name']
self.repo_stats[repo]['total_files'] += 1
self.repo_stats[repo]['total_lines'] += file_stat['total_lines']
self.repo_stats[repo]['total_code_lines'] += file_stat['code_lines']
self.repo_stats[repo]['total_comment_lines'] += file_stat['comment_lines']
self.repo_stats[repo]['total_tokens'] += file_stat['tokens']
self.repo_stats[repo]['total_functions'] += file_stat['functions']
self.repo_stats[repo]['total_parameters'] += file_stat['parameters']
self.repo_stats[repo]['languages'][file_stat['language']] += 1
self.repo_stats[repo]['file_sizes'].append(file_stat['file_size_bytes'])
        # Convert to a JSON/CSV-serializable format
repo_stats_list = []
for repo, stats in self.repo_stats.items():
total_files = stats['total_files']
stats_dict = {
'repo_name': repo,
'full_name': repo.replace('___', '/'),
'total_files': total_files,
'total_lines': stats['total_lines'],
'total_code_lines': stats['total_code_lines'],
'total_comment_lines': stats['total_comment_lines'],
'total_tokens': stats['total_tokens'],
'total_functions': stats['total_functions'],
'total_parameters': stats['total_parameters'],
'language_count': len(stats['languages']),
'primary_language': stats['languages'].most_common(1)[0][0] if stats['languages'] else 'unknown',
'primary_language_files': stats['languages'].most_common(1)[0][1] if stats['languages'] else 0,
}
            # Derived metrics
if stats['total_lines'] > 0:
stats_dict['comment_ratio'] = stats['total_comment_lines'] / stats['total_lines']
else:
stats_dict['comment_ratio'] = 0
if stats['total_functions'] > 0:
stats_dict['avg_func_length'] = stats['total_code_lines'] / stats['total_functions']
stats_dict['avg_params_per_func'] = stats['total_parameters'] / stats['total_functions']
else:
stats_dict['avg_func_length'] = 0
stats_dict['avg_params_per_func'] = 0
            # Language diversity (Shannon entropy over per-language file counts)
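            # H = -sum(p * log2(p)); e.g. a repo split evenly between two
            # languages scores 1.0 bit, while a single-language repo scores 0.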
if stats['languages']:
total_lang_files = sum(stats['languages'].values())
entropy = 0
for count in stats['languages'].values():
p = count / total_lang_files
if p > 0:
entropy -= p * math.log2(p)
stats_dict['language_entropy'] = entropy
else:
stats_dict['language_entropy'] = 0
            # File size statistics (default to 0 so every row has these columns)
            if stats['file_sizes']:
                stats_dict['avg_file_size_kb'] = statistics.mean(stats['file_sizes']) / 1024
                stats_dict['max_file_size_mb'] = max(stats['file_sizes']) / (1024 * 1024)
            else:
                stats_dict['avg_file_size_kb'] = 0
                stats_dict['max_file_size_mb'] = 0
            # Primary-language share of all files
if stats['languages']:
primary_lang_count = stats['languages'].most_common(1)[0][1]
stats_dict['primary_language_ratio'] = primary_lang_count / total_files
else:
stats_dict['primary_language_ratio'] = 0
repo_stats_list.append(stats_dict)
return repo_stats_list
def save_results(self):
"""保存结果"""
        # Save file-level stats (sampled: beyond 10000 files, keep only the
        # 5000 largest and 5000 smallest by size)
file_df = pd.DataFrame(self.file_stats)
if len(file_df) > 10000:
            # Keep the size extremes (largest and smallest files)
file_df_large = file_df.nlargest(5000, 'file_size_bytes')
file_df_small = file_df.nsmallest(5000, 'file_size_bytes')
file_df_sample = pd.concat([file_df_large, file_df_small]).drop_duplicates()
else:
file_df_sample = file_df
file_df_sample.to_csv(self.output_dir / 'file_level_metrics_sampled.csv', index=False)
        # Save repo-level stats
repo_stats_list = self.aggregate_repo_stats()
repo_df = pd.DataFrame(repo_stats_list)
repo_df.to_csv(self.output_dir / 'repo_level_metrics_top15000.csv', index=False)
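        # NOTE: the filename assumes the default top_n=15000; adjust it if a
        # different top_n is configured.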
        # Summary statistics
summary = {
'total_files': len(self.file_stats),
'total_repos': len(self.repo_stats),
'avg_files_per_repo': len(self.file_stats) / len(self.repo_stats) if self.repo_stats else 0,
}
        # Per-language file counts (top 20)
lang_counter = Counter(f['language'] for f in self.file_stats)
summary['files_by_language'] = dict(lang_counter.most_common(20))
if repo_stats_list:
summary['repo_stats'] = {
'avg_total_lines': statistics.mean([r['total_lines'] for r in repo_stats_list]),
'avg_code_lines': statistics.mean([r['total_code_lines'] for r in repo_stats_list]),
'avg_comment_lines': statistics.mean([r['total_comment_lines'] for r in repo_stats_list]),
'avg_tokens': statistics.mean([r['total_tokens'] for r in repo_stats_list]),
'avg_functions': statistics.mean([r['total_functions'] for r in repo_stats_list]),
}
with open(self.output_dir / 'code_stats_summary.json', 'w', encoding='utf-8') as f:
json.dump(summary, f, indent=2, ensure_ascii=False)
def run(self, num_workers=None):
"""执行完整流程"""
print("Stage C: Analyzing code files...")
self.scan_all_repos(num_workers=num_workers)
print("Aggregating repo-level stats...")
print("Saving results...")
self.save_results()
print(f"Code file stats complete! Results saved to {self.output_dir}")
if __name__ == "__main__":
repos_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"
output_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/reporting/code_stats"
stats = CodeFileStats(repos_dir, output_dir, top_n=15000)
stats.run()
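    # run() also accepts an explicit worker count, e.g. stats.run(num_workers=8)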