#!/usr/bin/env python3
"""
Export repository files to CSV datasets grouped by keyword.

This script processes all files in the repos_filtered directory, groups them
by keyword from repos_check_history.csv, and exports them to a separate CSV
file for each keyword.
"""

import csv
import logging
import os
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pandas as pd
from tqdm import tqdm

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('export_files_to_csv.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Configuration
REPOS_FILTERED_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered")
REPOS_CHECK_HISTORY_CSV = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_check_history.csv")
OUTPUT_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/dataset_csv")
MAX_FILE_SIZE = None  # No size limit - process all files (currently unused below)

# Directories to skip
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.pytest_cache', '.mypy_cache',
             'venv', 'env', '.venv', '.env', 'dist', 'build', '.eggs'}


def _is_skipped_dir(name: str) -> bool:
    """True for directory names that should never be traversed.

    '*.egg-info' directories are matched explicitly with endswith(), since a
    glob pattern stored in a set would never match via plain membership tests.
    """
    return name in SKIP_DIRS or name.endswith('.egg-info')


# Binary file extensions to skip
BINARY_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.ico', '.svg',
                     '.pdf', '.zip', '.tar', '.gz', '.bz2', '.xz', '.7z',
                     '.exe', '.dll', '.so', '.dylib', '.bin', '.o', '.a',
                     '.pyc', '.pyo', '.pyd', '.class', '.jar', '.war',
                     '.mp3', '.mp4', '.avi', '.mov', '.wav', '.flac',
                     '.db', '.sqlite', '.sqlite3', '.h5', '.hdf5', '.pkl', '.pickle'}

# Language mapping based on file extension
LANGUAGE_MAP = {
    '.py': 'Python', '.js': 'JavaScript', '.ts': 'TypeScript', '.java': 'Java',
    '.cpp': 'C++', '.c': 'C', '.cs': 'C#', '.go': 'Go', '.rs': 'Rust',
    '.rb': 'Ruby', '.php': 'PHP', '.swift': 'Swift', '.kt': 'Kotlin',
    '.scala': 'Scala', '.r': 'R', '.m': 'MATLAB', '.jl': 'Julia',
    '.sh': 'Shell', '.bash': 'Bash', '.zsh': 'Zsh', '.sql': 'SQL',
    '.html': 'HTML', '.css': 'CSS', '.xml': 'XML', '.json': 'JSON',
    '.yaml': 'YAML', '.yml': 'YAML', '.md': 'Markdown', '.tex': 'LaTeX',
    '.f90': 'Fortran', '.f': 'Fortran', '.f77': 'Fortran', '.f95': 'Fortran',
    '.cu': 'CUDA', '.cl': 'OpenCL', '.hs': 'Haskell', '.ml': 'OCaml',
    '.fs': 'F#', '.vb': 'Visual Basic', '.pl': 'Perl', '.pm': 'Perl',
    '.lua': 'Lua', '.vim': 'Vim script', '.cmake': 'CMake',
    '.makefile': 'Makefile', '.dockerfile': 'Dockerfile',
}


def sanitize_keyword(keyword: str) -> str:
    """Sanitize a keyword for use in a filename."""
    # Replace special characters with underscores
    sanitized = re.sub(r'[^\w\s-]', '_', keyword)
    # Replace spaces and hyphens with underscores
    sanitized = re.sub(r'[\s-]+', '_', sanitized)
    # Collapse runs of underscores
    sanitized = re.sub(r'_+', '_', sanitized)
    # Remove leading/trailing underscores
    sanitized = sanitized.strip('_')
    return sanitized


def load_keyword_mapping() -> Dict[str, str]:
    """Load the full_name -> keyword mapping from repos_check_history.csv."""
    logger.info(f"Loading keyword mapping from {REPOS_CHECK_HISTORY_CSV}")

    mapping = {}
    try:
        # Read the CSV in chunks to handle a large file
        chunk_size = 100000
        for chunk in pd.read_csv(REPOS_CHECK_HISTORY_CSV, chunksize=chunk_size):
            for _, row in chunk.iterrows():
                full_name = row['full_name']
                keyword = row['keyword']
                mapping[full_name] = keyword

        logger.info(f"Loaded {len(mapping)} keyword mappings")
        return mapping
    except Exception as e:
        logger.error(f"Error loading keyword mapping: {e}")
        raise
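
# load_keyword_mapping() above assumes repos_check_history.csv contains at
# least a 'full_name' and a 'keyword' column; any other columns are ignored.
# A hypothetical example of the expected shape (values are illustrative):
#
#   full_name,keyword
#   octocat/Hello-World,version control
#   scikit-learn/scikit-learn,machine learning
#
# If a full_name appears more than once, the last row wins, since later
# assignments overwrite earlier entries in the mapping dict.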


def is_binary_file(file_path: Path) -> bool:
    """Check whether a file is binary, by extension and by content."""
    # Check extension
    if file_path.suffix.lower() in BINARY_EXTENSIONS:
        return True

    # Check if the file is in a skip directory
    for part in file_path.parts:
        if _is_skipped_dir(part) or part.startswith('.'):
            return True

    # Read the first 512 bytes to detect binary content
    try:
        with open(file_path, 'rb') as f:
            chunk = f.read(512)
        # Check for null bytes (common in binary files)
        if b'\x00' in chunk:
            return True
        # Check whether the content decodes as UTF-8 text
        try:
            chunk.decode('utf-8')
        except UnicodeDecodeError:
            return True
    except Exception:
        return True

    return False


def should_skip_file(file_path: Path) -> bool:
    """Determine whether a file should be skipped."""
    # Check if the file is in a skip directory
    for part in file_path.parts:
        if _is_skipped_dir(part):
            return True
        if part.startswith('.') and part != '.':
            return True

    # Skip README-style files (e.g. README.md, README_SUMMARY.md)
    file_name = file_path.name.lower()
    if file_name.startswith('readme') and file_path.suffix.lower() in {'.md', '.markdown', '.txt'}:
        return True

    # Check if the file is binary
    if is_binary_file(file_path):
        return True

    return False


def get_language(file_path: Path) -> str:
    """Get the programming language from the file extension."""
    ext = file_path.suffix.lower()
    return LANGUAGE_MAP.get(ext, 'Unknown')


def read_file_content(file_path: Path) -> Optional[str]:
    """Read file content, handling encoding issues."""
    try:
        # Try UTF-8 first
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return content
    except UnicodeDecodeError:
        # Fall back to other encodings; note that latin-1 can decode any byte
        # sequence, so in practice this loop succeeds on its first attempt
        encodings = ['latin-1', 'iso-8859-1', 'cp1252']
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                logger.warning(f"Read {file_path} with {encoding} encoding")
                return content
            except (UnicodeDecodeError, LookupError):
                continue
        logger.warning(f"Could not decode {file_path}, skipping")
        return None
    except Exception as e:
        logger.error(f"Error reading {file_path}: {e}")
        return None


def process_file(file_path: Path, repo_name: str, keyword: str) -> Optional[Dict]:
    """Process a single file and return its metadata and content."""
    if should_skip_file(file_path):
        return None

    try:
        file_size = file_path.stat().st_size

        # Get the path relative to the repo root
        repo_dir = REPOS_FILTERED_DIR / repo_name
        try:
            relative_path = file_path.relative_to(repo_dir)
        except ValueError:
            # File is not in the repo directory (shouldn't happen)
            return None

        # Read content
        content = read_file_content(file_path)
        if content is None:
            return None

        # Count lines
        line_count = content.count('\n') + (1 if content else 0)

        return {
            'keyword': keyword,
            'repo_name': repo_name.replace('___', '/'),  # Convert back to full_name format
            'file_path': str(relative_path),
            'file_extension': file_path.suffix,
            'file_size': file_size,
            'line_count': line_count,
            'content': content,
            'language': get_language(file_path),
        }
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return None
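
# For reference, a successfully processed file yields a record shaped like
# the following (all values here are illustrative, not real data):
#
#   {
#       'keyword': 'machine learning',
#       'repo_name': 'octocat/Hello-World',
#       'file_path': 'src/train.py',
#       'file_extension': '.py',
#       'file_size': 2048,
#       'line_count': 80,
#       'content': '<full file text>',
#       'language': 'Python',
#   }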


def process_repo(repo_name: str, keyword_mapping: Dict[str, str]) -> List[Dict]:
    """Process all files in a repository."""
    repo_dir = REPOS_FILTERED_DIR / repo_name

    if not repo_dir.exists() or not repo_dir.is_dir():
        return []

    # Get the keyword for this repo
    full_name = repo_name.replace('___', '/')
    keyword = keyword_mapping.get(full_name)
    if not keyword:
        logger.debug(f"No keyword found for {full_name}, skipping")
        return []

    results = []

    # Walk through all files
    try:
        for root, dirs, files in os.walk(repo_dir):
            # Prune skip directories and hidden directories in place
            dirs[:] = [d for d in dirs if not _is_skipped_dir(d) and not d.startswith('.')]

            for file in files:
                file_path = Path(root) / file
                result = process_file(file_path, repo_name, keyword)
                if result:
                    results.append(result)
    except Exception as e:
        logger.error(f"Error walking {repo_dir}: {e}")

    return results


class CSVWriterManager:
    """Manager for CSV writers - handles opening, writing to, and closing the per-keyword CSV files."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        self.writers = {}  # keyword -> (file_handle, csv_writer)
        self.file_counts = defaultdict(int)  # keyword -> count of rows written
        self.fieldnames = ['keyword', 'repo_name', 'file_path', 'file_extension',
                           'file_size', 'line_count', 'content', 'language']

    def get_writer(self, keyword: str):
        """Get or create a CSV writer for a keyword."""
        if keyword not in self.writers:
            sanitized_keyword = sanitize_keyword(keyword)
            output_file = self.output_dir / f"dataset_{sanitized_keyword}.csv"
            file_handle = open(output_file, 'w', newline='', encoding='utf-8')
            writer = csv.DictWriter(file_handle, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL)
            writer.writeheader()
            self.writers[keyword] = (file_handle, writer)
        return self.writers[keyword][1]

    def write_row(self, keyword: str, row: Dict):
        """Write a row to the appropriate CSV file."""
        writer = self.get_writer(keyword)
        writer.writerow(row)
        self.file_counts[keyword] += 1

    def close_all(self):
        """Close all open file handles."""
        for keyword, (file_handle, _) in self.writers.items():
            file_handle.close()
            logger.info(f"Closed dataset_{sanitize_keyword(keyword)}.csv with {self.file_counts[keyword]} files")

    def get_stats(self) -> Tuple[int, int]:
        """Return (total_keywords, total_files)."""
        return len(self.writers), sum(self.file_counts.values())
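
# Minimal usage sketch for CSVWriterManager (the path here is hypothetical).
# Writers are created lazily on the first row seen for each keyword, so
# close_all() must always run - main() below does this in a finally block:
#
#   manager = CSVWriterManager(Path("/tmp/dataset_csv"))
#   try:
#       manager.write_row(row['keyword'], row)  # row as built by process_file()
#   finally:
#       manager.close_all()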


def main():
    """Main function with streaming writes to avoid memory issues."""
    logger.info("Starting file export to CSV (streaming mode)")

    # Create the output directory
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")

    # Load keyword mapping
    keyword_mapping = load_keyword_mapping()

    # Get all repository directories
    logger.info("Scanning repository directories...")
    repo_dirs = [d.name for d in REPOS_FILTERED_DIR.iterdir() if d.is_dir()]
    logger.info(f"Found {len(repo_dirs)} repositories")

    # Initialize the CSV writer manager (streaming mode - write as we go)
    csv_manager = CSVWriterManager(OUTPUT_DIR)

    # Process repositories and write immediately
    logger.info("Processing repositories (streaming mode - writing as we go)...")
    total_files_processed = 0
    repos_processed = 0
    repos_with_no_keyword = 0

    try:
        with tqdm(total=len(repo_dirs), desc="Processing repos") as pbar:
            for repo_name in repo_dirs:
                # Get the keyword for this repo
                full_name = repo_name.replace('___', '/')
                keyword = keyword_mapping.get(full_name)
                if not keyword:
                    repos_with_no_keyword += 1
                    pbar.update(1)
                    continue

                # Process the repository
                results = process_repo(repo_name, keyword_mapping)

                if results:
                    # Write results immediately to CSV (streaming)
                    for result in results:
                        csv_manager.write_row(result['keyword'], result)
                        total_files_processed += 1

                repos_processed += 1
                pbar.update(1)

                # Periodic logging
                if repos_processed > 0 and repos_processed % 1000 == 0:
                    logger.info(f"Progress: {repos_processed} repos, {total_files_processed} files")
    finally:
        # Close all CSV files
        csv_manager.close_all()

    # Print summary
    total_keywords, total_files = csv_manager.get_stats()
    logger.info("=" * 60)
    logger.info("Export completed!")
    logger.info(f"Repositories processed: {repos_processed}")
    logger.info(f"Repositories with no keyword mapping: {repos_with_no_keyword}")
    logger.info(f"Total keywords: {total_keywords}")
    logger.info(f"Total files exported: {total_files}")
    logger.info(f"Output directory: {OUTPUT_DIR}")
    logger.info("=" * 60)


if __name__ == "__main__":
    main()
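
# Invocation sketch (the script filename is assumed from the log file name
# configured above). All inputs and outputs are hard-coded in the
# Configuration block, so adjust REPOS_FILTERED_DIR, REPOS_CHECK_HISTORY_CSV
# and OUTPUT_DIR there before running in a different environment:
#
#   $ python3 export_files_to_csv.py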