#!/usr/bin/env python3
"""Build FIMO input files: a context-window CSV plus one FASTA per chromosome."""
import json
import logging
import multiprocessing as mp
import os
from itertools import islice
from multiprocessing import Pool, cpu_count
from pathlib import Path

import numpy as np
import pandas as pd
import rootutils
from hydra.core.hydra_config import HydraConfig
from omegaconf import DictConfig

root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)

# Project-local import placed after setup_root(pythonpath=True) so the project
# root is already on sys.path when it is resolved.
from dpacman.utils import pylogger  # noqa: E402

logger = pylogger.RankedLogger(__name__, rank_zero_only=True)


def init_worker(log_file, logger_name):
    """Return a named logger that writes only to ``log_file``.

    Args:
        - log_file: path of the log file to append to
        - logger_name: name of the logger to configure

    Returns:
        The configured ``logging.Logger``.
    """
    wlogger = logging.getLogger(logger_name)
    wlogger.setLevel(logging.INFO)
    wlogger.propagate = False  # Don't double-log to root
    # Avoid re-adding handlers if this logger is reused
    if not wlogger.handlers:
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter("%(asctime)s - %(message)s")
        handler.setFormatter(formatter)
        wlogger.addHandler(handler)
    return wlogger


def process_chromosome(
    chrom, df, json_dir, output_path, example_dir, save_example_files, log_dir
):
    """Write ``<output_path>/chr<chrom>/to_scan.fa`` for one chromosome.

    Intended to run inside a pool worker; logs go to ``<log_dir>/chrom_<chrom>.log``.

    Args:
        - chrom: chromosome label as a string (e.g. "1", "X")
        - df: frame with a string ``#chrom`` column; rows for other chromosomes are ignored
        - json_dir: directory (relative to the project root) holding ``hg38_chr<chrom>.json``
        - output_path: root directory for per-chromosome outputs
        - example_dir: directory for small example files
        - save_example_files: if True, also save the first 50 FASTA lines as an example
        - log_dir: directory for the per-chromosome log file
    """
    log_file = Path(log_dir) / f"chrom_{chrom}.log"
    logger_name = f"logger_chrom_{chrom}"
    wlogger = init_worker(log_file, logger_name)
    try:
        wlogger.info(f"Processing chromosome {chrom}")

        sub_df = df[df["#chrom"] == chrom].reset_index(drop=True)
        sub_out_dir = Path(output_path) / f"chr{chrom}"
        os.makedirs(sub_out_dir, exist_ok=True)

        seq_fasta_path = sub_out_dir / "to_scan.fa"
        extract_sequences(sub_df, seq_fasta_path, json_dir, wlogger)

        if save_example_files:
            example_out = Path(example_dir) / f"example_chr{chrom}_to_scan.fa"
            if not example_out.exists():
                # Stream only the first 50 lines instead of loading the whole FASTA.
                with open(seq_fasta_path, "r") as src, open(example_out, "w") as dst:
                    dst.writelines(islice(src, 50))
                wlogger.info(f"Saved example: {example_out}")
    finally:
        # Release the FileHandler so long-lived pool workers don't leak file descriptors.
        for handler in list(wlogger.handlers):
            wlogger.removeHandler(handler)
            handler.close()


def _build_main_input(input_path, window_total):
    """Turn the raw tab-separated peak table into the per-TR context table.

    Args:
        - input_path: path to the tab-separated input table
          (expects columns ``chrom``, ``chromStart``, ``chromEnd``, ``score``, ``tr``)
        - window_total: total flank nucleotides split randomly between left and right

    Returns:
        DataFrame with columns ``#chrom``, ``contextStart``, ``ChIPStart``,
        ``ChIPEnd``, ``contextEnd``, ``chipscore``, ``TR``.
    """
    df = pd.read_csv(input_path, sep="\t")

    # 2) normalize chromosomes and exclude non-whole chromosomes
    df["chrom"] = df["chrom"].str.replace(r"^chr", "", regex=True)
    valid = [str(i) for i in range(1, 23)] + ["X", "Y"]
    df = df[df["chrom"].isin(valid)].reset_index(drop=True)

    # 3) explode the comma-separated TF names into one row per TR
    df["tr_list"] = df["tr"].str.split(",")
    df = df.explode("tr_list").rename(columns={"tr_list": "TR"})
    df["TR"] = df["TR"].str.strip()

    # 4) draw a random left flank in [0, window_total];
    #    the right flank is whatever remains to sum to window_total
    n = len(df)
    df["left_context"] = np.random.randint(0, window_total + 1, size=n)
    df["right_context"] = window_total - df["left_context"]

    # 5) compute contextStart / contextEnd (start clipped at 0; the end is not
    #    clipped here, and slicing later truncates at the chromosome end)
    df["contextStart"] = (
        (df["chromStart"] - df["left_context"]).clip(lower=0).astype(int)
    )
    df["contextEnd"] = (df["chromEnd"] + df["right_context"]).astype(int)

    # 6) assemble output
    return df[
        [
            "chrom",
            "contextStart",
            "chromStart",  # original ChIPStart
            "chromEnd",  # original ChIPEnd
            "contextEnd",
            "score",  # original score column
            "TR",
        ]
    ].rename(
        columns={
            "chrom": "#chrom",
            "chromStart": "ChIPStart",
            "chromEnd": "ChIPEnd",
            "score": "chipscore",
        }
    )


def _write_example_csv(out, output_path):
    """Save up to 500 randomly sampled rows of ``out`` next to ``output_path`` (once)."""
    example_dir = output_path.parent / "examples"
    os.makedirs(example_dir, exist_ok=True)
    example_savepath = example_dir / ("example500_" + output_path.name)
    if example_savepath.exists():
        return
    # DataFrame.sample(n=...) raises if n > len(out) without replacement.
    n_example = min(500, len(out))
    out.sample(n=n_example, random_state=42).reset_index(drop=True).to_csv(
        example_savepath, index=False
    )
    logger.info(
        f"Saved example FIMO input file with {n_example} rows to: {example_savepath}"
    )


def assemble_main_input(
    input_csv: str, window_total: int, output_csv: str, save_example_files: bool
):
    """
    Method for assembling the main input dataframe.

    If ``output_csv`` already exists it is loaded instead of being rebuilt.

    Args:
        - input_csv: path (relative to the project root) to the tab-separated input
          that is converted to the file for FIMO input
        - window_total: int determining the total non-ChIPseq-peak nucleotides
          included in a datapoint
        - output_csv: where the processed file will be saved (relative to the project root)
        - save_example_files: bool determining whether we save example files
          that can be easily viewed

    Returns:
        The assembled (or cached) DataFrame.
    """
    # 1) make the output path
    output_path = Path(root) / output_csv
    os.makedirs(output_path.parent, exist_ok=True)

    if output_path.exists():
        out = pd.read_csv(output_path)
    else:
        out = _build_main_input(Path(root) / input_csv, window_total)
        # 7) write csv via a temp file so an interrupted run never leaves a
        #    truncated cache that later runs would silently reuse.
        tmp_path = output_path.with_name(output_path.name + ".tmp")
        out.to_csv(tmp_path, index=False)
        os.replace(tmp_path, output_path)
        logger.info(f"Wrote {len(out)} rows to {output_path}")

    # 8) write example csv if necessary
    if save_example_files:
        _write_example_csv(out, output_path)

    return out


def load_chrom_dna(chrom, cache, json_dir):
    """
    Load DNA from the chromosome that we pre-downloaded.

    Args:
        - chrom: chromosome label (e.g. "1", "X")
        - cache: dict mutated in place, mapping chrom -> DNA string
        - json_dir: directory (relative to the project root) holding ``hg38_chr<chrom>.json``

    Returns:
        The ``"dna"`` value stored in the chromosome's JSON file.

    Raises:
        FileNotFoundError: if the chromosome JSON file does not exist.
    """
    if chrom in cache:
        return cache[chrom]
    fname = Path(root) / json_dir / f"hg38_chr{chrom}.json"
    if not fname.is_file():
        raise FileNotFoundError(f"Chrom JSON not found: {fname}")
    with open(fname) as f:
        cache[chrom] = json.load(f)["dna"]
    return cache[chrom]


def parallel_make_all_fasta_inputs(
    df, json_dir, output_path, example_dir, save_example_files=True, max_workers=8
):
    """Build one FASTA per chromosome in parallel with a multiprocessing pool.

    Args:
        - df: assembled input frame with a ``#chrom`` column (not modified)
        - json_dir: directory (relative to the project root) of chromosome JSON files
        - output_path: root directory for per-chromosome outputs
        - example_dir: directory for example files
        - save_example_files: whether workers also write example FASTA snippets
        - max_workers: upper bound on pool size (None = all CPUs); at least 1 is used
    """
    # Work on a copy with string labels: the caller's frame is not mutated, and
    # process_chromosome's equality filter compares str to str.
    df = df.assign(**{"#chrom": df["#chrom"].astype(str)})
    chromosomes = df["#chrom"].unique().tolist()

    log_dir = Path(HydraConfig.get().run.dir) / "logs"
    os.makedirs(log_dir, exist_ok=True)
    logger.info(f"Created {log_dir} for storing logs for subprocesses.")
    os.makedirs(example_dir, exist_ok=True)
    logger.info(f"Created {example_dir} for storing example inputs")

    if not chromosomes:
        logger.info("No chromosomes to process.")
        return

    # Ship each task only its own chromosome's rows instead of pickling the
    # whole frame once per task.
    args = [
        (
            chrom,
            df[df["#chrom"] == chrom],
            json_dir,
            output_path,
            example_dir,
            save_example_files,
            log_dir,
        )
        for chrom in chromosomes
    ]

    if max_workers is None:
        max_workers = os.cpu_count() or 1
    # Pool(processes=0) raises ValueError, so always use at least one process.
    n_procs = max(1, min(max_workers, len(chromosomes)))
    with mp.Pool(processes=n_procs) as pool:
        pool.starmap(process_chromosome, args)


def extract_sequences(df, seq_fasta, json_dir, wlogger):
    """
    Make the main sequence fasta for this chromosome.
    Used for building the background model.

    Each record is ``>{index}_chr{chrom}_{TR}_{contextStart}_{contextEnd}``
    followed by ``dna[contextStart:contextEnd]``. Skips work if ``seq_fasta`` exists.

    Args:
        - df: frame with ``#chrom``, ``TR``, ``contextStart``, ``contextEnd`` columns
        - seq_fasta: output FASTA path
        - json_dir: directory (relative to the project root) of chromosome JSON files
        - wlogger: logger for progress messages
    """
    seq_fasta = Path(seq_fasta)
    if seq_fasta.exists():
        wlogger.info(f"{seq_fasta} already exists; skipping")
        return
    wlogger.info(f"Writing to {seq_fasta}")

    dna_cache = {}
    n_rows = len(df)
    checkpoints = {int(n_rows * i / 100) for i in range(1, 101)}  # 1% to 100%

    # Write to a temp file and rename at the end: a crash mid-write must not
    # leave a partial FASTA that the existence check above would then skip.
    tmp_path = seq_fasta.with_name(seq_fasta.name + ".tmp")
    with open(tmp_path, "w") as fa:
        rows = zip(
            df.index, df["#chrom"], df["TR"], df["contextStart"], df["contextEnd"]
        )
        for n_done, (idx, chrom, tr, start, end) in enumerate(rows, start=1):
            chrom = str(chrom)
            dna = load_chrom_dna(chrom, dna_cache, json_dir)
            start = int(start)
            end = int(end)
            seq = dna[start:end]  # end index is not included in ChIP-seq peaks
            header = f"{idx}_chr{chrom}_{str(tr)}_{start}_{end}"
            fa.write(f">{header}\n{seq}\n")

            # log every 1% (counted in rows written, so 100% is reached)
            if n_done in checkpoints:
                wlogger.info(
                    f" Reached {n_done / n_rows:.0%} of the DataFrame (index {idx})"
                )
    os.replace(tmp_path, seq_fasta)


def main(cfg: DictConfig):
    """Assemble the main input CSV, then write per-chromosome FASTA files.

    Args:
        - cfg: Hydra config; reads ``cfg.data_task.paths.{input_csv, output_csv,
          json_dir, chrom_output_path}``, ``cfg.data_task.window_total`` and
          ``cfg.data_task.save_example_files``.
    """
    # 1) make the full input CSV
    paths = cfg.data_task.paths
    df = assemble_main_input(
        input_csv=paths.input_csv,
        window_total=cfg.data_task.window_total,
        output_csv=paths.output_csv,
        save_example_files=cfg.data_task.save_example_files,
    )

    # Make example dir to use in future methods
    example_dir = (Path(root) / paths.output_csv).parent / "examples"
    os.makedirs(example_dir, exist_ok=True)

    # 2) Make individual input files per chromosome
    total_chroms = df["#chrom"].nunique(dropna=False)
    max_workers = cpu_count() - 1
    logger.info(f"Max workers available (cpu_count - 1): {max_workers}")
    # Clamp to >= 1: cpu_count() - 1 is 0 on single-core hosts.
    max_workers = max(1, min(max_workers, total_chroms))
    logger.info(f"max(1, min(max_workers, total_chroms)) = {max_workers}")

    parallel_make_all_fasta_inputs(
        df,
        json_dir=paths.json_dir,
        output_path=Path(root) / paths.chrom_output_path,
        example_dir=example_dir,
        save_example_files=cfg.data_task.save_example_files,
        max_workers=max_workers,
    )


if __name__ == "__main__":
    # NOTE(review): main() requires a DictConfig, so this bare call raises
    # TypeError. The module appears to be driven by a Hydra entry point
    # elsewhere that passes cfg; confirm, and either wrap with @hydra.main
    # (config path/name not visible here) or remove this guard.
    main()