# dataset-builder / data2 / step22 / depend_analysis.py
# SunDou's picture
# Upload data2/step22/depend_analysis.py with huggingface_hub
# 5c9df83 verified
import traceback
import os
import json
from concurrent.futures import ProcessPoolExecutor, as_completed
from tree_sitter import Language, Parser
import tree_sitter_c
import tree_sitter_cpp
import tree_sitter_java
import tree_sitter_go
import tree_sitter_rust
import tree_sitter_julia
import tree_sitter_python
########################################
# Configuration
########################################
# Root directory under which "<project>/calls.jsonl" result files are written.
OUTPUT_DIR = "/home/weifengsun/tangou1/step2/step22/dataset_calls"

# Minimum "score" a function record must reach to be analysed.
SCORE_THRESHOLD = 0.92

# Directory names for VCS metadata, vendored code and build output.
# NOTE(review): nothing in the visible part of this file reads this set —
# confirm whether it is still needed.
EXCLUDE_DIRS = {
    ".git",
    "node_modules",
    "vendor",
    "third_party",
    "build",
    "dist",
    "target",
    "__pycache__",
}
def _lang_entry(ext, function_nodes, call_nodes, name_field):
    """Bundle the Tree-sitter settings of one language into a config dict."""
    return {
        "ext": ext,
        "function_nodes": function_nodes,
        "call_nodes": call_nodes,
        "name_field": name_field,
    }


# Per-language settings, keyed by language name:
#   ext            - file extensions belonging to the language
#   function_nodes - syntax-node types treated as function definitions
#   call_nodes     - syntax-node types treated as call sites
#   name_field     - NOTE(review): presumably the grammar field holding the
#                    function name; it is not read in the visible code.
LANGUAGE_CONFIG = {
    "python": _lang_entry(
        [".py"],
        ["function_definition"],
        ["call"],
        "name",
    ),
    "c": _lang_entry(
        [".c", ".h"],
        ["function_definition"],
        ["call_expression"],
        "declarator",
    ),
    "cpp": _lang_entry(
        [".cpp", ".cc", ".cxx", ".hpp", ".hh"],
        ["function_definition"],
        ["call_expression"],
        "declarator",
    ),
    "java": _lang_entry(
        [".java"],
        ["method_declaration", "constructor_declaration"],
        ["method_invocation", "call_expression"],
        "name",
    ),
    "go": _lang_entry(
        [".go"],
        ["function_declaration", "method_declaration"],
        ["call_expression"],
        "name",
    ),
    "rust": _lang_entry(
        [".rs"],
        ["function_item"],
        ["call_expression"],
        "name",
    ),
    "julia": _lang_entry(
        [".jl"],
        ["function_definition"],
        ["call_expression"],
        "name",
    ),
}
# Reverse lookup from file extension to language name. If two languages ever
# listed the same extension, the one configured later would win.
EXT_TO_LANG = {
    extension: language
    for language, settings in LANGUAGE_CONFIG.items()
    for extension in settings["ext"]
}
########################################
# Worker initialisation
########################################
# Tree-sitter Language object for every supported language, keyed by the same
# names that LANGUAGE_CONFIG uses.
LANGUAGES = {
    lang_name: Language(grammar.language())
    for lang_name, grammar in (
        ("python", tree_sitter_python),
        ("c", tree_sitter_c),
        ("cpp", tree_sitter_cpp),
        ("java", tree_sitter_java),
        ("go", tree_sitter_go),
        ("rust", tree_sitter_rust),
        ("julia", tree_sitter_julia),
    )
}
def init_worker():
    """Create one Tree-sitter parser per configured language for this process.

    Intended as the ``initializer`` of the process pool. It (re)creates the
    module-level ``PARSERS`` dict mapping language name to a ready parser.
    A language whose parser cannot be set up is reported on stdout and left
    out of ``PARSERS``, so the failure does not abort the worker.
    """
    global PARSERS
    PARSERS = {}
    for lang in LANGUAGE_CONFIG:
        try:
            parser = Parser()
            try:
                # Current py-tree-sitter releases assign the grammar through
                # the ``language`` property; ``set_language`` was deprecated
                # and then removed.
                parser.language = LANGUAGES[lang]
            except AttributeError:
                # Older releases only offer the setter method.
                parser.set_language(LANGUAGES[lang])
            PARSERS[lang] = parser
        except Exception:
            print(f"Failed to load parser for {lang}")
            print(traceback.format_exc())
########################################
# JSONL loading
########################################
def load_high_score_functions(jsonl_path, score_threshold=SCORE_THRESHOLD):
    """Load the function records whose score reaches ``score_threshold``.

    Args:
        jsonl_path: Path of a UTF-8 JSON Lines file with one JSON object per
            line.
        score_threshold: Minimum value of a record's ``"score"`` for it to be
            kept. A missing or ``null`` score counts as 0.

    Returns:
        A ``(funcs, names)`` tuple: the kept records in file order, and the
        set of their ``"name"`` values.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a kept record has no ``"name"`` key.
    """
    funcs = []
    names = set()
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline) are not records and
                # must not abort the load.
                continue
            obj = json.loads(line)
            # ``or 0`` also covers an explicit JSON null, which cannot be
            # compared with a number.
            score = obj.get("score") or 0
            if score >= score_threshold:
                funcs.append(obj)
                names.add(obj["name"])
    return funcs, names
########################################
# Call extraction with Tree-sitter
########################################
def extract_calls_from_node(node, src, call_types):
    """Collect the identifiers called directly inside ``node``.

    Every node in the subtree rooted at ``node`` (the root included) whose
    ``type`` is in ``call_types`` contributes the source text of each of its
    direct children of type ``"identifier"``. Call nodes that have no direct
    identifier child contribute nothing.

    Args:
        node: Root of the subtree to scan; nodes must expose ``type``,
            ``children``, ``start_byte`` and ``end_byte``.
        src: The source as ``bytes``; node byte offsets index into it.
        call_types: Container of node type names that count as calls.

    Returns:
        A set of identifier strings.
    """
    calls = set()
    # An explicit stack instead of recursion: deeply nested syntax trees would
    # otherwise exceed Python's recursion limit.
    pending = [node]
    while pending:
        current = pending.pop()
        children = current.children
        if current.type in call_types:
            for child in children:
                if child.type == "identifier":
                    text = src[child.start_byte:child.end_byte]
                    # Bytes that are not valid UTF-8 are replaced rather than
                    # raising, so one odd identifier does not lose the result.
                    calls.add(text.decode("utf-8", errors="replace"))
        pending.extend(children)
    return calls
def find_function_node_by_line(tree, start_line, func_nodes):
    """Return the first function node that starts on ``start_line``.

    The tree is searched depth-first in document (pre-)order, so when several
    matching nodes start on the same line the outermost / earliest one wins.

    Args:
        tree: Parsed tree exposing ``root_node``; nodes must expose ``type``,
            ``children`` and ``start_point`` (whose first item is the
            zero-based row).
        start_line: One-based line number on which the node must start.
        func_nodes: Container of node type names that count as functions.

    Returns:
        The matching node, or ``None`` if there is none.
    """
    # An explicit stack instead of recursion: deeply nested syntax trees would
    # otherwise exceed Python's recursion limit.
    pending = [tree.root_node]
    while pending:
        node = pending.pop()
        if node.type in func_nodes and node.start_point[0] + 1 == start_line:
            return node
        # Push children reversed so they are popped in their original order.
        pending.extend(reversed(node.children))
    return None
########################################
# Function-name matching
########################################
def match_names(called_names, indexed_names):
    """Map each called name to the indexed names it prefix-matches.

    A called name matches an indexed name when either string is a prefix of
    the other (which includes the two being equal).

    Args:
        called_names: Iterable of called identifier strings.
        indexed_names: Iterable of known function-name strings.

    Returns:
        A dict from called name to the sorted list of matching indexed names.
        Called names without any match are omitted. Keys are inserted in
        sorted order, so the result does not depend on set iteration order.
    """
    # Sort once up front: iterating the sets directly would make the output
    # order vary between runs and worker processes.
    candidates = sorted(indexed_names)
    matches = {}
    for call in sorted(called_names):
        hits = [
            name
            for name in candidates
            if name.startswith(call) or call.startswith(name)
        ]
        if hits:
            matches[call] = hits
    return matches
########################################
# Per-project analysis
########################################
def process_project_calls(project_path):
    """Extract the calls made by a project's high-score functions.

    Reads ``functions.jsonl`` from ``project_path``, keeps the records that
    pass ``load_high_score_functions``, locates each function in its source
    file by start line, and writes one JSON line per function to
    ``<OUTPUT_DIR>/<project name>/calls.jsonl``. A record is skipped when its
    language has no parser in ``PARSERS``, when no function node starts on
    its ``start_line``, or when processing it raises; the last case is
    reported on stdout.

    Args:
        project_path: Directory of the project to analyse.

    Returns:
        The project name (the last component of ``project_path``).

    Raises:
        Whatever ``load_high_score_functions`` raises for the input file, and
        ``OSError`` if the output file cannot be created.
    """
    project_name = os.path.basename(project_path.rstrip("/"))
    # Assumes each project's JSONL sits directly inside the project directory.
    input_jsonl = os.path.join(project_path, "functions.jsonl")
    output_path = os.path.join(OUTPUT_DIR, project_name, "calls.jsonl")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    funcs, name_set = load_high_score_functions(input_jsonl)

    # Remember the most recently parsed file so that consecutive records from
    # the same file do not re-read and re-parse it.
    cached_key = None
    cached_parse = None

    with open(output_path, "w", encoding="utf-8") as out:
        for f in funcs:
            lang = f.get("language")
            if not lang or lang not in PARSERS:
                continue
            try:
                file_path = f["file"]
                parse_key = (file_path, lang)
                if parse_key != cached_key:
                    with open(file_path, "rb") as src_file:
                        src = src_file.read()
                    cached_parse = (src, PARSERS[lang].parse(src))
                    cached_key = parse_key
                src, tree = cached_parse

                node = find_function_node_by_line(
                    tree, f["start_line"], LANGUAGE_CONFIG[lang]["function_nodes"]
                )
                if not node:
                    continue
                calls = extract_calls_from_node(
                    node, src, LANGUAGE_CONFIG[lang]["call_nodes"]
                )
                matched = match_names(calls, name_set)
                out.write(json.dumps({
                    "function": f["name"],
                    "qualified_name": f.get("qualified_name", f["name"]),
                    "calls": sorted(calls),
                    "matched": matched,
                    "file": file_path,
                    "start_line": f["start_line"],
                    "end_line": f["end_line"],
                    "score": f.get("score", 0),
                    "language": lang
                }, ensure_ascii=False) + "\n")
            except Exception as exc:
                # Best effort: one bad record must not abort the project, but
                # the skip is reported instead of being swallowed silently.
                print(
                    f"[WARN] {project_name}: skipped {f.get('name')!r} "
                    f"in {f.get('file')!r}: {exc!r}"
                )
                continue
    return project_name
########################################
# Main entry point
########################################
def load_projects(root):
    """Return the paths of the entries directly under ``root`` that are directories."""
    project_dirs = []
    for entry in os.listdir(root):
        candidate = os.path.join(root, entry)
        if os.path.isdir(candidate):
            project_dirs.append(candidate)
    return project_dirs
def main():
    """Analyse every project directory in parallel and report each outcome.

    Each subdirectory of the hard-coded projects root is submitted to a
    process pool whose workers are prepared by ``init_worker``. As projects
    finish, ``[OK] <path>`` or ``[FAIL] <path>: <error>`` is printed.
    """
    projects_root = "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    projects = load_projects(projects_root)
    # os.cpu_count() returns None when the count cannot be determined; fall
    # back to a single worker instead of failing inside min().
    worker_count = min(os.cpu_count() or 1, 32)
    with ProcessPoolExecutor(max_workers=worker_count, initializer=init_worker) as pool:
        futures = {pool.submit(process_project_calls, p): p for p in projects}
        for future in as_completed(futures):
            proj = futures[future]
            try:
                future.result()
                print(f"[OK] {proj}")
            except Exception as e:
                print(f"[FAIL] {proj}: {e}")


if __name__ == "__main__":
    main()