|
|
import csv |
|
|
import re |
|
|
import tokenize |
|
|
from io import StringIO |
|
|
import os |
|
|
from tqdm import tqdm |
|
|
import json |
|
|
import sys |
|
|
from functools import lru_cache |
|
|
|
|
|
# Source files stored as CSV fields can exceed the csv module's default
# 128 KiB field limit; raise it to the platform maximum.
csv.field_size_limit(sys.maxsize)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Per-language regexes matching a line comment; group(1) captures the text
# after the comment marker up to the end of the line.
_LINE_COMMENT_PATTERNS = {
    lang: re.compile(marker + r"(.*)$")
    for marker, langs in (
        (r"#", ("python", "shell", "r")),           # hash-style comments
        (r"%", ("matlab",)),
        (r"!", ("fortran",)),
        (r"//", ("c/c++", "java", "go", "rust")),   # C-family comments
    )
    for lang in langs
}
|
|
|
|
|
|
|
|
# Per-language regexes matching a (possibly multi-line) block comment.
# For Python, group(1) is the entire triple-quoted string (quotes included);
# for the C family and MATLAB, group(1) is only the interior text.
_BLOCK_COMMENT_PATTERNS = {
    lang: re.compile(pattern)
    for lang, pattern in (
        ("python", r'("""[\s\S]*?"""|\'\'\'[\s\S]*?\'\'\')'),
        ("c/c++", r"/\*([\s\S]*?)\*/"),
        ("java", r"/\*([\s\S]*?)\*/"),
        ("rust", r"/\*([\s\S]*?)\*/"),
        ("go", r"/\*([\s\S]*?)\*/"),
        ("matlab", r"%\{([\s\S]*?)%\}"),
    )
}
|
|
|
|
|
|
|
|
# Per-language regexes matching a function/method definition header.
# Convention: the function name is one capture group and the parameter list
# is the last capture group (Fortran/MATLAB have extra leading groups).
_FUNCTION_PATTERNS = {
    "python": re.compile(r"^[ \t]*def\s+(\w+)\s*\(([^)]*)\)", re.MULTILINE),
    "java": re.compile(r"""
        (?:public|protected|private|static|final|native|synchronized|abstract|\s)*
        \s*
        (?:[\w\<\>\[\],\s]+)            # return type (possibly generic)
        \s+
        (\w+)                           # method name
        \s*\(([^)]*)\)                  # parameter list
        (?:\s*throws\s+[\w,\s]+)?       # optional throws clause
        \s*\{
    """, re.MULTILINE | re.VERBOSE),
    "c/c++": re.compile(r"""
        ^[ \t]*
        (?!.*typedef)
        (?!.*\#)
        (?:[\w\*\s&]+)                  # return type, pointers, references
        \b(\w+)\s*                      # function name
        \(([^)]*)\)                     # parameter list
        \s*(?:const)?
        \s*(?:override)?
        \s*(?:noexcept)?
        \s*\{
    """, re.MULTILINE | re.VERBOSE),
    "go": re.compile(r"\bfunc\s+(?:\([^)]+\)\s*)?(\w+)\s*\(([^)]*)\)", re.MULTILINE),
    "rust": re.compile(r"\b(?:pub\s+)?(?:async\s+)?fn\s+(\w+)\s*(?:<[^>]*>)?\s*\(([^)]*)\)", re.MULTILINE),
    "r": re.compile(r"(\w+)\s*(?:<-|=)\s*function\s*\(([^)]*)\)", re.MULTILINE),
    "matlab": re.compile(r"^[ \t]*function\s+(?:(?:\[?[\w,\s]*\]?\s*=\s*)?(\w+)|(\w+))\s*\(([^)]*)\)", re.MULTILINE),
    "shell": re.compile(r"^[ \t]*(?:function\s+)?(\w+)\s*\(\)\s*\{", re.MULTILINE),
    # Fortran keywords are case-insensitive, so compile with re.IGNORECASE.
    # The original pattern used an inline "(?i)" that was not at the very
    # start of the pattern string; global inline flags in that position are
    # deprecated since Python 3.6 and rejected by modern interpreters.
    "fortran": re.compile(r"""
        ^[ \t]*
        (?:recursive\s+)?
        (?:pure\s+)?
        (?:elemental\s+)?
        (?:[\w\*]+(?:\s*\([^)]*\))?\s+)?    # optional result type
        (function|subroutine)\s+
        (\w+)\s*                            # routine name
        \(([^)]*)\)                         # parameter list
    """, re.MULTILINE | re.VERBOSE | re.IGNORECASE),
}
|
|
|
|
|
|
|
|
# Regexes used to delete comments before function matching.  "*_line" keys
# strip to end-of-line (MULTILINE so $ anchors every line); "*_block" keys
# strip delimited block comments / docstrings.
_REMOVE_COMMENT_PATTERNS = {
    key: re.compile(pattern, flags)
    for key, pattern, flags in (
        ("python_line", r'#.*$', re.MULTILINE),
        ("python_triple_dq", r'"""[\s\S]*?"""', 0),
        ("python_triple_sq", r"'''[\s\S]*?'''", 0),
        ("c_line", r'//.*$', re.MULTILINE),
        ("c_block", r'/\*[\s\S]*?\*/', 0),
        ("shell_line", r'#.*$', re.MULTILINE),
        ("matlab_line", r'%.*$', re.MULTILINE),
        ("matlab_block", r'%\{[\s\S]*?%\}', 0),
        ("fortran_line", r'!.*$', re.MULTILINE),
    )
}
|
|
|
|
|
def detect_language(file_path: str):
    """Guess the language label from the file extension only.

    Returns a known language name for recognized extensions; for anything
    else returns the lower-cased extension itself ("" when the path has no
    extension), so callers can still see what was encountered.
    """
    ext_map = {
        ".py": "python",

        ".java": "java",

        ".c": "c/c++",
        ".h": "c/c++",
        ".hh": "c/c++",
        ".hpp": "c/c++",
        ".cpp": "c/c++",
        ".cc": "c/c++",
        ".cxx": "c/c++",
        ".c++": "c/c++",

        # NOTE: the extension is lower-cased below, so ".F" files match the
        # ".f" entry — the original dict's separate ".F" key was dead code.
        ".f90": "fortran",
        ".f": "fortran",
        ".f95": "fortran",

        ".r": "r",

        ".m": "matlab",  # ".m" could also be Objective-C; assumed MATLAB here

        ".sh": "shell",
        ".bash": "shell",

        ".rs": "rust",
        ".go": "go",
    }

    # lower() makes matching case-insensitive; strip() defends against stray
    # whitespace in paths read from the CSV.
    ext = os.path.splitext(file_path)[1].lower()
    ext = ext.strip()

    return ext_map.get(ext, ext)
|
|
|
|
|
|
|
|
def count_comments(code: str, lang: str):
    """Count comment lines and comment tokens in *code*.

    Supports Python/Java/C-C++/Fortran/MATLAB/R/Shell/Rust/Go/Jupyter using
    the precompiled regexes above.  Returns ``(comment_lines, token_count)``
    where tokens are whitespace-separated words inside comment text.
    """
    if lang == "jupyter":
        lang = "python"  # notebook sources are analyzed as plain Python

    comment_lines = 0
    comment_tokens = []
    lines = code.splitlines()

    # 0-based line indices already covered by a block comment, so the
    # line-comment pass below does not double-count them.
    block_comment_line_indices = set()

    if lang in _BLOCK_COMMENT_PATTERNS:
        patt = _BLOCK_COMMENT_PATTERNS[lang]
        for match in patt.finditer(code):
            start_pos = match.start()
            end_pos = match.end()
            start_line = code[:start_pos].count('\n')
            end_line = code[:end_pos].count('\n')

            if lang == "python":
                # Heuristic: a triple-quoted string directly preceded by '='
                # is an assigned value, not a docstring/comment — skip it
                # entirely.  (The original code skipped only the line-index
                # bookkeeping but still counted the lines and tokens, which
                # was inconsistent.)
                prefix = code[max(0, start_pos - 20):start_pos].strip()
                if prefix.endswith('='):
                    continue
                # group(1) is the whole literal including quotes; drop the
                # three-character delimiter on each side.  (The original
                # if/else here had two identical branches.)
                block_content = match.group(1)[3:-3]
            else:
                block_content = match.group(1) if match.lastindex else match.group(0)

            for line_idx in range(start_line, end_line + 1):
                block_comment_line_indices.add(line_idx)

            for b in block_content.splitlines():
                comment_lines += 1
                if b.strip():
                    comment_tokens.extend(b.strip().split())

    if lang in _LINE_COMMENT_PATTERNS:
        patt = _LINE_COMMENT_PATTERNS[lang]
        for line_idx, line in enumerate(lines):
            if line_idx in block_comment_line_indices:
                continue

            m = patt.search(line)
            if m:
                # Heuristic: an odd number of unescaped quotes before the
                # marker means the marker sits inside a string literal.
                prefix = line[:m.start()]
                single_quotes = prefix.count("'") - prefix.count("\\'")
                double_quotes = prefix.count('"') - prefix.count('\\"')

                if single_quotes % 2 == 0 and double_quotes % 2 == 0:
                    comment_lines += 1
                    text = m.group(1)
                    if text:
                        comment_tokens.extend(text.strip().split())

    return comment_lines, len(comment_tokens)
|
|
|
|
|
|
|
|
def count_functions_and_parameters(code: str, lang: str):
    """Count function definitions and their declared parameters.

    Supports the languages covered by the precompiled ``_FUNCTION_PATTERNS``
    (including Fortran function/subroutine).  Returns
    ``(function_count, parameter_count)``; unsupported languages yield (0, 0).
    """
    if lang == "jupyter":
        lang = "python"

    patt = _FUNCTION_PATTERNS.get(lang)
    if not patt:
        return 0, 0

    # Comments often contain code-like text; strip them first so the regex
    # only sees real definitions.
    code_no_comments = _remove_comments(code, lang)

    matches = patt.findall(code_no_comments)

    function_count = len(matches)

    parameter_count = 0
    for m in matches:
        if lang == "fortran":
            # groups: (function|subroutine, name, params)
            params = m[2]
        elif lang == "matlab":
            # groups: (name-with-return-values, bare-name, params)
            params = m[2] if len(m) > 2 else ""
        else:
            # Two-group patterns yield (name, params) tuples; the single-group
            # shell pattern yields plain strings (no parameter list).
            params = m[1] if isinstance(m, tuple) and len(m) > 1 else ""

        params = params.strip() if params else ""
        # C-style `f(void)` declares zero parameters, not one.
        if lang == "c/c++" and params == "void":
            continue
        if params:
            items = [p.strip() for p in params.split(",") if p.strip()]
            parameter_count += len(items)

    return function_count, parameter_count
|
|
|
|
|
|
|
|
def _remove_comments(code: str, lang: str) -> str:
    """Strip comments from *code* (precompiled regexes) so that function
    definitions can be matched more accurately."""

    def _blank(match):
        # Replace a block comment with the same number of newlines so the
        # line positions of the remaining code are preserved.
        return '\n' * match.group(0).count('\n')

    if lang in ("python", "jupyter"):
        code = _REMOVE_COMMENT_PATTERNS["python_line"].sub('', code)
        for key in ("python_triple_dq", "python_triple_sq"):
            code = _REMOVE_COMMENT_PATTERNS[key].sub(_blank, code)
    elif lang in ("c/c++", "java", "rust", "go"):
        code = _REMOVE_COMMENT_PATTERNS["c_line"].sub('', code)
        code = _REMOVE_COMMENT_PATTERNS["c_block"].sub(_blank, code)
    elif lang in ("shell", "r"):
        # Both use '#' line comments; R has no block-comment syntax.
        code = _REMOVE_COMMENT_PATTERNS["shell_line"].sub('', code)
    elif lang == "matlab":
        code = _REMOVE_COMMENT_PATTERNS["matlab_line"].sub('', code)
        code = _REMOVE_COMMENT_PATTERNS["matlab_block"].sub(_blank, code)
    elif lang == "fortran":
        code = _REMOVE_COMMENT_PATTERNS["fortran_line"].sub('', code)

    return code
|
|
|
|
|
|
|
|
def count_tokens(code: str):
    """Count tokens via the Python tokenizer; fall back to ``str.split``.

    The Python tokenizer is attempted on every input regardless of language;
    code that fails to tokenize is counted by whitespace-splitting instead.
    """
    try:
        return len(list(tokenize.generate_tokens(StringIO(code).readline)))
    except Exception:
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit.
        # tokenize raises TokenizeError/IndentationError/SyntaxError here.
        return len(code.split())
|
|
|
|
|
|
|
|
def analyze_code(code_str, code_path):
    """Compute line/comment/function statistics for one source file.

    ``code_str`` is the file content and ``code_path`` its path (used only
    for language detection).  ``None`` values are treated as empty strings
    so CSV rows with missing columns do not crash the pipeline.
    """
    code_str = code_str or ""
    code_path = code_path or ""

    lang = detect_language(code_path)

    # NOTE(review): counts physical lines as newline-count + 1, so content
    # ending in a newline reports one more line than splitlines() would.
    lines = code_str.count("\n") + 1
    empty_lines = sum(1 for line in code_str.splitlines() if not line.strip())
    comment_lines, comment_token_count = count_comments(code_str, lang)
    functions, parameters = count_functions_and_parameters(code_str, lang)
    tokens = count_tokens(code_str)

    return {
        "idx": None,  # filled in by the caller
        "language": lang,
        "total_lines": lines,
        "comment_lines": comment_lines,
        # Key kept as-is ("tokenst" typo) for backward compatibility with
        # existing .jsonl consumers.
        "comment_tokenst": comment_token_count,
        "empty_lines": empty_lines,
        # May go negative for comment-heavy files because of the counting
        # conventions above.
        "code_lines": lines - empty_lines - comment_lines,
        "tokens": tokens,
        "functions": functions,
        "parameters": parameters,
    }
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Batch driver: analyze CSV shards 110-119 and write one JSONL file of
    # per-source-file statistics for each shard.
    input_dir = "/home/weifengsun/tangou1/domain_code/src/datasets/data_merged"
    output_dir = "/home/weifengsun/tangou1/domain_code/src/datasets/analysis2"
    for i in range(110, 120):
        input_filename = f"{i:03}.csv"
        output_file_name = f"{i:03}.jsonl"

        input_path = os.path.join(input_dir, input_filename)
        output_path = os.path.join(output_dir, output_file_name)

        results = []

        # errors="replace" tolerates mis-encoded bytes; NUL bytes are
        # stripped first — presumably because the csv module rejects them.
        with open(input_path, "r", encoding="utf-8", errors="replace") as f:
            filtered = (line.replace('\0', '') for line in f)
            reader = csv.DictReader(filtered)

            for idx, row in tqdm(enumerate(reader)):
                code_str = row.get("text")
                # Prefer "repo_path"; fall back to "path" for rows that
                # lack it (column naming varies between shards — verify).
                code_path = row.get("repo_path")
                if not code_path:
                    code_path = row.get("path")

                result = analyze_code(code_str, code_path)
                result["idx"] = f"{i:03}-{idx}"  # shard-local record id
                results.append(result)

        with open(output_path, "w", encoding="utf-8") as f:
            for r in tqdm(results):
                f.write(json.dumps(r) + "\n")
|
|
|