# dataset-builder / data1 / scripts / export_files_to_csv.py
# (Hugging Face upload metadata: "Upload data1/scripts/export_files_to_csv.py
#  with huggingface_hub", commit f0b48c5 verified)
#!/usr/bin/env python3
"""
Export repository files to CSV datasets grouped by keyword.
This script processes all files in repos_filtered directory, groups them by keyword
from repos_check_history.csv, and exports to separate CSV files for each keyword.
"""
import os
import csv
import re
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Tuple, Optional
import pandas as pd
from tqdm import tqdm
import logging
# Setup logging: INFO-level messages are duplicated to a log file beside
# the working directory and to the console (stderr).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('export_files_to_csv.log'),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function below.
logger = logging.getLogger(__name__)
# Configuration
# Absolute paths: input repository tree, the repo->keyword history CSV,
# and the directory that receives one dataset CSV per keyword.
REPOS_FILTERED_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered")
REPOS_CHECK_HISTORY_CSV = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_check_history.csv")
OUTPUT_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/dataset_csv")
MAX_FILE_SIZE = None  # No size limit - process all files
# Directories to skip while walking repos. Matched by exact name.
# NOTE(review): '*.egg-info' is a glob pattern, but all lookups are literal
# set-membership tests (`part in SKIP_DIRS`), so it never matches a real
# `foo.egg-info` directory — confirm whether glob matching was intended.
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.pytest_cache', '.mypy_cache',
             'venv', 'env', '.venv', '.env', 'dist', 'build', '.eggs', '*.egg-info'}
# Binary file extensions to skip (checked case-insensitively via suffix.lower()).
BINARY_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.ico', '.svg',
                     '.pdf', '.zip', '.tar', '.gz', '.bz2', '.xz', '.7z',
                     '.exe', '.dll', '.so', '.dylib', '.bin', '.o', '.a',
                     '.pyc', '.pyo', '.pyd', '.class', '.jar', '.war',
                     '.mp3', '.mp4', '.avi', '.mov', '.wav', '.flac',
                     '.db', '.sqlite', '.sqlite3', '.h5', '.hdf5', '.pkl', '.pickle'}
# Language mapping based on file extension (lowercased before lookup).
# Unmapped extensions resolve to 'Unknown' in get_language().
# NOTE(review): '.m' is ambiguous (MATLAB vs. Objective-C) and '.r' vs '.R'
# relies on the lowercasing in get_language(); MATLAB is assumed here.
LANGUAGE_MAP = {
    '.py': 'Python',
    '.js': 'JavaScript',
    '.ts': 'TypeScript',
    '.java': 'Java',
    '.cpp': 'C++',
    '.c': 'C',
    '.cs': 'C#',
    '.go': 'Go',
    '.rs': 'Rust',
    '.rb': 'Ruby',
    '.php': 'PHP',
    '.swift': 'Swift',
    '.kt': 'Kotlin',
    '.scala': 'Scala',
    '.r': 'R',
    '.m': 'MATLAB',
    '.jl': 'Julia',
    '.sh': 'Shell',
    '.bash': 'Bash',
    '.zsh': 'Zsh',
    '.sql': 'SQL',
    '.html': 'HTML',
    '.css': 'CSS',
    '.xml': 'XML',
    '.json': 'JSON',
    '.yaml': 'YAML',
    '.yml': 'YAML',
    '.md': 'Markdown',
    '.tex': 'LaTeX',
    '.f90': 'Fortran',
    '.f': 'Fortran',
    '.f77': 'Fortran',
    '.f95': 'Fortran',
    '.cu': 'CUDA',
    '.cl': 'OpenCL',
    '.hs': 'Haskell',
    '.ml': 'OCaml',
    '.fs': 'F#',
    '.vb': 'Visual Basic',
    '.pl': 'Perl',
    '.pm': 'Perl',
    '.lua': 'Lua',
    '.vim': 'Vim script',
    '.cmake': 'CMake',
    '.makefile': 'Makefile',
    '.dockerfile': 'Dockerfile',
}
def sanitize_keyword(keyword: str) -> str:
    """Turn an arbitrary keyword into a filesystem-safe identifier.

    Characters outside [word, whitespace, hyphen] become underscores,
    whitespace/hyphen runs collapse to a single underscore, repeated
    underscores are squashed, and leading/trailing underscores trimmed.
    """
    cleaned = re.sub(r'[^\w\s-]', '_', keyword)
    cleaned = re.sub(r'[\s-]+', '_', cleaned)
    cleaned = re.sub(r'_+', '_', cleaned)
    return cleaned.strip('_')
def load_keyword_mapping() -> Dict[str, str]:
    """Load the repo full_name -> keyword mapping from repos_check_history.csv.

    The CSV is read in chunks so arbitrarily large history files fit in
    memory. For duplicate full_names, later rows overwrite earlier ones
    (same behavior as the row-by-row assignment it replaces).

    Returns:
        Mapping of repository full_name (e.g. "owner/repo") to keyword.

    Raises:
        Re-raises any pandas/IO error after logging it.
    """
    logger.info(f"Loading keyword mapping from {REPOS_CHECK_HISTORY_CSV}")
    mapping: Dict[str, str] = {}
    try:
        # Read CSV in chunks to handle large file.
        chunk_size = 100000
        # usecols limits parsing to the two needed columns, and
        # dict.update(zip(...)) avoids the heavy per-row overhead of
        # DataFrame.iterrows() on multi-million-row files.
        for chunk in pd.read_csv(REPOS_CHECK_HISTORY_CSV,
                                 usecols=['full_name', 'keyword'],
                                 chunksize=chunk_size):
            mapping.update(zip(chunk['full_name'], chunk['keyword']))
        logger.info(f"Loaded {len(mapping)} keyword mappings")
        return mapping
    except Exception as e:
        logger.error(f"Error loading keyword mapping: {e}")
        raise
def is_binary_file(file_path: Path) -> bool:
    """Check if file is binary by extension, location, and content probe.

    A file counts as binary when its extension is in BINARY_EXTENSIONS,
    when any path component is a skip/hidden directory, or when the first
    512 bytes contain NUL bytes or invalid UTF-8. Unreadable files are
    treated as binary as well.
    """
    # Fast path: known binary extension.
    if file_path.suffix.lower() in BINARY_EXTENSIONS:
        return True
    # Anything under a skip directory or hidden component is excluded.
    for part in file_path.parts:
        if part in SKIP_DIRS or part.startswith('.'):
            return True
    # Probe the first 512 bytes for binary content.
    try:
        with open(file_path, 'rb') as f:
            chunk = f.read(512)
        # NUL bytes are a reliable binary marker.
        if b'\x00' in chunk:
            return True
        try:
            chunk.decode('utf-8')
        except UnicodeDecodeError as e:
            # BUGFIX: a full 512-byte read can split a multibyte UTF-8
            # sequence at the chunk boundary; that is not evidence of a
            # binary file. Only a failure that starts before the final
            # 3 bytes of a full chunk (max continuation of a 4-byte
            # sequence) marks the file as binary.
            if len(chunk) < 512 or e.start < len(chunk) - 3:
                return True
    except Exception:
        # Unreadable file: treat as binary so it gets skipped.
        return True
    return False
def should_skip_file(file_path: Path) -> bool:
    """Return True when *file_path* must be excluded from the export.

    Excluded: anything under a skip or hidden directory, README-style
    markdown/text files, and files detected as binary.
    """
    # Any path component that is a skip dir or hidden ('.foo') excludes the file.
    if any(part in SKIP_DIRS or (part.startswith('.') and part != '.')
           for part in file_path.parts):
        return True
    # Skip README and README_SUMMARY markdown files
    name = file_path.name.lower()
    if name.startswith('readme') and file_path.suffix.lower() in {'.md', '.markdown', '.txt'}:
        return True
    # Finally, probe for binary content.
    return is_binary_file(file_path)
def get_language(file_path: Path) -> str:
    """Map a file's (lowercased) extension to a language name, or 'Unknown'."""
    return LANGUAGE_MAP.get(file_path.suffix.lower(), 'Unknown')
def read_file_content(file_path: Path) -> Optional[str]:
"""Read file content, handling encoding issues."""
try:
# Try UTF-8 first
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return content
except UnicodeDecodeError:
# Try other encodings
encodings = ['latin-1', 'iso-8859-1', 'cp1252']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
logger.warning(f"Read {file_path} with {encoding} encoding")
return content
except (UnicodeDecodeError, LookupError):
continue
logger.warning(f"Could not decode {file_path}, skipping")
return None
except Exception as e:
logger.error(f"Error reading {file_path}: {e}")
return None
def process_file(file_path: Path, repo_name: str, keyword: str) -> Optional[Dict]:
    """Build the CSV row dict for one file.

    Returns None when the file is skipped, unreadable, or lies outside
    the repository directory.
    """
    if should_skip_file(file_path):
        return None
    try:
        size = file_path.stat().st_size
        repo_root = REPOS_FILTERED_DIR / repo_name
        try:
            rel_path = file_path.relative_to(repo_root)
        except ValueError:
            # File is not in repo directory (shouldn't happen)
            return None
        text = read_file_content(file_path)
        if text is None:
            return None
        # Non-empty content gets one extra line for the text after the
        # final newline.
        lines = text.count('\n') + (1 if text else 0)
        return {
            'keyword': keyword,
            'repo_name': repo_name.replace('___', '/'),  # Convert to full_name format
            'file_path': str(rel_path),
            'file_extension': file_path.suffix,
            'file_size': size,
            'line_count': lines,
            'content': text,
            'language': get_language(file_path),
        }
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return None
def process_repo(repo_name: str, keyword_mapping: Dict[str, str]) -> List[Dict]:
    """Collect CSV row dicts for every exportable file in one repository."""
    repo_dir = REPOS_FILTERED_DIR / repo_name
    if not repo_dir.exists() or not repo_dir.is_dir():
        return []
    # Resolve this repo's keyword via its full_name ("owner/repo") form.
    full_name = repo_name.replace('___', '/')
    keyword = keyword_mapping.get(full_name)
    if not keyword:
        logger.debug(f"No keyword found for {full_name}, skipping")
        return []
    rows: List[Dict] = []
    try:
        for root, dirs, files in os.walk(repo_dir):
            # Prune hidden and known-junk directories in place so os.walk
            # never descends into them.
            dirs[:] = [d for d in dirs
                       if d not in SKIP_DIRS and not d.startswith('.')]
            for fname in files:
                row = process_file(Path(root) / fname, repo_name, keyword)
                if row:
                    rows.append(row)
    except Exception as e:
        logger.error(f"Error walking {repo_dir}: {e}")
    return rows
class CSVWriterManager:
    """Lazily opens one CSV file per keyword and tracks rows written to each."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        # keyword -> (open file handle, csv.DictWriter)
        self.writers = {}
        # keyword -> number of data rows written so far
        self.file_counts = defaultdict(int)
        self.fieldnames = ['keyword', 'repo_name', 'file_path', 'file_extension',
                           'file_size', 'line_count', 'content', 'language']

    def get_writer(self, keyword: str):
        """Return the DictWriter for *keyword*, creating its file on first use."""
        entry = self.writers.get(keyword)
        if entry is None:
            target = self.output_dir / f"dataset_{sanitize_keyword(keyword)}.csv"
            handle = open(target, 'w', newline='', encoding='utf-8')
            # QUOTE_ALL keeps multi-line file contents inside a single field.
            writer = csv.DictWriter(handle, fieldnames=self.fieldnames,
                                    quoting=csv.QUOTE_ALL)
            writer.writeheader()
            entry = (handle, writer)
            self.writers[keyword] = entry
        return entry[1]

    def write_row(self, keyword: str, row: Dict):
        """Append one row to the keyword's CSV and bump its counter."""
        self.get_writer(keyword).writerow(row)
        self.file_counts[keyword] += 1

    def close_all(self):
        """Flush and close every open CSV file, logging per-file row counts."""
        for keyword, (handle, _) in self.writers.items():
            handle.close()
            logger.info(f"Closed dataset_{sanitize_keyword(keyword)}.csv with {self.file_counts[keyword]} files")

    def get_stats(self) -> Tuple[int, int]:
        """Return (number of keyword CSVs opened, total rows written)."""
        return len(self.writers), sum(self.file_counts.values())
def main():
    """Entry point: stream every repository's files into per-keyword CSVs.

    Rows are written as soon as each repository is processed so memory
    use stays flat regardless of corpus size.
    """
    logger.info("Starting file export to CSV (streaming mode)")
    # Ensure the output directory exists before any writer opens a file.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")
    keyword_mapping = load_keyword_mapping()
    logger.info("Scanning repository directories...")
    repo_dirs = [entry.name for entry in REPOS_FILTERED_DIR.iterdir() if entry.is_dir()]
    logger.info(f"Found {len(repo_dirs)} repositories")
    csv_manager = CSVWriterManager(OUTPUT_DIR)
    logger.info("Processing repositories (streaming mode - writing as we go)...")
    total_files_processed = 0
    repos_processed = 0
    repos_with_no_keyword = 0
    try:
        with tqdm(total=len(repo_dirs), desc="Processing repos") as pbar:
            for repo_name in repo_dirs:
                keyword = keyword_mapping.get(repo_name.replace('___', '/'))
                if not keyword:
                    repos_with_no_keyword += 1
                    pbar.update(1)
                    continue
                # Write each repo's rows immediately (streaming).
                for row in process_repo(repo_name, keyword_mapping):
                    csv_manager.write_row(row['keyword'], row)
                    total_files_processed += 1
                repos_processed += 1
                pbar.update(1)
                # Periodic progress log every 1000 processed repos.
                if repos_processed % 1000 == 0:
                    logger.info(f"Progress: {repos_processed} repos, {total_files_processed} files")
    finally:
        # Always release the CSV file handles, even on error/interrupt.
        csv_manager.close_all()
    # Print summary
    total_keywords, total_files = csv_manager.get_stats()
    logger.info("=" * 60)
    logger.info("Export completed!")
    logger.info(f"Repositories processed: {repos_processed}")
    logger.info(f"Repositories with no keyword mapping: {repos_with_no_keyword}")
    logger.info(f"Total keywords: {total_keywords}")
    logger.info(f"Total files exported: {total_files}")
    logger.info(f"Output directory: {OUTPUT_DIR}")
    logger.info("=" * 60)
if __name__ == "__main__":
main()