#!/usr/bin/env python3 import os import json import subprocess import pandas as pd from multiprocessing import Pool, cpu_count from tqdm import tqdm import rootutils import logging from omegaconf import DictConfig from pathlib import Path import time import shutil from hydra.core.hydra_config import HydraConfig import rootutils from dpacman.utils import pylogger root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True) logger = pylogger.RankedLogger(__name__, rank_zero_only=True) def run_markov(fasta_get_markov, seq_fasta, bg_model): subprocess.check_call( [fasta_get_markov, seq_fasta, bg_model], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def split_fasta( n_chunks, input_file, output_dir, debug=False, debug_n=1000, all_caps=True ): """ Round-robin split SEQ_FASTA into chunked FASTA files. If in debug mode, only keep the first 5 entries for each. """ output_dir = Path(root) / output_dir out_names = [os.path.join(output_dir, f"to_scan_{i}.fa") for i in range(n_chunks)] out_handles = [open(out_names[i], "w") for i in range(n_chunks)] chunk_counts = [0] * n_chunks # Count sequences per chunk logger.info(f"ALL CAPS mode: {all_caps}") with open(input_file) as inf: header = None seq_lines = [] for line in inf: if line.startswith(">"): if header is not None: idx = int(header[1:].split("_")[0]) % n_chunks if not debug or chunk_counts[idx] < debug_n: out_handles[idx].write(header) seqj = "".join(seq_lines) if all_caps: seqj = seqj.upper() out_handles[idx].write(seqj) chunk_counts[idx] += 1 header = line seq_lines = [] else: seq_lines.append(line) # last record if header is not None: idx = int(header[1:].split("_")[0]) % n_chunks if not debug or chunk_counts[idx] < debug_n: out_handles[idx].write(header) seqj = "".join(seq_lines) if all_caps: seqj = seqj.upper() out_handles[idx].write(seqj) chunk_counts[idx] += 1 for o in out_handles: o.close() # Log chunk sizes for i, count in enumerate(chunk_counts): logger.info(f"Chunk {i}: {count} sequences") return out_names def run_fimo_chunk(cfg): """ Run FIMO for a chunk. Args: cfg: dict with keys: - chunk_id - fasta_path - fimo_outdir - fimo_bin - bg_model - max_stored - motif_file - thresh - thresh_mode - outdir """ chunk_id = cfg["chunk_id"] log_dir = Path(HydraConfig.get().run.dir) / "logs" log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / f"fimo_chunk_{chunk_id}.log" wlogger = logging.getLogger(f"fimo_chunk_{chunk_id}") wlogger.setLevel(logging.DEBUG) wlogger.propagate = False # Don't double-log to root outdir = Path(cfg["outdir"]) os.makedirs(outdir, exist_ok=True) if not any(isinstance(h, logging.FileHandler) for h in wlogger.handlers): fh = logging.FileHandler(log_file, mode="w", encoding="utf-8") fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) wlogger.addHandler(fh) # make an output directory for this chromosome wlogger.info(f"Chunk {cfg['chunk_id']} starting FIMO") wlogger.info(f"Threshold mode: {cfg['thresh_mode']}") try: call_list = [ cfg["fimo_bin"], "--oc", outdir, "--bfile", cfg["bg_model"], "--max-stored-scores", str(cfg["max_stored"]), "--thresh", str(cfg["thresh"]), "--qv-thresh", # threshold on q-value "--no-pgc", # suppress parsing of genomic coordinates in FASTA sequence header cfg["motif_file"], cfg["fasta_path"], ] if cfg["thresh_mode"] != "q": call_list = [x for x in call_list if x != "--qv-thresh"] assert "--qv-thresh" not in call_list with open(log_file, "a") as log_fh: subprocess.check_call( call_list, stdout=log_fh, stderr=log_fh, ) wlogger.info(f"\tChunk {cfg['chunk_id']} finished") # Delete the file - gotta save space! file_path = Path(cfg["fasta_path"]) if file_path.exists() and file_path.is_file(): file_path.unlink() wlogger.info(f"\tDeleted file: {file_path}") except subprocess.CalledProcessError as e: wlogger.error(f"\tChunk {chunk_id}: FIMO failed with error code {e.returncode}") raise return os.path.join(outdir, f"fimo.tsv") def annotate_with_fimo(df, fdf): df = df.reset_index().rename(columns={"index": "idx"}) df["sequence_name"] = ( df["idx"].astype(str) + "_chr" + df["#chrom"] + "_" + df["TR"] + "_" + df["contextStart"].astype(str) + "_" + df["contextEnd"].astype(str) ) # construt it the same way as headers # Crucial: filter FDF results to only rows where the TF whose motif was found actually matches the TF that was detected there. fdf["input_tr"] = fdf["sequence_name"].str.split("_", expand=True)[2] true_matches = fdf.loc[fdf["motif_alt_id"] == fdf["input_tr"]].reset_index( drop=True ) logger.info(f"Length of full returned FIMO results: {len(fdf)}") logger.info( f"Length of true matches, where the FIMO tr and the input tr match: {len(true_matches)}" ) true_matches = true_matches.merge( df[["sequence_name", "contextStart"]], on="sequence_name", how="left" ) true_matches["genomic_start"] = ( true_matches["contextStart"] + true_matches["start"] - 1 ) true_matches["genomic_end"] = true_matches["contextStart"] + true_matches["stop"] true_matches["coord"] = ( true_matches["genomic_start"].astype(str) + "-" + true_matches["genomic_end"].astype(str) ) agg = true_matches.groupby("sequence_name")["coord"].agg( lambda hits: ",".join(hits) ) df["jaspar"] = df["sequence_name"].map(agg).fillna("") return df def main(cfg: DictConfig): """ Main method for running FIMO analysis, searching JASPAR motifs against ChIP-seq peaks """ # 0) configs paths = cfg.data_task.paths fimo = cfg.data_task.fimo meme = cfg.data_task.meme # set njobs to max or whatever # is specified by user njobs = fimo.njobs if njobs == "max": njobs = cpu_count() else: njobs = min(cpu_count(), int(njobs)) # 1) Optionally, acitvate test mode # ── TEST MODE: extract just chromosome 1 to benchmark a smaller job ── chroms = [str(x) for x in cfg.data_task.chroms] logger.info(f"Debug setting: {cfg.data_task.debug}") if cfg.data_task.debug: chroms = chroms[0:1] logging.info(f" DEBUG MODE: running on only one chromosome: {chroms}") # 2) extract sequences & build BG model for chrom in chroms: path_to_fasta = ( Path(root) / Path(paths.input_fasta_outer_dir) / f"chr{chrom}" / paths.seq_fasta ) path_to_bg = ( Path(root) / Path(paths.input_fasta_outer_dir) / f"chr{chrom}" / paths.bg_model ) logging.info(f"Path to fasta file: {path_to_fasta}") logger.info(f"Building background model at {path_to_bg}…") run_markov( Path(root) / meme.fasta_get_markov, path_to_fasta, Path(root) / path_to_bg ) # 3) chunk FASTA and run FIMO in parallel # make a folder to store the split fastas chunk_folder = Path(path_to_fasta.parent) / "chunks" os.makedirs(chunk_folder, exist_ok=True) logger.info(f"Made directory {chunk_folder} to store {njobs} chunked fastas") chunks = split_fasta( njobs, input_file=path_to_fasta, output_dir=chunk_folder, debug=cfg.data_task.debug, all_caps=cfg.data_task.all_caps, ) chrom_outdir = Path(root) / paths.fimo_outdir / f"chrom{chrom}" os.makedirs(chrom_outdir, exist_ok=True) chunk_cfgs = [ dict( chunk_id=i, fasta_path=chunk, fimo_outdir=Path(root) / paths.fimo_outdir, fimo_bin=Path(root) / meme.fimo_bin, bg_model=path_to_bg, max_stored=fimo.max_stored, motif_file=Path(root) / meme.jaspar_motif_file, thresh=fimo.thresh, thresh_mode=fimo.thresh_mode, outdir=Path(chrom_outdir) / f"chunk{i}", ) for i, chunk in enumerate(chunks) ] logger.info(f"Running FIMO in parallel ({njobs} jobs)…") start_time = time.time() # Call the parallel jobs and get back a list of tsv paths with Pool(njobs) as pool: tsv_paths = pool.map(run_fimo_chunk, chunk_cfgs) end_time = time.time() logger.info( f"COMPLETED FIMO ({njobs} parallel jobs) in {end_time-start_time:.2f}s" ) # cleanup! delete the chunked input files if not any(chunk_folder.iterdir()): # Empty folder chunk_folder.rmdir() logger.info(f"Deleted empty folder: {chunk_folder}") # 4) merge chunked TSVs. Some may be empty, so can't do a simple loop # delete intermediate folders as we go dfs = [] for tsv in tsv_paths: try: df = pd.read_csv(tsv, sep="\t", comment="#") if not df.empty: dfs.append(df) except pd.errors.EmptyDataError: logger.warning(f"Skipped empty TSV (only comments or blank): {tsv}") except Exception as e: logger.error(f"Error reading {tsv}: {e}") raise # Or continue, depending on your needs # delete this folder to save storage chunk_dir = Path(tsv).parent try: shutil.rmtree(chunk_dir) logger.info(f"Deleted chunk directory: {chunk_dir}") except Exception as e: logger.warning(f"Could not delete chunk dir {chunk_dir}: {e}") combined = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame() # 5) annotate & write final CSV df = pd.read_csv(Path(root) / paths.input_csv, low_memory=False) df["#chrom"] = df["#chrom"].astype(str) df = df.loc[df["#chrom"] == chrom].reset_index(drop=True) output_full_csv_path = Path(root) / chrom_outdir / f"fimo_annotations.csv" combined.to_csv(output_full_csv_path, index=False) logger.info( f"Merging FIMO results into input DataFrame, which has {len(df)} rows for chromosome {chrom}" ) df = annotate_with_fimo(df, combined) final = df[ [ "#chrom", "contextStart", "ChIPStart", "ChIPEnd", "contextEnd", "chipscore", "TR", "jaspar", ] ] output_csv_path = Path(root) / chrom_outdir / f"final.csv" final.to_csv(output_csv_path, index=False) logger.info(f"Wrote {len(final)} rows to {output_csv_path}") if __name__ == "__main__": main()