| |
| import pandas as pd |
| import numpy as np |
| import os |
| import json |
| import multiprocessing as mp |
| from multiprocessing import Pool, cpu_count |
| from omegaconf import DictConfig |
| from pathlib import Path |
| from hydra.core.hydra_config import HydraConfig |
| import rootutils |
| from dpacman.utils import pylogger |
| import logging |
|
|
| root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True) |
| logger = pylogger.RankedLogger(__name__, rank_zero_only=True) |
|
|
|
|
def init_worker(log_file, logger_name):
    """Create (or fetch) a per-worker logger that appends to *log_file*.

    The logger is configured for INFO level with propagation disabled and a
    single FileHandler. Calling again with the same name returns the
    already-configured logger unchanged.
    """
    worker_logger = logging.getLogger(logger_name)
    worker_logger.setLevel(logging.INFO)
    worker_logger.propagate = False

    # Guard against stacking duplicate handlers if the same logger name is
    # requested more than once within a process.
    if not worker_logger.handlers:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
        worker_logger.addHandler(file_handler)

    return worker_logger
|
|
|
|
def process_chromosome(
    chrom, df, json_dir, output_path, example_dir, save_example_files, log_dir
):
    """Worker entry point: build the FIMO scan FASTA for one chromosome."""
    wlogger = init_worker(
        Path(log_dir) / f"chrom_{chrom}.log", f"logger_chrom_{chrom}"
    )
    wlogger.info(f"Processing chromosome {chrom}")

    # Restrict to this chromosome's rows and give it its own output directory.
    chrom_rows = df[df["#chrom"] == chrom].reset_index(drop=True)
    chrom_out_dir = Path(output_path) / f"chr{chrom}"
    os.makedirs(chrom_out_dir, exist_ok=True)

    fasta_path = chrom_out_dir / "to_scan.fa"
    extract_sequences(chrom_rows, fasta_path, json_dir, wlogger)

    if save_example_files:
        example_path = Path(example_dir) / f"example_chr{chrom}_to_scan.fa"
        if not example_path.exists():
            # Keep only the first 50 lines so the example stays easy to view.
            with open(fasta_path, "r") as src:
                head = src.readlines()[:50]
            with open(example_path, "w") as dst:
                dst.write("".join(head))
            wlogger.info(f"Saved example: {example_path}")
|
|
|
|
def assemble_main_input(
    input_csv: str, window_total: int, output_csv: str, save_example_files: bool
):
    """
    Assemble the main input dataframe for FIMO.

    If output_csv already exists, the cached file is loaded instead of
    being rebuilt.

    Args:
        input_csv: path (relative to project root) of the tab-separated input
            csv that is converted to the file for FIMO input
        window_total: total non-ChIPseq-peak nucleotides of context included
            in a datapoint
        output_csv: path (relative to project root) where the processed file
            will be saved
        save_example_files: whether to also save a small example file that can
            be easily viewed

    Returns:
        Dataframe with columns
        #chrom / contextStart / ChIPStart / ChIPEnd / contextEnd / chipscore / TR.
    """
    input_path = Path(root) / input_csv
    df = pd.read_csv(input_path, sep="\t")
    out = None

    output_path = Path(root) / output_csv
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if not output_path.exists():
        # Normalize chromosome names ("chr1" -> "1") and keep only the
        # canonical autosomes plus X/Y.
        df["chrom"] = df["chrom"].str.replace(r"^chr", "", regex=True)
        valid = [str(i) for i in range(1, 23)] + ["X", "Y"]
        df = df[df["chrom"].isin(valid)].reset_index(drop=True)

        # One row per transcription regulator: "tr" is a comma-separated list.
        df["tr_list"] = df["tr"].str.split(",")
        df = df.explode("tr_list").rename(columns={"tr_list": "TR"})
        df["TR"] = df["TR"].str.strip()

        # Randomly split the context window between the left and right flanks.
        n = len(df)
        df["left_context"] = np.random.randint(0, window_total + 1, size=n)
        df["right_context"] = window_total - df["left_context"]

        # Context coordinates; clip at 0 so we never run off the chromosome start.
        df["contextStart"] = (
            (df["chromStart"] - df["left_context"]).clip(lower=0).astype(int)
        )
        df["contextEnd"] = (df["chromEnd"] + df["right_context"]).astype(int)

        out = df[
            [
                "chrom",
                "contextStart",
                "chromStart",
                "chromEnd",
                "contextEnd",
                "score",
                "TR",
            ]
        ].rename(
            columns={
                "chrom": "#chrom",
                "chromStart": "ChIPStart",
                "chromEnd": "ChIPEnd",
                "score": "chipscore",
            }
        )

        out.to_csv(output_path, index=False)
        logger.info(f"Wrote {len(out)} rows to {output_path}")

    # Cached from a previous run: load instead of recomputing.
    if out is None:
        out = pd.read_csv(output_path)

    if save_example_files:
        example_dir = output_path.parent / "examples"
        example_dir.mkdir(parents=True, exist_ok=True)
        example_savepath = example_dir / ("example500_" + Path(output_csv).name)

        if not example_savepath.exists():
            # BUG FIX: DataFrame.sample raises ValueError when n exceeds the
            # population (replace=False), so cap at the table size.
            n_sample = min(500, len(out))
            out.sample(n=n_sample, random_state=42).reset_index(drop=True).to_csv(
                example_savepath, index=False
            )
            logger.info(
                f"Saved example FIMO input file with {n_sample} rows to: {example_savepath}"
            )

    return out
|
|
|
|
def load_chrom_dna(chrom, cache, json_dir):
    """
    Return the DNA string for *chrom*, memoizing the result in *cache*.

    Reads from the pre-downloaded per-chromosome JSON files under json_dir
    (resolved relative to the project root).
    """
    if chrom not in cache:
        resolved_dir = Path(root) / json_dir
        fname = os.path.join(resolved_dir, f"hg38_chr{chrom}.json")
        if not os.path.isfile(fname):
            raise FileNotFoundError(f"Chrom JSON not found: {fname}")
        with open(fname) as handle:
            cache[chrom] = json.load(handle)["dna"]
    return cache[chrom]
|
|
|
|
def parallel_make_all_fasta_inputs(
    df, json_dir, output_path, example_dir, save_example_files=True, max_workers=8
):
    """
    Fan out FASTA-input creation across chromosomes using a process pool.

    Args:
        df: assembled input dataframe with a "#chrom" column
        json_dir: directory holding the per-chromosome hg38 JSON files
        output_path: root directory for the per-chromosome outputs
        example_dir: directory where small example files are written
        save_example_files: whether workers also save example FASTA heads
        max_workers: number of worker processes in the pool
    """
    # FIX: work on a shallow copy so the caller's dataframe is not mutated
    # in place by the string cast.
    df = df.assign(**{"#chrom": df["#chrom"].astype(str)})
    chromosomes = df["#chrom"].unique().tolist()

    log_dir = Path(HydraConfig.get().run.dir) / "logs"
    os.makedirs(log_dir, exist_ok=True)
    logger.info(f"Created {log_dir} for storing logs for subprocesses.")

    os.makedirs(example_dir, exist_ok=True)
    logger.info(f"Created {example_dir} for storing example inputs")

    # FIX: ship each worker only its own chromosome's rows instead of
    # pickling the full dataframe once per chromosome. process_chromosome
    # re-filters by "#chrom", so passing the subset is behavior-identical.
    args = [
        (
            chrom,
            df[df["#chrom"] == chrom],
            json_dir,
            output_path,
            example_dir,
            save_example_files,
            log_dir,
        )
        for chrom in chromosomes
    ]

    with mp.Pool(processes=max_workers) as pool:
        pool.starmap(process_chromosome, args)
|
|
|
|
def extract_sequences(df, seq_fasta, json_dir, wlogger):
    """
    Write the per-chromosome scan FASTA used for building the background model.

    Does nothing (beyond logging) if seq_fasta already exists. Progress is
    logged at roughly every 1% of rows processed.
    """
    dna_cache = {}
    total = len(df)
    # Row indices at which to emit a progress message (~1% granularity).
    progress_marks = {int(total * pct / 100) for pct in range(1, 101)}

    wlogger.info(f"Writing to {seq_fasta}")
    if os.path.exists(seq_fasta):
        return

    with open(seq_fasta, "w") as fa:
        for idx, row in df.iterrows():
            chrom = str(row["#chrom"])
            tr = str(row["TR"])
            dna = load_chrom_dna(chrom, dna_cache, json_dir)
            start = int(row["contextStart"])
            end = int(row["contextEnd"])
            fa.write(f">{idx}_chr{chrom}_{tr}_{start}_{end}\n{dna[start:end]}\n")

            if idx in progress_marks:
                wlogger.info(
                    f"  Reached {idx / total:.0%} of the DataFrame (index {idx})"
                )
|
|
|
|
def main(cfg: DictConfig):
    """Entry point: build the FIMO input table, then write per-chromosome FASTAs."""
    paths = cfg.data_task.paths
    df = assemble_main_input(
        input_csv=paths.input_csv,
        window_total=cfg.data_task.window_total,
        output_csv=paths.output_csv,
        save_example_files=cfg.data_task.save_example_files,
    )

    # Example files live next to the processed csv, under "examples/".
    example_dir = (Path(root) / paths.output_csv).parent / "examples"
    os.makedirs(example_dir, exist_ok=True)

    # One worker per chromosome, capped at cpu_count() - 1.
    total_chroms = df["#chrom"].nunique()
    max_workers = cpu_count() - 1
    logger.info(f"Max workers available (cpu_count - 1): {max_workers}")
    max_workers = min(max_workers, total_chroms)
    logger.info(f"min(max_workers, total_chroms) = {max_workers}")

    parallel_make_all_fasta_inputs(
        df,
        json_dir=paths.json_dir,
        output_path=Path(root) / paths.chrom_output_path,
        example_dir=example_dir,
        save_example_files=cfg.data_task.save_example_files,
        max_workers=max_workers,
    )
|
|
|
|
# NOTE(review): `main` takes a required DictConfig argument but is invoked
# here with none — this raises TypeError at runtime unless a decorator
# (e.g. @hydra.main) is applied elsewhere to supply cfg. Confirm how this
# script is actually launched.
if __name__ == "__main__":
    main()
|
|