# dpacman/data_tasks/fimo/pre_fimo.py
#!/usr/bin/env python3
import pandas as pd
import numpy as np
import os
import json
import multiprocessing as mp
from multiprocessing import Pool, cpu_count
from omegaconf import DictConfig
from pathlib import Path
from hydra.core.hydra_config import HydraConfig
import rootutils
from dpacman.utils import pylogger
import logging
root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
logger = pylogger.RankedLogger(__name__, rank_zero_only=True)
def init_worker(log_file, logger_name):
    """Build (or fetch) a per-worker file logger.

    Returns a ``logging.Logger`` named *logger_name* that appends
    INFO-level records to *log_file*. Propagation to the root logger is
    disabled so records are not duplicated, and the file handler is
    attached only once so repeated calls do not stack handlers.
    """
    worker_log = logging.getLogger(logger_name)
    worker_log.setLevel(logging.INFO)
    worker_log.propagate = False  # Don't double-log to root
    # Logger already configured on a previous call — reuse it as-is.
    if worker_log.handlers:
        return worker_log
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
    worker_log.addHandler(file_handler)
    return worker_log
def process_chromosome(
    chrom, df, json_dir, output_path, example_dir, save_example_files, log_dir
):
    """Worker entry point: build the FIMO scan FASTA for one chromosome.

    Filters *df* down to the rows whose "#chrom" equals *chrom*, writes
    ``chr<chrom>/to_scan.fa`` under *output_path*, and — when
    *save_example_files* is set — saves the first 50 FASTA lines as an
    example file (skipped if it already exists). Progress is logged to a
    per-chromosome log file under *log_dir*.
    """
    log_path = Path(log_dir) / f"chrom_{chrom}.log"
    wlogger = init_worker(log_path, f"logger_chrom_{chrom}")
    wlogger.info(f"Processing chromosome {chrom}")

    chrom_rows = df[df["#chrom"] == chrom].reset_index(drop=True)
    chrom_out_dir = Path(output_path) / f"chr{chrom}"
    os.makedirs(chrom_out_dir, exist_ok=True)
    fasta_path = chrom_out_dir / "to_scan.fa"
    extract_sequences(chrom_rows, fasta_path, json_dir, wlogger)

    if not save_example_files:
        return
    example_out = Path(example_dir) / f"example_chr{chrom}_to_scan.fa"
    if example_out.exists():
        return
    with open(fasta_path, "r") as src:
        head = src.readlines()[:50]
    with open(example_out, "w") as dst:
        dst.write("".join(head))
    wlogger.info(f"Saved example: {example_out}")
def assemble_main_input(
    input_csv: str, window_total: int, output_csv: str, save_example_files: bool
):
    """
    Assemble the main FIMO input dataframe.

    Reads the tab-separated ChIP-seq peak table at ``root / input_csv``,
    keeps only whole chromosomes (1-22, X, Y), explodes comma-separated TF
    names into one row per TF, and pads each peak with a randomly split
    left/right flank summing to *window_total* nucleotides. The result is
    written to ``root / output_csv``; if that file already exists the build
    is skipped and the cached file is re-loaded instead.

    Args:
        input_csv: path to input csv (relative to the project root) that is
            converted to the file for FIMO input.
        window_total: int determining the total non-ChIPseq-peak nucleotides
            included in a datapoint.
        output_csv: where the processed file will be saved (relative to root).
        save_example_files: bool determining whether we save example files
            that can be easily viewed.

    Returns:
        DataFrame with columns
        #chrom, contextStart, ChIPStart, ChIPEnd, contextEnd, chipscore, TR.
    """
    # 1) make input and output paths
    input_path = Path(root) / input_csv
    df = pd.read_csv(input_path, sep="\t")
    out = None  # filled in below, or re-loaded from a previous run
    output_path = Path(root) / output_csv
    os.makedirs(output_path.parent, exist_ok=True)
    if not output_path.exists():
        # 2) normalize chromosomes and exclude non-whole chromosomes
        df["chrom"] = df["chrom"].str.replace(r"^chr", "", regex=True)
        valid = [str(i) for i in range(1, 23)] + ["X", "Y"]
        df = df[df["chrom"].isin(valid)].reset_index(drop=True)
        # 3) explode comma-separated TF names into one row per TF
        df["tr_list"] = df["tr"].str.split(",")
        df = df.explode("tr_list").rename(columns={"tr_list": "TR"})
        df["TR"] = df["TR"].str.strip()
        # 4) draw a random left-flank between 0 and window_total,
        #    then right-flank is whatever remains to sum to window_total
        n = len(df)
        df["left_context"] = np.random.randint(0, window_total + 1, size=n)
        df["right_context"] = window_total - df["left_context"]
        # 5) compute contextStart / contextEnd (clip so we never go below 0)
        df["contextStart"] = (
            (df["chromStart"] - df["left_context"]).clip(lower=0).astype(int)
        )
        df["contextEnd"] = (df["chromEnd"] + df["right_context"]).astype(int)
        # 6) assemble output in FIMO column order
        out = df[
            [
                "chrom",
                "contextStart",
                "chromStart",  # original ChIPStart
                "chromEnd",  # original ChIPEnd
                "contextEnd",
                "score",  # original score column
                "TR",
            ]
        ].rename(
            columns={
                "chrom": "#chrom",
                "chromStart": "ChIPStart",
                "chromEnd": "ChIPEnd",
                "score": "chipscore",
            }
        )
        # 7) write csv
        out.to_csv(output_path, index=False)
        logger.info(f"Wrote {len(out)} rows to {output_path}")
    # Re-load the cached output from a previous run if we skipped the build
    if out is None:
        out = pd.read_csv(output_path)
    # 8) write example csv if necessary (and not already present)
    if save_example_files:
        example_dir = output_path.parent / "examples"
        os.makedirs(example_dir, exist_ok=True)
        output_csv_name = Path(output_csv).name
        example_savepath = os.path.join(example_dir, "example500_" + output_csv_name)
        if not os.path.exists(example_savepath):
            # cap the sample size so outputs with <500 rows don't raise ValueError
            n_sample = min(500, len(out))
            out.sample(n=n_sample, random_state=42).reset_index(drop=True).to_csv(
                example_savepath, index=False
            )
            logger.info(
                f"Saved example FIMO input file with {n_sample} rows to: {example_savepath}"
            )
    return out
def load_chrom_dna(chrom, cache, json_dir):
    """
    Return the DNA string for one pre-downloaded chromosome.

    Results are memoized in *cache* (a dict keyed by chromosome name) so
    each JSON file is read from disk at most once per worker.

    Raises:
        FileNotFoundError: when ``hg38_chr<chrom>.json`` is absent from
        ``root / json_dir``.
    """
    if chrom in cache:
        return cache[chrom]
    fname = os.path.join(Path(root) / json_dir, f"hg38_chr{chrom}.json")
    if not os.path.isfile(fname):
        raise FileNotFoundError(f"Chrom JSON not found: {fname}")
    with open(fname) as handle:
        payload = json.load(handle)
    cache[chrom] = payload["dna"]
    return cache[chrom]
def parallel_make_all_fasta_inputs(
    df, json_dir, output_path, example_dir, save_example_files=True, max_workers=8
):
    """
    Fan out per-chromosome FASTA generation across a process pool.

    One worker runs ``process_chromosome`` per unique chromosome in *df*,
    with per-chromosome log files written under the Hydra run directory.

    NOTE: mutates *df* in place by casting its "#chrom" column to str.
    """
    df["#chrom"] = df["#chrom"].astype(str)
    chromosomes = df["#chrom"].unique().tolist()

    log_dir = Path(HydraConfig.get().run.dir) / "logs"
    os.makedirs(log_dir, exist_ok=True)
    logger.info(f"Created {log_dir} for storing logs for subprocesses.")
    os.makedirs(example_dir, exist_ok=True)
    logger.info(f"Created {example_dir} for storing example inputs")

    worker_args = []
    for chrom in chromosomes:
        worker_args.append(
            (chrom, df, json_dir, output_path, example_dir, save_example_files, log_dir)
        )
    with mp.Pool(processes=max_workers) as pool:
        pool.starmap(process_chromosome, worker_args)
def extract_sequences(df, seq_fasta, json_dir, wlogger):
    """
    Make the main sequence fasta for this chromosome. Used for building the
    background model.

    Writes one FASTA record per row of *df*, slicing the chromosome DNA at
    ``[contextStart, contextEnd)``. The write is skipped entirely when
    *seq_fasta* already exists. Progress is logged at roughly every 1% of
    the rows.
    """
    dna_cache = {}
    n_rows = len(df)
    # Row indices corresponding to 1%, 2%, ..., 100% of the frame.
    checkpoints = {int(n_rows * pct / 100) for pct in range(1, 101)}
    wlogger.info(f"Writing to {seq_fasta}")
    if os.path.exists(seq_fasta):
        return
    with open(seq_fasta, "w") as fa:
        for idx, row in df.iterrows():
            chrom = str(row["#chrom"])
            tr = str(row["TR"])
            dna = load_chrom_dna(chrom, dna_cache, json_dir)
            start = int(row["contextStart"])
            end = int(row["contextEnd"])
            # end index is not included in ChIP-seq peaks
            seq = dna[start:end]
            fa.write(f">{idx}_chr{chrom}_{tr}_{start}_{end}\n{seq}\n")
            # log every 1%
            if idx in checkpoints:
                wlogger.info(
                    f" Reached {idx / n_rows:.0%} of the DataFrame (index {idx})"
                )
def main(cfg: DictConfig) -> None:
    """Entry point: build the FIMO input CSV, then per-chromosome FASTAs.

    Expects a Hydra config exposing ``cfg.data_task.paths`` (input_csv,
    output_csv, json_dir, chrom_output_path), ``cfg.data_task.window_total``,
    and ``cfg.data_task.save_example_files``.

    NOTE(review): no ``@hydra.main`` decorator is visible here, yet the
    ``__main__`` guard below calls ``main()`` with no arguments and
    ``parallel_make_all_fasta_inputs`` reads ``HydraConfig.get()`` — confirm
    how ``cfg`` is supplied (a decorator may have been lost in this chunk).
    """
    # 1) make the full input CSV
    paths = cfg.data_task.paths
    df = assemble_main_input(
        input_csv=paths.input_csv,
        window_total=cfg.data_task.window_total,
        output_csv=paths.output_csv,
        save_example_files=cfg.data_task.save_example_files,
    )
    # Make example dir (sibling "examples" folder of the output CSV) to use in future methods
    example_dir = Path(root) / paths.output_csv
    example_dir = example_dir.parent / "examples"
    os.makedirs(example_dir, exist_ok=True)
    # 2) Make individual input files per chromosome, one worker per
    #    chromosome but never more than cpu_count() - 1 processes
    total_chroms = len(df["#chrom"].unique().tolist())
    max_workers = cpu_count() - 1
    logger.info(f"Max workers available (cpu_count - 1): {max_workers}")
    max_workers = min(max_workers, total_chroms)
    logger.info(f"min(max_workers, total_chroms) = {max_workers}")
    parallel_make_all_fasta_inputs(
        df,
        json_dir=paths.json_dir,
        output_path=Path(root) / paths.chrom_output_path,
        example_dir=example_dir,
        save_example_files=cfg.data_task.save_example_files,
        max_workers=max_workers,
    )
if __name__ == "__main__":
    main()