import os
import subprocess
import pandas as pd
from multiprocessing import Pool, cpu_count
import hydra
import rootutils
import logging
from omegaconf import DictConfig
from pathlib import Path
import time
import shutil
from hydra.core.hydra_config import HydraConfig
from dpacman.utils import pylogger

root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
logger = pylogger.RankedLogger(__name__, rank_zero_only=True)


def run_markov(fasta_get_markov, seq_fasta, bg_model):
    """Build a Markov background model from seq_fasta with MEME's fasta-get-markov."""
    subprocess.check_call(
        [fasta_get_markov, seq_fasta, bg_model],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def split_fasta(
    n_chunks, input_file, output_dir, debug=False, debug_n=1000, all_caps=True
):
    """
    Round-robin split the input FASTA into n_chunks chunked FASTA files.
    In debug mode, keep only the first debug_n entries per chunk.
    """
    output_dir = Path(root) / output_dir
    out_names = [os.path.join(output_dir, f"to_scan_{i}.fa") for i in range(n_chunks)]
    out_handles = [open(name, "w") for name in out_names]
    chunk_counts = [0] * n_chunks
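    # All chunk handles stay open for the single pass over the input; this
    # assumes n_chunks stays well below the OS open-file limit.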
    logger.info(f"ALL CAPS mode: {all_caps}")

    def flush(header, seq_lines):
        # Route the record to its chunk and write it, honoring the debug cap.
        idx = int(header[1:].split("_")[0]) % n_chunks
        if not debug or chunk_counts[idx] < debug_n:
            out_handles[idx].write(header)
            seq = "".join(seq_lines)
            if all_caps:
                seq = seq.upper()
            out_handles[idx].write(seq)
            chunk_counts[idx] += 1

    with open(input_file) as inf:
        header = None
        seq_lines = []

        for line in inf:
            if line.startswith(">"):
                if header is not None:
                    flush(header, seq_lines)
                header = line
                seq_lines = []
            else:
                seq_lines.append(line)

        # Flush the final record, which has no following header to trigger it.
        if header is not None:
            flush(header, seq_lines)

    for o in out_handles:
        o.close()

    for i, count in enumerate(chunk_counts):
        logger.info(f"Chunk {i}: {count} sequences")

    return out_names


def run_fimo_chunk(cfg):
    """
    Run FIMO on a single FASTA chunk.

    Args:
        cfg: dict with keys:
            - chunk_id
            - fasta_path
            - fimo_outdir
            - fimo_bin
            - bg_model
            - max_stored
            - motif_file
            - thresh
            - thresh_mode
            - outdir
    """
    chunk_id = cfg["chunk_id"]
    log_dir = Path(HydraConfig.get().run.dir) / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)
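    # Each worker process gets its own file-backed logger: handlers on the
    # main-process logger are not shared safely across multiprocessing workers,
    # so every chunk logs to its own file instead.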
    log_file = log_dir / f"fimo_chunk_{chunk_id}.log"
    wlogger = logging.getLogger(f"fimo_chunk_{chunk_id}")
    wlogger.setLevel(logging.DEBUG)
    wlogger.propagate = False

    outdir = Path(cfg["outdir"])
    os.makedirs(outdir, exist_ok=True)

    if not any(isinstance(h, logging.FileHandler) for h in wlogger.handlers):
        fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
        fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
        wlogger.addHandler(fh)

    wlogger.info(f"Chunk {chunk_id} starting FIMO")
    wlogger.info(f"Threshold mode: {cfg['thresh_mode']}")

    try:
        call_list = [
            cfg["fimo_bin"],
            "--oc",
            outdir,
            "--bfile",
            cfg["bg_model"],
            "--max-stored-scores",
            str(cfg["max_stored"]),
            "--thresh",
            str(cfg["thresh"]),
            "--qv-thresh",
            "--no-pgc",
            cfg["motif_file"],
            cfg["fasta_path"],
        ]
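        # FIMO treats --thresh as a p-value cutoff by default; --qv-thresh
        # switches it to a q-value cutoff, so the flag is kept only when
        # thresh_mode is "q".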
        if cfg["thresh_mode"] != "q":
            call_list.remove("--qv-thresh")
        with open(log_file, "a") as log_fh:
            subprocess.check_call(
                call_list,
                stdout=log_fh,
                stderr=log_fh,
            )
        wlogger.info(f"\tChunk {chunk_id} finished")

        # The chunk FASTA is no longer needed once FIMO has scanned it.
        file_path = Path(cfg["fasta_path"])
        if file_path.exists() and file_path.is_file():
            file_path.unlink()
            wlogger.info(f"\tDeleted file: {file_path}")

    except subprocess.CalledProcessError as e:
        wlogger.error(f"\tChunk {chunk_id}: FIMO failed with error code {e.returncode}")
        raise
    return os.path.join(outdir, "fimo.tsv")


def annotate_with_fimo(df, fdf):
    """
    Merge FIMO hits (fdf) onto the peak DataFrame (df): keep only hits whose
    motif matches the row's TR, convert the hits to genomic coordinates, and
    collect them per row in a comma-separated "jaspar" column.
    """
    df = df.reset_index().rename(columns={"index": "idx"})
    df["sequence_name"] = (
        df["idx"].astype(str)
        + "_chr"
        + df["#chrom"]
        + "_"
        + df["TR"]
        + "_"
        + df["contextStart"].astype(str)
        + "_"
        + df["contextEnd"].astype(str)
    )

    # The TR name is the third underscore-delimited field of sequence_name.
    fdf["input_tr"] = fdf["sequence_name"].str.split("_", expand=True)[2]
    true_matches = fdf.loc[fdf["motif_alt_id"] == fdf["input_tr"]].reset_index(
        drop=True
    )
    logger.info(f"Length of full returned FIMO results: {len(fdf)}")
    logger.info(
        f"Length of true matches, where the FIMO TR and the input TR match: {len(true_matches)}"
    )

    true_matches = true_matches.merge(
        df[["sequence_name", "contextStart"]], on="sequence_name", how="left"
    )
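    # FIMO start/stop are 1-based and inclusive, relative to each scanned
    # sequence; assuming contextStart is a 0-based genomic offset, this yields
    # a 0-based genomic_start and an exclusive genomic_end.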
    true_matches["genomic_start"] = (
        true_matches["contextStart"] + true_matches["start"] - 1
    )
    true_matches["genomic_end"] = true_matches["contextStart"] + true_matches["stop"]
    true_matches["coord"] = (
        true_matches["genomic_start"].astype(str)
        + "-"
        + true_matches["genomic_end"].astype(str)
    )

    agg = true_matches.groupby("sequence_name")["coord"].agg(",".join)
    df["jaspar"] = df["sequence_name"].map(agg).fillna("")
    return df


# NOTE: config_path and config_name below are assumptions; point them at this
# project's actual Hydra config layout.
@hydra.main(version_base=None, config_path="configs", config_name="config")
def main(cfg: DictConfig):
| """ |
| Main method for running FIMO analysis, searching JASPAR motifs against ChIP-seq peaks |
| """ |
| |
| paths = cfg.data_task.paths |
| fimo = cfg.data_task.fimo |
| meme = cfg.data_task.meme |
|
|
| |
| njobs = fimo.njobs |
| if njobs == "max": |
| njobs = cpu_count() |
| else: |
| njobs = min(cpu_count(), int(njobs)) |
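    # split_fasta creates one FASTA chunk per job, so njobs also fixes the
    # number of chunks and per-chunk FIMO output directories per chromosome.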
    chroms = [str(x) for x in cfg.data_task.chroms]
    logger.info(f"Debug setting: {cfg.data_task.debug}")
    if cfg.data_task.debug:
        chroms = chroms[0:1]
        logger.info(f"DEBUG MODE: running on only one chromosome: {chroms}")
    for chrom in chroms:
        path_to_fasta = (
            Path(root)
            / Path(paths.input_fasta_outer_dir)
            / f"chr{chrom}"
            / paths.seq_fasta
        )
        path_to_bg = (
            Path(root)
            / Path(paths.input_fasta_outer_dir)
            / f"chr{chrom}"
            / paths.bg_model
        )
        logger.info(f"Path to fasta file: {path_to_fasta}")
        logger.info(f"Building background model at {path_to_bg}…")
        run_markov(Path(root) / meme.fasta_get_markov, path_to_fasta, path_to_bg)
        chunk_folder = path_to_fasta.parent / "chunks"
        os.makedirs(chunk_folder, exist_ok=True)
        logger.info(f"Made directory {chunk_folder} to store {njobs} chunked fastas")
        chunks = split_fasta(
            njobs,
            input_file=path_to_fasta,
            output_dir=chunk_folder,
            debug=cfg.data_task.debug,
            all_caps=cfg.data_task.all_caps,
        )

        chrom_outdir = Path(root) / paths.fimo_outdir / f"chrom{chrom}"
        os.makedirs(chrom_outdir, exist_ok=True)

        chunk_cfgs = [
            dict(
                chunk_id=i,
                fasta_path=chunk,
                fimo_outdir=Path(root) / paths.fimo_outdir,
                fimo_bin=Path(root) / meme.fimo_bin,
                bg_model=path_to_bg,
                max_stored=fimo.max_stored,
                motif_file=Path(root) / meme.jaspar_motif_file,
                thresh=fimo.thresh,
                thresh_mode=fimo.thresh_mode,
                outdir=chrom_outdir / f"chunk{i}",
            )
            for i, chunk in enumerate(chunks)
        ]
        logger.info(f"Running FIMO in parallel ({njobs} jobs)…")
        start_time = time.time()

        with Pool(njobs) as pool:
            tsv_paths = pool.map(run_fimo_chunk, chunk_cfgs)
        end_time = time.time()
        logger.info(
            f"COMPLETED FIMO ({njobs} parallel jobs) in {end_time - start_time:.2f}s"
        )

        if not any(chunk_folder.iterdir()):
            chunk_folder.rmdir()
            logger.info(f"Deleted empty folder: {chunk_folder}")
        dfs = []
        for tsv in tsv_paths:
            try:
                df = pd.read_csv(tsv, sep="\t", comment="#")
                if not df.empty:
                    dfs.append(df)
            except pd.errors.EmptyDataError:
                logger.warning(f"Skipped empty TSV (only comments or blank): {tsv}")
            except Exception as e:
                logger.error(f"Error reading {tsv}: {e}")
                raise

            # Each chunk's FIMO output dir is removed once its TSV is consumed.
            chunk_dir = Path(tsv).parent
            try:
                shutil.rmtree(chunk_dir)
                logger.info(f"Deleted chunk directory: {chunk_dir}")
            except Exception as e:
                logger.warning(f"Could not delete chunk dir {chunk_dir}: {e}")

        combined = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
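        # NOTE: if no chunk produced hits, combined is an empty DataFrame with
        # no columns, and annotate_with_fimo would fail on the missing
        # "sequence_name" column; hits are assumed to exist in practice.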
        df = pd.read_csv(Path(root) / paths.input_csv, low_memory=False)
        df["#chrom"] = df["#chrom"].astype(str)
        df = df.loc[df["#chrom"] == chrom].reset_index(drop=True)
        output_full_csv_path = chrom_outdir / "fimo_annotations.csv"
        combined.to_csv(output_full_csv_path, index=False)
        logger.info(
            f"Merging FIMO results into input DataFrame, which has {len(df)} rows for chromosome {chrom}"
        )
        df = annotate_with_fimo(df, combined)
        final = df[
            [
                "#chrom",
                "contextStart",
                "ChIPStart",
                "ChIPEnd",
                "contextEnd",
                "chipscore",
                "TR",
                "jaspar",
            ]
        ]
        output_csv_path = chrom_outdir / "final.csv"
        final.to_csv(output_csv_path, index=False)
        logger.info(f"Wrote {len(final)} rows to {output_csv_path}")


if __name__ == "__main__":
    main()