"""Extract call relationships for high-scoring functions with Tree-sitter.

For every project directory under a root folder, this script reads the
project's ``functions.jsonl`` index, keeps the records whose ``score`` meets
``SCORE_THRESHOLD``, re-parses each record's source file with Tree-sitter,
collects the identifiers called inside that function, and writes one JSON
line per function to ``<OUTPUT_DIR>/<project>/calls.jsonl``.
"""
import traceback
import os
import json
from concurrent.futures import ProcessPoolExecutor, as_completed

from tree_sitter import Language, Parser
import tree_sitter_c
import tree_sitter_cpp
import tree_sitter_java
import tree_sitter_go
import tree_sitter_rust
import tree_sitter_julia
import tree_sitter_python

########################################
# Configuration
########################################

OUTPUT_DIR = "/home/weifengsun/tangou1/step2/step22/dataset_calls"
SCORE_THRESHOLD = 0.92

# NOTE(review): not referenced anywhere in this file; kept in case other
# modules import it.
EXCLUDE_DIRS = {
    ".git",
    "node_modules",
    "vendor",
    "third_party",
    "build",
    "dist",
    "target",
    "__pycache__",
}

# Per-language Tree-sitter settings: file extensions, node types that define
# a function, node types that represent a call, and the field holding the
# function name.
LANGUAGE_CONFIG = {
    "python": {
        "ext": [".py"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call"],
        "name_field": "name",
    },
    "c": {
        "ext": [".c", ".h"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call_expression"],
        "name_field": "declarator",
    },
    "cpp": {
        "ext": [".cpp", ".cc", ".cxx", ".hpp", ".hh"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call_expression"],
        "name_field": "declarator",
    },
    "java": {
        "ext": [".java"],
        "function_nodes": ["method_declaration", "constructor_declaration"],
        "call_nodes": ["method_invocation", "call_expression"],
        "name_field": "name",
    },
    "go": {
        "ext": [".go"],
        "function_nodes": ["function_declaration", "method_declaration"],
        "call_nodes": ["call_expression"],
        "name_field": "name",
    },
    "rust": {
        "ext": [".rs"],
        "function_nodes": ["function_item"],
        "call_nodes": ["call_expression"],
        "name_field": "name",
    },
    "julia": {
        "ext": [".jl"],
        "function_nodes": ["function_definition"],
        "call_nodes": ["call_expression"],
        "name_field": "name",
    },
}

# Reverse lookup: file extension -> language key.
EXT_TO_LANG = {
    ext: lang for lang, cfg in LANGUAGE_CONFIG.items() for ext in cfg["ext"]
}

########################################
# Worker initialization
########################################

LANGUAGES = {
    "python": Language(tree_sitter_python.language()),
    "c": Language(tree_sitter_c.language()),
    "cpp": Language(tree_sitter_cpp.language()),
    "java": Language(tree_sitter_java.language()),
    "go": Language(tree_sitter_go.language()),
    "rust": Language(tree_sitter_rust.language()),
    "julia": Language(tree_sitter_julia.language()),
}

# Populated per worker process by init_worker(). Defined here as well so that
# calling process_project_calls() without the pool initializer skips records
# instead of raising NameError.
PARSERS = {}


def _build_parser(language):
    """Return a Parser bound to *language* across py-tree-sitter versions."""
    parser = Parser()
    try:
        # Preferred API: newer releases expose a ``language`` setter and no
        # longer provide ``set_language``.
        parser.language = language
    except AttributeError:
        # Older releases only provide the method form.
        parser.set_language(language)
    return parser


def init_worker():
    """Create one parser per configured language for this worker process.

    A language whose parser cannot be created is reported on stdout and left
    out of ``PARSERS``; records in that language are then skipped.
    """
    global PARSERS
    PARSERS = {}
    for lang in LANGUAGE_CONFIG:
        try:
            PARSERS[lang] = _build_parser(LANGUAGES[lang])
        except Exception:
            print(f"Failed to load parser for {lang}")
            print(traceback.format_exc())


########################################
# JSONL loading
########################################

def load_high_score_functions(jsonl_path, score_threshold=SCORE_THRESHOLD):
    """Load function records whose score is at least *score_threshold*.

    Args:
        jsonl_path: Path to a JSON Lines file, one function record per line.
        score_threshold: Minimum ``score`` for a record to be kept.

    Returns:
        A ``(funcs, names)`` tuple: the list of kept records and the set of
        their ``name`` values.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a kept record has no ``name`` key.
    """
    funcs = []
    names = set()
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            obj = json.loads(line)
            # A missing or null score counts as 0.
            if (obj.get("score") or 0) >= score_threshold:
                funcs.append(obj)
                names.add(obj["name"])
    return funcs, names


########################################
# Tree-sitter call extraction
########################################

def extract_calls_from_node(node, src, call_types):
    """Collect identifiers that are direct children of call nodes.

    Walks the subtree rooted at *node*; for every node whose type is in
    *call_types*, the text of each direct child of type ``identifier`` is
    recorded. Callees expressed through other node types (attribute or field
    accesses, for example) are not captured by this check.

    Args:
        node: Root Tree-sitter node of the subtree to scan.
        src: The source file content as bytes (the buffer that was parsed).
        call_types: Collection of node type names that denote a call.

    Returns:
        A set of identifier strings.
    """
    calls = set()
    # Explicit stack instead of recursion so deeply nested syntax trees
    # cannot exceed the interpreter's recursion limit.
    stack = [node]
    while stack:
        current = stack.pop()
        children = current.children
        if current.type in call_types:
            for child in children:
                if child.type == "identifier":
                    text = src[child.start_byte:child.end_byte]
                    # Replace undecodable bytes rather than raising, so one
                    # odd identifier does not discard the whole function.
                    calls.add(text.decode("utf-8", errors="replace"))
        stack.extend(children)
    return calls


def find_function_node_by_line(tree, start_line, func_nodes):
    """Find the first function node that starts on *start_line*.

    Args:
        tree: A parsed Tree-sitter tree.
        start_line: 1-based line number where the function starts.
        func_nodes: Collection of node type names that denote a function.

    Returns:
        The first matching node in pre-order, or ``None`` if there is none.
    """
    stack = [tree.root_node]
    while stack:
        node = stack.pop()
        # Tree-sitter rows are 0-based; start_line is 1-based.
        if node.type in func_nodes and node.start_point[0] + 1 == start_line:
            return node
        # Reversed so children are visited left to right (pre-order).
        stack.extend(reversed(node.children))
    return None


########################################
# Function-name matching
########################################

def match_names(called_names, indexed_names):
    """Match called identifiers against indexed function names.

    A call matches an indexed name when they are equal or when either one is
    a prefix of the other. Empty strings are ignored because an empty string
    is a prefix of everything.

    Args:
        called_names: Iterable of identifiers found at call sites.
        indexed_names: Iterable of known function names.

    Returns:
        A dict mapping each call that has at least one match to the sorted
        list of matching names. Calls with no match are omitted.
    """
    candidates = [name for name in indexed_names if name]
    matches = {}
    # Sorted iteration keeps the output stable between runs; set iteration
    # order for strings varies with hash randomization.
    for call in sorted(called_names):
        if not call:
            continue
        hits = sorted(
            name
            for name in candidates
            if name.startswith(call) or call.startswith(name)
        )
        if hits:
            matches[call] = hits
    return matches


########################################
# Per-project analysis
########################################

def process_project_calls(project_path):
    """Write ``calls.jsonl`` for one project and return the project name.

    Reads ``<project_path>/functions.jsonl``, and for each high-scoring
    record parses its source file, locates the function by start line,
    extracts its calls and writes one JSON object per function to
    ``<OUTPUT_DIR>/<project_name>/calls.jsonl``.

    Records are skipped when they have no file or language, when no parser
    is available for the language, or when the function node is not found.
    A record that raises while being processed is reported on stdout and
    skipped; the remaining records are still processed.

    Args:
        project_path: Directory of a single project.

    Returns:
        The project's base directory name.

    Raises:
        OSError: If ``functions.jsonl`` cannot be read or the output file
            cannot be created.
    """
    project_name = os.path.basename(project_path.rstrip("/"))
    # Assumes each project's jsonl lives in the project directory.
    input_jsonl = os.path.join(project_path, "functions.jsonl")
    output_path = os.path.join(OUTPUT_DIR, project_name, "calls.jsonl")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    funcs, name_set = load_high_score_functions(input_jsonl)

    # Single-entry cache of the most recently parsed (file, language), so
    # consecutive records from the same file are not re-read and re-parsed.
    cached_key = None
    cached_src = None
    cached_tree = None

    with open(output_path, "w", encoding="utf-8") as out:
        for f in funcs:
            file_path = f.get("file")
            lang = f.get("language")
            if not file_path or not lang or lang not in PARSERS:
                continue
            try:
                key = (file_path, lang)
                if key != cached_key:
                    with open(file_path, "rb") as fh:
                        src = fh.read()
                    tree = PARSERS[lang].parse(src)
                    # Update all three together only after both steps
                    # succeeded, so the cache is never half-updated.
                    cached_key, cached_src, cached_tree = key, src, tree

                cfg = LANGUAGE_CONFIG[lang]
                node = find_function_node_by_line(
                    cached_tree, f["start_line"], cfg["function_nodes"]
                )
                if not node:
                    continue

                calls = extract_calls_from_node(
                    node, cached_src, cfg["call_nodes"]
                )
                matched = match_names(calls, name_set)
                record = {
                    "function": f["name"],
                    "qualified_name": f.get("qualified_name", f["name"]),
                    "calls": sorted(calls),
                    "matched": matched,
                    "file": f["file"],
                    "start_line": f["start_line"],
                    "end_line": f["end_line"],
                    "score": f.get("score", 0),
                    "language": lang,
                }
                out.write(json.dumps(record, ensure_ascii=False) + "\n")
            except Exception as exc:
                # Best effort: report the failing record and keep going.
                print(
                    f"[WARN] {project_name}: skipped {f.get('name')!r} "
                    f"in {file_path}: {exc!r}"
                )
                continue
    return project_name


########################################
# Main entry point
########################################

def load_projects(root):
    """Return the paths of the immediate sub-directories of *root*."""
    return [
        path
        for path in (os.path.join(root, d) for d in os.listdir(root))
        if os.path.isdir(path)
    ]


def main():
    """Process every project under the projects root in a process pool."""
    projects_root = "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    projects = load_projects(projects_root)

    # os.cpu_count() may return None when the count cannot be determined.
    workers = min(os.cpu_count() or 1, 32)
    with ProcessPoolExecutor(max_workers=workers, initializer=init_worker) as pool:
        futures = {pool.submit(process_project_calls, p): p for p in projects}
        for f in as_completed(futures):
            proj = futures[f]
            try:
                f.result()
                print(f"[OK] {proj}")
            except Exception as e:
                print(f"[FAIL] {proj}: {e}")


if __name__ == "__main__":
    main()