|
|
|
|
|
""" |
|
|
Export repository files to CSV datasets grouped by keyword. |
|
|
|
|
|
This script processes all files in repos_filtered directory, groups them by keyword |
|
|
from repos_check_history.csv, and exports to separate CSV files for each keyword. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import csv |
|
|
import re |
|
|
from pathlib import Path |
|
|
from collections import defaultdict |
|
|
from typing import Dict, List, Tuple, Optional |
|
|
import pandas as pd |
|
|
from tqdm import tqdm |
|
|
import logging |
|
|
|
|
|
|
|
|
# Log to both a file and the console so long-running exports can be watched
# live and audited afterwards.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('export_files_to_csv.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Input directory containing the filtered repository checkouts
# (one subdirectory per repo, named 'owner___repo').
REPOS_FILTERED_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered")
# CSV mapping each repository full_name ('owner/repo') to its search keyword.
REPOS_CHECK_HISTORY_CSV = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_check_history.csv")
# Destination directory for the per-keyword dataset CSV files.
OUTPUT_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/dataset_csv")
# No per-file size cap: None means files of any size are exported.
MAX_FILE_SIZE = None
|
|
|
|
|
|
|
|
# Directory names excluded from traversal: VCS metadata, dependency and cache
# directories, virtualenvs, and build artifacts.
# NOTE(review): '*.egg-info' is a glob pattern, but membership tests against
# this set are exact string comparisons, so that entry can never match —
# confirm whether an explicit '.egg-info' suffix check is intended.
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.pytest_cache', '.mypy_cache',
             'venv', 'env', '.venv', '.env', 'dist', 'build', '.eggs', '*.egg-info'}

# File extensions treated as binary without inspecting content: images,
# archives, compiled objects, media, and serialized data stores.
BINARY_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.ico', '.svg',
                     '.pdf', '.zip', '.tar', '.gz', '.bz2', '.xz', '.7z',
                     '.exe', '.dll', '.so', '.dylib', '.bin', '.o', '.a',
                     '.pyc', '.pyo', '.pyd', '.class', '.jar', '.war',
                     '.mp3', '.mp4', '.avi', '.mov', '.wav', '.flac',
                     '.db', '.sqlite', '.sqlite3', '.h5', '.hdf5', '.pkl', '.pickle'}

# Mapping from lowercase file extension to human-readable language name,
# used to tag each exported row. Unmapped extensions become 'Unknown'.
LANGUAGE_MAP = {
    '.py': 'Python',
    '.js': 'JavaScript',
    '.ts': 'TypeScript',
    '.java': 'Java',
    '.cpp': 'C++',
    '.c': 'C',
    '.cs': 'C#',
    '.go': 'Go',
    '.rs': 'Rust',
    '.rb': 'Ruby',
    '.php': 'PHP',
    '.swift': 'Swift',
    '.kt': 'Kotlin',
    '.scala': 'Scala',
    '.r': 'R',
    '.m': 'MATLAB',
    '.jl': 'Julia',
    '.sh': 'Shell',
    '.bash': 'Bash',
    '.zsh': 'Zsh',
    '.sql': 'SQL',
    '.html': 'HTML',
    '.css': 'CSS',
    '.xml': 'XML',
    '.json': 'JSON',
    '.yaml': 'YAML',
    '.yml': 'YAML',
    '.md': 'Markdown',
    '.tex': 'LaTeX',
    '.f90': 'Fortran',
    '.f': 'Fortran',
    '.f77': 'Fortran',
    '.f95': 'Fortran',
    '.cu': 'CUDA',
    '.cl': 'OpenCL',
    '.hs': 'Haskell',
    '.ml': 'OCaml',
    '.fs': 'F#',
    '.vb': 'Visual Basic',
    '.pl': 'Perl',
    '.pm': 'Perl',
    '.lua': 'Lua',
    '.vim': 'Vim script',
    '.cmake': 'CMake',
    '.makefile': 'Makefile',
    '.dockerfile': 'Dockerfile',
}
|
|
|
|
|
|
|
|
def sanitize_keyword(keyword: str) -> str:
    """Turn a keyword into a filesystem-safe fragment for use in a filename.

    Non-word characters become underscores, runs of whitespace/hyphens and
    repeated underscores are collapsed, and leading/trailing underscores
    are stripped.
    """
    cleaned = keyword
    # Apply the substitutions in order: punctuation -> '_', then collapse
    # whitespace/hyphen runs, then collapse repeated underscores.
    for pattern, replacement in ((r'[^\w\s-]', '_'),
                                 (r'[\s-]+', '_'),
                                 (r'_+', '_')):
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip('_')
|
|
|
|
|
|
|
|
def load_keyword_mapping() -> Dict[str, str]:
    """Load the repo full_name -> keyword mapping from repos_check_history.csv.

    The CSV is read in chunks so arbitrarily large history files never have
    to fit in memory at once.

    Returns:
        Dict mapping repository full_name (e.g. "owner/repo") to its keyword.
        If a full_name appears multiple times, the last row wins (same as
        the original per-row assignment).

    Raises:
        Exception: re-raises any error from reading/parsing the CSV after
        logging it.
    """
    logger.info(f"Loading keyword mapping from {REPOS_CHECK_HISTORY_CSV}")

    mapping: Dict[str, str] = {}
    try:
        chunk_size = 100000
        # PERF FIX: DataFrame.iterrows() materializes a Series per row and is
        # very slow on large files; reading only the two needed columns and
        # zipping them feeds dict.update at C speed.
        for chunk in pd.read_csv(REPOS_CHECK_HISTORY_CSV,
                                 usecols=['full_name', 'keyword'],
                                 chunksize=chunk_size):
            mapping.update(zip(chunk['full_name'], chunk['keyword']))

        logger.info(f"Loaded {len(mapping)} keyword mappings")
        return mapping
    except Exception as e:
        logger.error(f"Error loading keyword mapping: {e}")
        raise
|
|
|
|
|
|
|
|
def is_binary_file(file_path: Path) -> bool:
    """Heuristically decide whether *file_path* holds binary (non-text) data.

    A file counts as binary when its extension is a known binary type, when
    any path component is a skipped or hidden directory, or when a sample of
    its leading bytes contains a NUL byte or is not valid UTF-8. Any read
    error also counts as binary so the caller skips the file.
    """
    # Fast path: known binary extension.
    if file_path.suffix.lower() in BINARY_EXTENSIONS:
        return True

    # Anything under a skipped or hidden path component is filtered out too.
    if any(component in SKIP_DIRS or component.startswith('.')
           for component in file_path.parts):
        return True

    # Sample the first 512 bytes; an unreadable file is treated as binary.
    try:
        with open(file_path, 'rb') as handle:
            sample = handle.read(512)
    except Exception:
        return True

    # NUL bytes are a strong binary indicator.
    if b'\x00' in sample:
        return True

    # Finally, require the sample to be valid UTF-8.
    try:
        sample.decode('utf-8')
    except UnicodeDecodeError:
        return True

    return False
|
|
|
|
|
|
|
|
def should_skip_file(file_path: Path) -> bool:
    """Determine whether a file should be excluded from the export.

    Skips files inside excluded/hidden directories (including '*.egg-info'
    packaging metadata), README documentation files, and binary files.
    """
    for part in file_path.parts:
        if part in SKIP_DIRS:
            return True
        # BUG FIX: SKIP_DIRS contains the glob pattern '*.egg-info', which an
        # exact set-membership test can never match; check the suffix
        # explicitly so packaging metadata directories are actually skipped.
        if part.endswith('.egg-info'):
            return True
        if part.startswith('.') and part != '.':
            return True

    # README files are documentation, not code, so leave them out.
    file_name = file_path.name.lower()
    if file_name.startswith('readme') and file_path.suffix.lower() in {'.md', '.markdown', '.txt'}:
        return True

    # Fall back to the extension/content heuristic for binary data.
    if is_binary_file(file_path):
        return True

    return False
|
|
|
|
|
|
|
|
def get_language(file_path: Path) -> str:
    """Map *file_path*'s extension to a language name ('Unknown' if unmapped)."""
    return LANGUAGE_MAP.get(file_path.suffix.lower(), 'Unknown')
|
|
|
|
|
|
|
|
def read_file_content(file_path: Path) -> Optional[str]:
    """Read a text file, trying UTF-8 first and then fallback encodings.

    Returns:
        The decoded file content, or None when the file cannot be read.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        # BUG FIX: 'latin-1' can decode ANY byte sequence, so listing it
        # first made every later encoding ('iso-8859-1', 'cp1252') and the
        # "could not decode" path below unreachable dead code. Try the
        # stricter cp1252 first (common for Windows text) and keep latin-1
        # as the never-failing last resort.
        for encoding in ('cp1252', 'latin-1'):
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                logger.warning(f"Read {file_path} with {encoding} encoding")
                return content
            except (UnicodeDecodeError, LookupError):
                continue

        # Defensive: with latin-1 as the final fallback this is unreachable,
        # but keep it so a future edit to the encoding list stays safe.
        logger.warning(f"Could not decode {file_path}, skipping")
        return None
    except Exception as e:
        logger.error(f"Error reading {file_path}: {e}")
        return None
|
|
|
|
|
|
|
|
def process_file(file_path: Path, repo_name: str, keyword: str) -> Optional[Dict]:
    """Process a single file and return its metadata and content as a row dict.

    Args:
        file_path: Absolute path of the file inside the repository checkout.
        repo_name: Directory name of the repository ('owner___repo').
        keyword: Keyword the repository was found under.

    Returns:
        A dict matching the CSV schema, or None when the file is skipped,
        unreadable, or lies outside the repository directory.
    """
    if should_skip_file(file_path):
        return None

    try:
        file_size = file_path.stat().st_size

        # Store paths relative to the repository root; anything outside it
        # is ignored.
        repo_dir = REPOS_FILTERED_DIR / repo_name
        try:
            relative_path = file_path.relative_to(repo_dir)
        except ValueError:
            return None

        content = read_file_content(file_path)
        if content is None:
            return None

        # BUG FIX: counting '\n' + 1 over-counted files ending in a trailing
        # newline (e.g. "a\n" was 2 lines); splitlines() gives the true
        # number of lines and handles the empty file as 0.
        line_count = len(content.splitlines())

        return {
            'keyword': keyword,
            # Directory names encode 'owner___repo'; restore 'owner/repo'.
            'repo_name': repo_name.replace('___', '/'),
            'file_path': str(relative_path),
            'file_extension': file_path.suffix,
            'file_size': file_size,
            'line_count': line_count,
            'content': content,
            'language': get_language(file_path)
        }
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return None
|
|
|
|
|
|
|
|
def process_repo(repo_name: str, keyword_mapping: Dict[str, str]) -> List[Dict]:
    """Collect exportable file rows for one repository directory.

    Args:
        repo_name: Directory name of the repository ('owner___repo').
        keyword_mapping: Mapping from 'owner/repo' to its keyword.

    Returns:
        A list of row dicts; empty when the directory is missing or the
        repository has no keyword mapping.
    """
    repo_dir = REPOS_FILTERED_DIR / repo_name
    if not (repo_dir.exists() and repo_dir.is_dir()):
        return []

    # Directory names encode 'owner___repo'; the mapping is keyed by
    # 'owner/repo'.
    full_name = repo_name.replace('___', '/')
    keyword = keyword_mapping.get(full_name)
    if not keyword:
        logger.debug(f"No keyword found for {full_name}, skipping")
        return []

    rows: List[Dict] = []
    try:
        for root, dirs, files in os.walk(repo_dir):
            # Prune skipped/hidden directories in place so os.walk never
            # descends into them.
            dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith('.')]

            for name in files:
                record = process_file(Path(root) / name, repo_name, keyword)
                if record:
                    rows.append(record)
    except Exception as e:
        logger.error(f"Error walking {repo_dir}: {e}")

    return rows
|
|
|
|
|
|
|
|
class CSVWriterManager:
    """Manages one CSV file per keyword: lazy creation, row writes, cleanup."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        # keyword -> (open file handle, csv.DictWriter)
        self.writers = {}
        # keyword -> number of rows written so far
        self.file_counts = defaultdict(int)
        # Column order shared by every per-keyword dataset file.
        self.fieldnames = ['keyword', 'repo_name', 'file_path', 'file_extension',
                           'file_size', 'line_count', 'content', 'language']

    def get_writer(self, keyword: str):
        """Return the DictWriter for *keyword*, creating its file on first use."""
        entry = self.writers.get(keyword)
        if entry is None:
            output_file = self.output_dir / f"dataset_{sanitize_keyword(keyword)}.csv"

            # Open lazily so only keywords that actually occur get a file.
            handle = open(output_file, 'w', newline='', encoding='utf-8')
            writer = csv.DictWriter(handle, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL)
            writer.writeheader()

            entry = (handle, writer)
            self.writers[keyword] = entry

        return entry[1]

    def write_row(self, keyword: str, row: Dict):
        """Append *row* to the keyword's CSV and bump its row counter."""
        self.get_writer(keyword).writerow(row)
        self.file_counts[keyword] += 1

    def close_all(self):
        """Close every open file handle, logging per-keyword row counts."""
        for keyword, (handle, _) in self.writers.items():
            handle.close()
            logger.info(f"Closed dataset_{sanitize_keyword(keyword)}.csv with {self.file_counts[keyword]} files")

    def get_stats(self) -> Tuple[int, int]:
        """Return (total_keywords, total_files)."""
        return len(self.writers), sum(self.file_counts.values())
|
|
|
|
|
|
|
|
def main():
    """Stream repository files into per-keyword CSV datasets.

    Rows are written as each repository is processed, so memory stays
    bounded regardless of how many files are exported.
    """
    logger.info("Starting file export to CSV (streaming mode)")

    # Make sure the destination exists before any writer opens a file.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")

    keyword_mapping = load_keyword_mapping()

    logger.info("Scanning repository directories...")
    repo_dirs = [entry.name for entry in REPOS_FILTERED_DIR.iterdir() if entry.is_dir()]
    logger.info(f"Found {len(repo_dirs)} repositories")

    csv_manager = CSVWriterManager(OUTPUT_DIR)

    logger.info("Processing repositories (streaming mode - writing as we go)...")

    total_files_processed = 0
    repos_processed = 0
    repos_with_no_keyword = 0

    try:
        with tqdm(total=len(repo_dirs), desc="Processing repos") as pbar:
            for repo_name in repo_dirs:
                # Skip repos that never appeared in the keyword history;
                # directory names encode 'owner___repo'.
                if not keyword_mapping.get(repo_name.replace('___', '/')):
                    repos_with_no_keyword += 1
                    pbar.update(1)
                    continue

                records = process_repo(repo_name, keyword_mapping)

                # Write rows immediately instead of accumulating them.
                if records:
                    for record in records:
                        csv_manager.write_row(record['keyword'], record)
                        total_files_processed += 1
                    repos_processed += 1

                pbar.update(1)

                # Periodic progress log (every 1000 productive repos).
                if repos_processed > 0 and repos_processed % 1000 == 0:
                    logger.info(f"Progress: {repos_processed} repos, {total_files_processed} files")
    finally:
        # Always flush and close the CSV files, even if processing aborts.
        csv_manager.close_all()

    total_keywords, total_files = csv_manager.get_stats()

    logger.info("=" * 60)
    logger.info("Export completed!")
    logger.info(f"Repositories processed: {repos_processed}")
    logger.info(f"Repositories with no keyword mapping: {repos_with_no_keyword}")
    logger.info(f"Total keywords: {total_keywords}")
    logger.info(f"Total files exported: {total_files}")
    logger.info(f"Output directory: {OUTPUT_DIR}")
    logger.info("=" * 60)
|
|
|
|
|
|
|
|
# Script entry point: run the export when executed directly.
if __name__ == "__main__":
    main()
|
|
|
|
|
|