#!/usr/bin/env python3
import os
import json
import subprocess
import pandas as pd
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
import rootutils
import logging
from omegaconf import DictConfig
from pathlib import Path
import time
import shutil
import hydra
from hydra.core.hydra_config import HydraConfig
from dpacman.utils import pylogger
root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
logger = pylogger.RankedLogger(__name__, rank_zero_only=True)
def run_markov(fasta_get_markov, seq_fasta, bg_model):
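    # Shells out to MEME's fasta-get-markov to build a background model; roughly:
    #   fasta-get-markov <sequences.fa> <bg_model>
    # (illustrative invocation; fasta_get_markov is the configured binary path)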
subprocess.check_call(
[fasta_get_markov, seq_fasta, bg_model],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def split_fasta(
n_chunks, input_file, output_dir, debug=False, debug_n=1000, all_caps=True
):
"""
    Round-robin split ``input_file`` into ``n_chunks`` chunked FASTA files.
    In debug mode, keep only the first ``debug_n`` entries per chunk.
"""
output_dir = Path(root) / output_dir
out_names = [os.path.join(output_dir, f"to_scan_{i}.fa") for i in range(n_chunks)]
out_handles = [open(out_names[i], "w") for i in range(n_chunks)]
chunk_counts = [0] * n_chunks # Count sequences per chunk
logger.info(f"ALL CAPS mode: {all_caps}")
with open(input_file) as inf:
header = None
seq_lines = []
for line in inf:
if line.startswith(">"):
if header is not None:
idx = int(header[1:].split("_")[0]) % n_chunks
if not debug or chunk_counts[idx] < debug_n:
out_handles[idx].write(header)
seqj = "".join(seq_lines)
if all_caps:
seqj = seqj.upper()
out_handles[idx].write(seqj)
chunk_counts[idx] += 1
header = line
seq_lines = []
else:
seq_lines.append(line)
# last record
if header is not None:
idx = int(header[1:].split("_")[0]) % n_chunks
if not debug or chunk_counts[idx] < debug_n:
out_handles[idx].write(header)
seqj = "".join(seq_lines)
if all_caps:
seqj = seqj.upper()
out_handles[idx].write(seqj)
chunk_counts[idx] += 1
for o in out_handles:
o.close()
# Log chunk sizes
for i, count in enumerate(chunk_counts):
logger.info(f"Chunk {i}: {count} sequences")
return out_names
def run_fimo_chunk(cfg):
"""
Run FIMO for a chunk.
Args:
cfg: dict with keys:
- chunk_id
- fasta_path
- fimo_outdir
- fimo_bin
- bg_model
- max_stored
- motif_file
- thresh
- thresh_mode
- outdir
"""
chunk_id = cfg["chunk_id"]
log_dir = Path(HydraConfig.get().run.dir) / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"fimo_chunk_{chunk_id}.log"
wlogger = logging.getLogger(f"fimo_chunk_{chunk_id}")
wlogger.setLevel(logging.DEBUG)
wlogger.propagate = False # Don't double-log to root
outdir = Path(cfg["outdir"])
os.makedirs(outdir, exist_ok=True)
if not any(isinstance(h, logging.FileHandler) for h in wlogger.handlers):
fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
wlogger.addHandler(fh)
    # run FIMO on this chunk, logging its stdout/stderr to the chunk's own log file
wlogger.info(f"Chunk {cfg['chunk_id']} starting FIMO")
wlogger.info(f"Threshold mode: {cfg['thresh_mode']}")
try:
call_list = [
cfg["fimo_bin"],
"--oc",
outdir,
"--bfile",
cfg["bg_model"],
"--max-stored-scores",
str(cfg["max_stored"]),
"--thresh",
str(cfg["thresh"]),
"--qv-thresh", # threshold on q-value
"--no-pgc", # suppress parsing of genomic coordinates in FASTA sequence header
cfg["motif_file"],
cfg["fasta_path"],
]
if cfg["thresh_mode"] != "q":
call_list = [x for x in call_list if x != "--qv-thresh"]
assert "--qv-thresh" not in call_list
with open(log_file, "a") as log_fh:
subprocess.check_call(
call_list,
stdout=log_fh,
stderr=log_fh,
)
wlogger.info(f"\tChunk {cfg['chunk_id']} finished")
# Delete the file - gotta save space!
file_path = Path(cfg["fasta_path"])
if file_path.exists() and file_path.is_file():
file_path.unlink()
wlogger.info(f"\tDeleted file: {file_path}")
except subprocess.CalledProcessError as e:
wlogger.error(f"\tChunk {chunk_id}: FIMO failed with error code {e.returncode}")
raise
    return os.path.join(outdir, "fimo.tsv")
def annotate_with_fimo(df, fdf):
df = df.reset_index().rename(columns={"index": "idx"})
df["sequence_name"] = (
df["idx"].astype(str)
+ "_chr"
+ df["#chrom"]
+ "_"
+ df["TR"]
+ "_"
+ df["contextStart"].astype(str)
+ "_"
+ df["contextEnd"].astype(str)
    )  # construct it the same way as the FASTA headers
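    # e.g. idx=12, #chrom="1", TR="CTCF", contextStart=1000, contextEnd=1500 (hypothetical
    # values) gives sequence_name "12_chr1_CTCF_1000_1500", matching split_fasta's headers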
    # Crucial: keep only rows of the FIMO results (fdf) where the TF of the matched motif
    # is the same TF annotated for the input region.
fdf["input_tr"] = fdf["sequence_name"].str.split("_", expand=True)[2]
true_matches = fdf.loc[fdf["motif_alt_id"] == fdf["input_tr"]].reset_index(
drop=True
)
logger.info(f"Length of full returned FIMO results: {len(fdf)}")
logger.info(
f"Length of true matches, where the FIMO tr and the input tr match: {len(true_matches)}"
)
true_matches = true_matches.merge(
df[["sequence_name", "contextStart"]], on="sequence_name", how="left"
)
true_matches["genomic_start"] = (
true_matches["contextStart"] + true_matches["start"] - 1
)
true_matches["genomic_end"] = true_matches["contextStart"] + true_matches["stop"]
true_matches["coord"] = (
true_matches["genomic_start"].astype(str)
+ "-"
+ true_matches["genomic_end"].astype(str)
)
agg = true_matches.groupby("sequence_name")["coord"].agg(
lambda hits: ",".join(hits)
)
df["jaspar"] = df["sequence_name"].map(agg).fillna("")
return df
# NOTE: HydraConfig.get() is used in the worker processes, so this script must be launched
# through Hydra. The config_path/config_name below are assumptions; point them at this
# repo's actual config layout.
@hydra.main(version_base=None, config_path="../../../configs", config_name="config")
def main(cfg: DictConfig):
"""
Main method for running FIMO analysis, searching JASPAR motifs against ChIP-seq peaks
"""
# 0) configs
paths = cfg.data_task.paths
fimo = cfg.data_task.fimo
meme = cfg.data_task.meme
    # set njobs to the machine's CPU count or to the number specified by the user
njobs = fimo.njobs
if njobs == "max":
njobs = cpu_count()
else:
njobs = min(cpu_count(), int(njobs))
    # 1) Optionally activate debug/test mode
    # ── DEBUG MODE: keep only the first chromosome to benchmark a smaller job ──
chroms = [str(x) for x in cfg.data_task.chroms]
logger.info(f"Debug setting: {cfg.data_task.debug}")
if cfg.data_task.debug:
chroms = chroms[0:1]
logging.info(f" DEBUG MODE: running on only one chromosome: {chroms}")
# 2) extract sequences & build BG model
for chrom in chroms:
path_to_fasta = (
Path(root)
/ Path(paths.input_fasta_outer_dir)
/ f"chr{chrom}"
/ paths.seq_fasta
)
path_to_bg = (
Path(root)
/ Path(paths.input_fasta_outer_dir)
/ f"chr{chrom}"
/ paths.bg_model
)
logging.info(f"Path to fasta file: {path_to_fasta}")
logger.info(f"Building background model at {path_to_bg}…")
        run_markov(
            Path(root) / meme.fasta_get_markov, path_to_fasta, path_to_bg
        )
# 3) chunk FASTA and run FIMO in parallel
# make a folder to store the split fastas
chunk_folder = Path(path_to_fasta.parent) / "chunks"
os.makedirs(chunk_folder, exist_ok=True)
logger.info(f"Made directory {chunk_folder} to store {njobs} chunked fastas")
chunks = split_fasta(
njobs,
input_file=path_to_fasta,
output_dir=chunk_folder,
debug=cfg.data_task.debug,
all_caps=cfg.data_task.all_caps,
)
chrom_outdir = Path(root) / paths.fimo_outdir / f"chrom{chrom}"
os.makedirs(chrom_outdir, exist_ok=True)
chunk_cfgs = [
dict(
chunk_id=i,
fasta_path=chunk,
fimo_outdir=Path(root) / paths.fimo_outdir,
fimo_bin=Path(root) / meme.fimo_bin,
bg_model=path_to_bg,
max_stored=fimo.max_stored,
motif_file=Path(root) / meme.jaspar_motif_file,
thresh=fimo.thresh,
thresh_mode=fimo.thresh_mode,
outdir=Path(chrom_outdir) / f"chunk{i}",
)
for i, chunk in enumerate(chunks)
]
logger.info(f"Running FIMO in parallel ({njobs} jobs)…")
start_time = time.time()
# Call the parallel jobs and get back a list of tsv paths
with Pool(njobs) as pool:
tsv_paths = pool.map(run_fimo_chunk, chunk_cfgs)
end_time = time.time()
logger.info(
f"COMPLETED FIMO ({njobs} parallel jobs) in {end_time-start_time:.2f}s"
)
# cleanup! delete the chunked input files
if not any(chunk_folder.iterdir()): # Empty folder
chunk_folder.rmdir()
logger.info(f"Deleted empty folder: {chunk_folder}")
        # 4) merge chunked TSVs; some may be empty, so guard each read
        # delete intermediate chunk folders as we go
dfs = []
for tsv in tsv_paths:
try:
df = pd.read_csv(tsv, sep="\t", comment="#")
if not df.empty:
dfs.append(df)
except pd.errors.EmptyDataError:
logger.warning(f"Skipped empty TSV (only comments or blank): {tsv}")
except Exception as e:
logger.error(f"Error reading {tsv}: {e}")
raise # Or continue, depending on your needs
# delete this folder to save storage
chunk_dir = Path(tsv).parent
try:
shutil.rmtree(chunk_dir)
logger.info(f"Deleted chunk directory: {chunk_dir}")
except Exception as e:
logger.warning(f"Could not delete chunk dir {chunk_dir}: {e}")
combined = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
# 5) annotate & write final CSV
df = pd.read_csv(Path(root) / paths.input_csv, low_memory=False)
df["#chrom"] = df["#chrom"].astype(str)
df = df.loc[df["#chrom"] == chrom].reset_index(drop=True)
        output_full_csv_path = chrom_outdir / "fimo_annotations.csv"
combined.to_csv(output_full_csv_path, index=False)
logger.info(
f"Merging FIMO results into input DataFrame, which has {len(df)} rows for chromosome {chrom}"
)
df = annotate_with_fimo(df, combined)
final = df[
[
"#chrom",
"contextStart",
"ChIPStart",
"ChIPEnd",
"contextEnd",
"chipscore",
"TR",
"jaspar",
]
]
        output_csv_path = chrom_outdir / "final.csv"
final.to_csv(output_csv_path, index=False)
logger.info(f"Wrote {len(final)} rows to {output_csv_path}")
if __name__ == "__main__":
main()