"""
Holds Python methods for clustering Remap DNA and protein sequences.
"""

import argparse
import numpy as np
import pandas as pd
from pathlib import Path
import random
import sys
import subprocess
from collections import defaultdict
import os
import json
from omegaconf import DictConfig
from hydra.core.hydra_config import HydraConfig
from dpacman.utils.clustering import (
    make_fasta,
    process_fasta,
    analyze_clustering_result,
    run_mmseqs_clustering,
    cluster_summary,
)
import rootutils
from dpacman.utils import pylogger

root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
logger = pylogger.RankedLogger(__name__, rank_zero_only=True)


def cluster_molecules(
    fasta_dict,
    fasta_path,
    mmseqs_params: DictConfig,
    output_dir="",
    path_to_mmseqs="../softwares/mmseqs",
    moltype="dna",
    use_gpu=True,
):
    """
    Write a fasta from ``fasta_dict``, cluster it with MMseqs, and summarize.

    Args:
        - fasta_dict: dictionary object where the keys are sequence IDs, and
          the values are sequences
        - fasta_path: str or Path to where the output fasta should be saved
        - mmseqs_params: DictConfig of mmseqs hparams; the fields read here
          are ``min_seq_id``, ``c``, ``cov_mode`` and ``cluster_mode``
        - output_dir: directory (resolved relative to the project root) that
          is handed to the MMseqs runner and then searched for the result
          ``.tsv``
        - path_to_mmseqs: location of MMseqs, resolved relative to the
          project root
        - moltype: molecule type, "dna" or "protein". "dna" selects
          ``dbtype=2``; any other value selects ``dbtype=1``
        - use_gpu: accepted for interface compatibility; it is not read
          anywhere in this function

    Returns:
        The object produced by ``analyze_clustering_result`` for the
        clustering ``.tsv`` (logged via ``.head()``, so presumably a
        DataFrame).

    Raises:
        FileNotFoundError: if no ``.tsv`` file exists in ``output_dir`` after
            the MMseqs run.
    """
    # make the fasta
    logger.info(f"Making fasta at: {fasta_path}")
    fasta_path = str(make_fasta(fasta_dict, fasta_path))

    # prepare directories (both are anchored at the project root)
    output_dir = str(Path(root) / output_dir)
    path_to_mmseqs = str(Path(root) / path_to_mmseqs)

    # run mmseqs
    dbtype = 2 if moltype == "dna" else 1
    run_mmseqs_clustering(
        fasta_path,
        output_dir,
        min_seq_id=mmseqs_params.min_seq_id,
        c=mmseqs_params.c,
        cov_mode=mmseqs_params.cov_mode,
        cluster_mode=mmseqs_params.cluster_mode,
        dbtype=dbtype,
        path_to_mmseqs=path_to_mmseqs,
    )

    # os.listdir returns entries in arbitrary order, so sort to make the
    # choice deterministic, and fail with a clear message when nothing was
    # produced instead of an opaque IndexError.
    tsv_names = sorted(x for x in os.listdir(output_dir) if x.endswith(".tsv"))
    if not tsv_names:
        raise FileNotFoundError(
            f"No .tsv clustering result found in {output_dir} after running mmseqs"
        )
    if len(tsv_names) > 1:
        logger.info(
            f"Found {len(tsv_names)} .tsv files in {output_dir}; using {tsv_names[0]}"
        )
    tsv_path = tsv_names[0]

    clusters = analyze_clustering_result(fasta_path, Path(output_dir) / tsv_path)
    logger.info(f"Made clusters DataFrame:\n{clusters.head()}")
    cluster_summary(clusters)
    return clusters


def read_input_data(input_path):
    """
    Read the data from the input path, resolved relative to the project root.

    Supported extensions (matched case-insensitively): ``.parquet`` (read with
    the pyarrow engine), ``.csv`` (comma-separated), and ``.tsv`` / ``.txt``
    (tab-separated).

    Args:
        - input_path: str or Path, relative to the project root

    Returns:
        pandas DataFrame with the file contents.

    Raises:
        ValueError: if the extension is not one of the supported types.
    """
    input_path = Path(root) / input_path
    lowered = str(input_path).lower()

    if lowered.endswith(".parquet"):
        return pd.read_parquet(input_path, engine="pyarrow")
    if lowered.endswith(".csv"):
        return pd.read_csv(input_path)
    if lowered.endswith((".tsv", ".txt")):
        return pd.read_csv(input_path, sep="\t")

    raise ValueError(f"Cannot read input data from {input_path}: invalid file type")


def _load_json(path):
    """Load and return the JSON document stored at ``path``."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def main(cfg: DictConfig):
    """
    Run clustering on Remap protein AND DNA sequences. Get clusters for each.

    Reads everything from ``cfg.data_task``: the input table path, the three
    sub-configs (``dna_full``, ``dna_peaks``, ``protein``), the
    ``cluster_dna_full`` / ``cluster_dna_peaks`` / ``cluster_protein``
    switches, and ``path_to_mmseqs``. Each ID-to-sequence map is restricted to
    the IDs present in the input table before clustering.

    Raises:
        KeyError: if the input table lacks one of the ID columns
            ``dna_seqid``, ``peak_seqid`` or ``tr_seqid``.
    """
    # Load input CSV
    # columns: Index(['ID', 'tr_seqid', 'dna_seqid', 'tr_name', 'peak_id', 'chipscore', 'total_jaspar_hits', 'dna_sequence', 'tr_sequence', 'scores']
    # NOTE(review): the column list above does not include 'peak_seqid', which
    # is required below - confirm whether the list or the column name is stale.
    df = read_input_data(cfg.data_task.input_data_path)

    # Separate configs
    dna_full_cfg = cfg.data_task.dna_full
    dna_peaks_cfg = cfg.data_task.dna_peaks
    protein_cfg = cfg.data_task.protein

    logger.info(
        f"Clustering DNA full: {cfg.data_task.cluster_dna_full}. Clustering DNA peaks: {cfg.data_task.cluster_dna_peaks}. Clustering protein: {cfg.data_task.cluster_protein}."
    )

    # Make fastas
    dna_full_fasta_path = Path(root) / dna_full_cfg.fasta_path
    dna_peaks_fasta_path = Path(root) / dna_peaks_cfg.fasta_path
    protein_fasta_path = Path(root) / protein_cfg.fasta_path

    for fasta_file in (dna_full_fasta_path, dna_peaks_fasta_path, protein_fasta_path):
        fasta_file.parent.mkdir(parents=True, exist_ok=True)

    # Make dictionary needed for input to the fasta methods
    dna_full_fasta_dict = _load_json(Path(root) / dna_full_cfg.input_map_path)
    dna_peaks_fasta_dict = _load_json(Path(root) / dna_peaks_cfg.input_map_path)
    protein_fasta_dict = _load_json(Path(root) / protein_cfg.input_map_path)

    logger.info(
        f"Loaded DNA seq dict from: {dna_full_cfg.input_map_path}. Size: {len(dna_full_fasta_dict)}"
    )
    logger.info(
        f"Loaded DNA peaks dict from: {dna_peaks_cfg.input_map_path}. Size: {len(dna_peaks_fasta_dict)}"
    )
    logger.info(
        f"Loaded TR (protein) seq dict from: {protein_cfg.input_map_path}. Size: {len(protein_fasta_dict)}"
    )

    # Fail early, naming the missing column(s), rather than on the first lookup
    missing = [
        col for col in ("dna_seqid", "peak_seqid", "tr_seqid") if col not in df.columns
    ]
    if missing:
        raise KeyError(f"Input data is missing required ID column(s): {missing}")

    # Build hash-sets once (drop NaNs to avoid weird matches)
    dna_ids = set(df["dna_seqid"].dropna())
    peak_ids = set(df["peak_seqid"].dropna())
    tr_ids = set(df["tr_seqid"].dropna())

    # Iterate only the intersection (fast when allowed << dict size)
    dna_full_fasta_dict = {
        k: dna_full_fasta_dict[k] for k in (dna_full_fasta_dict.keys() & dna_ids)
    }
    dna_peaks_fasta_dict = {
        k: dna_peaks_fasta_dict[k] for k in (dna_peaks_fasta_dict.keys() & peak_ids)
    }
    protein_fasta_dict = {
        k: protein_fasta_dict[k] for k in (protein_fasta_dict.keys() & tr_ids)
    }

    logger.info(
        "Filtered dictionaries to only sequences in the filtered training data."
    )
    logger.info(
        f"Total DNA sequences: {len(dna_full_fasta_dict)}. Total peak sequences: {len(dna_peaks_fasta_dict)}. Total protein sequences: {len(protein_fasta_dict)}"
    )

    if cfg.data_task.cluster_dna_full:
        logger.info("Clustering DNA full sequences, with context")
        cluster_molecules(
            dna_full_fasta_dict,
            dna_full_fasta_path,
            mmseqs_params=dna_full_cfg.mmseqs,
            output_dir=dna_full_cfg.output_dir,
            path_to_mmseqs=cfg.data_task.path_to_mmseqs,
            moltype="dna",
        )

    if cfg.data_task.cluster_dna_peaks:
        logger.info("Clustering DNA peak sequences")
        cluster_molecules(
            dna_peaks_fasta_dict,
            dna_peaks_fasta_path,
            mmseqs_params=dna_peaks_cfg.mmseqs,
            output_dir=dna_peaks_cfg.output_dir,
            path_to_mmseqs=cfg.data_task.path_to_mmseqs,
            moltype="dna",
        )

    if cfg.data_task.cluster_protein:
        logger.info("Clustering protein sequences.")
        cluster_molecules(
            protein_fasta_dict,
            protein_fasta_path,
            mmseqs_params=protein_cfg.mmseqs,
            output_dir=protein_cfg.output_dir,
            path_to_mmseqs=cfg.data_task.path_to_mmseqs,
            moltype="protein",
        )

    logger.info("Clustering pipeline complete")


if __name__ == "__main__":
    # NOTE(review): main() requires a `cfg` argument, so this direct call
    # raises TypeError. Presumably the module is meant to be driven by a
    # Hydra entry point that supplies the config - confirm and wire it up.
    main()