#!/usr/bin/env python3
"""
instruction_generation unified entry script
Supports --mode summarize|parse|all
"""

import os
import sys
import json
import time
import asyncio
import argparse
from pathlib import Path

from dotenv import load_dotenv

# Load the .env file (before importing the logger)
env_file = Path(__file__).parent / ".env"
if env_file.exists():
    load_dotenv(env_file)
elif (Path(__file__).parent.parent / ".env").exists():
    # If not in the current directory, try loading from the project root
    load_dotenv(Path(__file__).parent.parent / ".env")

# Add current directory to path (for importing local modules)
sys.path.insert(0, str(Path(__file__).parent))
# Add domain_code/src to path for reusing util functions
sys.path.insert(0, str(Path(__file__).parent.parent / "domain_code" / "src"))
from util import init_logger, logger
# Import modules
from summarize_repo_readme import process_all_repos as process_summarize
from extract_repo_functions import process_all_repos as process_extract
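
# NOTE: main() calls both imported coroutines with the same keyword arguments
# (repos_dir, base_url, model, api_key, log_file, max_concurrency, overwrite);
# process_extract additionally receives max_file_chars. This documents the call
# sites in this file, not a guarantee from the imported modules themselves.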

async def main():
    """Main function"""
    parser = argparse.ArgumentParser(
        description="instruction_generation unified entry tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Full pipeline: summarize README first, then parse functions
  python3 pipeline.py --mode all

  # Summarize README only
  python3 pipeline.py --mode summarize

  # Parse functions only (requires README_SUMMARY.md to exist)
  python3 pipeline.py --mode parse

  # Use local vLLM Qwen (default)
  python3 pipeline.py --mode all

  # Use OpenAI API
  export OPENAI_API_KEY="your-api-key"
  python3 pipeline.py --mode all --base_url https://api.openai.com/v1 --model gpt-4o-mini

  # Specify repository directory and other parameters
  python3 pipeline.py --mode all --repos_dir /path/to/repos_filtered --max_concurrency 16 --overwrite
""",
    )
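    # RawDescriptionHelpFormatter preserves the epilog's manual line breaks in --help output.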

    # Common parameters
    parser.add_argument(
        "--repos_dir",
        type=str,
        default=os.getenv("REPOS_DIR", "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered"),
        help="Repository root directory path (can be read from the REPOS_DIR env var)",
    )
    parser.add_argument(
        "--mode",
        type=str,
        choices=["summarize", "parse", "all"],
        default="all",
        help="Execution mode: 'summarize' (README only), 'parse' (functions only), 'all' (full pipeline)",
    )

    # LLM parameters
    parser.add_argument(
        "--base_url",
        type=str,
        default=os.getenv("OPENAI_BASE_URL", "http://localhost:8000/v1"),
        help="LLM API base URL (can be read from the OPENAI_BASE_URL env var, default: http://localhost:8000/v1)",
    )
    parser.add_argument(
        "--model",
        type=str,
        default=os.getenv("DEFAULT_MODEL", "Qwen3"),
        help="Model name (can be read from the DEFAULT_MODEL env var, default: Qwen3)",
    )
    parser.add_argument(
        "--api_key_env",
        type=str,
        default="OPENAI_API_KEY",
        help="Name of the environment variable holding the API key (default: OPENAI_API_KEY)",
    )

    # Performance parameters
    parser.add_argument(
        "--max_concurrency",
        type=int,
        default=int(os.getenv("MAX_CONCURRENCY", "8")),
        help="Maximum concurrency (can be read from the MAX_CONCURRENCY env var, default: 8)",
    )
    parser.add_argument(
        "--max_file_chars",
        type=int,
        default=int(os.getenv("MAX_FILE_CHARS", "200000")),
        help="Maximum file size in characters (parse mode only; can be read from the MAX_FILE_CHARS env var, default: 200000)",
    )

    # Other parameters
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing output files",
    )
    parser.add_argument(
        "--log_file",
        type=str,
        default="instruction_generation/workdir/logs/pipeline.log",
        help="Log file path",
    )
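
    # Note on precedence: a CLI flag wins over its environment variable, which wins
    # over the hard-coded fallback, because each env var is read once above as the
    # argparse default.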

    args = parser.parse_args()

    # Initialize logger
    init_logger(args.log_file, level="INFO")

    # Get the API key (with debug logging)
    # region agent log
    env_key_before = os.getenv(args.api_key_env)
    debug_log_path = Path(__file__).parent.parent / ".cursor" / "debug.log"
    try:
        with open(debug_log_path, "a", encoding="utf-8") as f:
            log_entry = {
                "sessionId": "debug-session",
                "runId": "api-key-debug",
                "hypothesisId": "A",
                "location": "pipeline.py:130",
                "message": "API key read from env",
                "data": {
                    "env_var_name": args.api_key_env,
                    "key_exists": env_key_before is not None,
                    "key_length": len(env_key_before) if env_key_before else 0,
                    "key_prefix": env_key_before[:20] + "..." if env_key_before and len(env_key_before) > 20 else env_key_before,
                },
                "timestamp": int(time.time() * 1000),
            }
            f.write(json.dumps(log_entry) + "\n")
    except Exception:
        pass
    # endregion

    api_key = os.getenv(args.api_key_env, "none")

    # region agent log
    try:
        with open(debug_log_path, "a", encoding="utf-8") as f:
            log_entry = {
                "sessionId": "debug-session",
                "runId": "api-key-debug",
                "hypothesisId": "A",
                "location": "pipeline.py:150",
                "message": "API key final value",
                "data": {
                    "api_key_length": len(api_key) if api_key else 0,
                    "api_key_prefix": api_key[:20] + "..." if api_key and len(api_key) > 20 else api_key,
                    "api_key_suffix": "..." + api_key[-10:] if api_key and len(api_key) > 10 else api_key,
                    "is_default_none": api_key == "none",
                },
                "timestamp": int(time.time() * 1000),
            }
            f.write(json.dumps(log_entry) + "\n")
    except Exception:
        pass
    # endregion
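
    # Both debug regions above are best-effort: every exception (including a missing
    # .cursor/ directory) is swallowed, so in that case the JSON-lines debug log is
    # simply not written and the pipeline continues unaffected.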

    # Check the repository directory
    repos_dir = Path(args.repos_dir)
    if not repos_dir.exists():
        logger.error(f"Repository directory does not exist: {repos_dir}")
        sys.exit(1)

    # Create the log directory
    log_file_path = Path(args.log_file)
    log_file_path.parent.mkdir(parents=True, exist_ok=True)

    logger.info("=" * 80)
    logger.info("instruction_generation tool started")
    logger.info("=" * 80)
    logger.info(f"Mode: {args.mode}")
    logger.info(f"Repository directory: {repos_dir}")
    logger.info(f"LLM API: {args.base_url}")
    logger.info(f"Model: {args.model}")
    logger.info(f"Max concurrency: {args.max_concurrency}")
    logger.info(f"Overwrite existing files: {args.overwrite}")
    logger.info("=" * 80)

    # Execute based on mode
    if args.mode == "summarize":
        # Summarize README only
        logger.info("Starting: README summarization")
        results = await process_summarize(
            repos_dir=repos_dir,
            base_url=args.base_url,
            model=args.model,
            api_key=api_key,
            log_file=str(log_file_path),
            max_concurrency=args.max_concurrency,
            overwrite=args.overwrite,
        )
        logger.info("\n" + "=" * 80)
        logger.info("README summarization complete!")
        logger.info("=" * 80)
    elif args.mode == "parse":
        # Parse functions only
        logger.info("Starting: Function parsing")
        results = await process_extract(
            repos_dir=repos_dir,
            base_url=args.base_url,
            model=args.model,
            api_key=api_key,
            log_file=str(log_file_path),
            max_file_chars=args.max_file_chars,
            max_concurrency=args.max_concurrency,
            overwrite=args.overwrite,
        )
        logger.info("\n" + "=" * 80)
        logger.info("Function parsing complete!")
        logger.info("=" * 80)
    elif args.mode == "all":
        # Full pipeline: summarize README first, then parse functions
        logger.info("Starting: Full pipeline")
        logger.info("\n" + "-" * 80)
        logger.info("Step 1/2: README summarization")
        logger.info("-" * 80)
        summarize_results = await process_summarize(
            repos_dir=repos_dir,
            base_url=args.base_url,
            model=args.model,
            api_key=api_key,
            log_file=str(log_file_path),
            max_concurrency=args.max_concurrency,
            overwrite=args.overwrite,
        )
        logger.info("\n" + "-" * 80)
        logger.info("Step 2/2: Function parsing")
        logger.info("-" * 80)
        # Function parsing relies on README_SUMMARY.md: with overwrite=False, repos
        # missing it are auto-skipped; with --overwrite, parsing is still attempted,
        # but repos without a summary are skipped as well.
        parse_results = await process_extract(
            repos_dir=repos_dir,
            base_url=args.base_url,
            model=args.model,
            api_key=api_key,
            log_file=str(log_file_path),
            max_file_chars=args.max_file_chars,
            max_concurrency=args.max_concurrency,
            overwrite=args.overwrite,
        )
        logger.info("\n" + "=" * 80)
        logger.info("Full pipeline complete!")
        logger.info("=" * 80)
        results = parse_results
    else:
        logger.error(f"Unknown mode: {args.mode}")
        sys.exit(1)

    logger.info("\n" + "=" * 80)
    logger.info("All tasks complete!")
    logger.info("=" * 80)


if __name__ == "__main__":
    asyncio.run(main())
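
# Quick smoke test (illustrative invocation; assumes an OpenAI-compatible server,
# e.g. a local vLLM instance, is already listening at the --base_url endpoint):
#   python3 pipeline.py --mode summarize --repos_dir ./repos_filtered --max_concurrency 2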