| """ |
| Holds Python methods for clustering Remap DNA sequences. |
| """ |
|
|
| import argparse |
| import numpy as np |
| import pandas as pd |
| from pathlib import Path |
| import random |
| import sys |
| import subprocess |
| from collections import defaultdict |
| import os |
| import json |
| from omegaconf import DictConfig |
| from hydra.core.hydra_config import HydraConfig |
|
|
| from dpacman.utils.clustering import ( |
| make_fasta, |
| process_fasta, |
| analyze_clustering_result, |
| run_mmseqs_clustering, |
| cluster_summary, |
| ) |
|
|
| import rootutils |
| from dpacman.utils import pylogger |
|
|
| root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True) |
| logger = pylogger.RankedLogger(__name__, rank_zero_only=True) |
|
|
|
|
| def cluster_molecules( |
| fasta_dict, |
| fasta_path, |
| mmseqs_params: DictConfig, |
| output_dir="", |
| path_to_mmseqs="../softwares/mmseqs", |
| moltype="dna", |
| use_gpu=True, |
| ): |
| """ |
| Args: |
| - fasta_dict: dictionary object where the keys are sequence IDs, and the values are sequences |
| - fasta_path: str or Path to where the output fasta should be saved |
| - mmseqs_params: DictConfig of mmseqs hparams |
| - type: molecule type, "dna" or "protein" |
| """ |
|
|
| |
| logger.info(f"Making fasta at: {fasta_path}") |
| fasta_path = str(make_fasta(fasta_dict, fasta_path)) |
|
|
| |
| output_dir = str(Path(root) / output_dir) |
| path_to_mmseqs = str(Path(root) / path_to_mmseqs) |
|
|
| |
| dbtype = 1 |
| if moltype == "dna": |
| dbtype = 2 |
| run_mmseqs_clustering( |
| fasta_path, |
| output_dir, |
| min_seq_id=mmseqs_params.min_seq_id, |
| c=mmseqs_params.c, |
| cov_mode=mmseqs_params.cov_mode, |
| cluster_mode=mmseqs_params.cluster_mode, |
| dbtype=dbtype, |
| path_to_mmseqs=path_to_mmseqs, |
| ) |
|
|
| tsv_path = [x for x in os.listdir(output_dir) if x.endswith(".tsv")][0] |
| clusters = analyze_clustering_result(fasta_path, Path(output_dir) / tsv_path) |
| logger.info(f"Made clusters DataFrame:\n{clusters.head()}") |
| cluster_summary(clusters) |
|
|
|
|
| def read_input_data(input_path): |
| """ |
| Read the data from the input path. |
| It may be a csv or parquet |
| """ |
| input_path = Path(root) / input_path |
| df = None |
| if str(input_path).endswith(".parquet"): |
| df = pd.read_parquet(input_path, engine="pyarrow") |
| elif str(input_path).endswith(".csv"): |
| df = pd.read_csv(input_path) |
| elif str(input_path).endswith(".tsv") or str(input_path).endswith(".txt"): |
| df = pd.read_csv(input_path, sep="\t") |
| else: |
| raise Exception(f"Cannot read input data from {input_path}: invalid file type") |
| return df |
|
|
|
|
| def main(cfg: DictConfig): |
| """ |
| Run clustering on Remap protein AND DNA sequences. |
| Get clusters for each. |
| """ |
| |
| |
| df = read_input_data(cfg.data_task.input_data_path) |
|
|
| |
| dna_full_cfg = cfg.data_task.dna_full |
| dna_peaks_cfg = cfg.data_task.dna_peaks |
| protein_cfg = cfg.data_task.protein |
| logger.info( |
| f"Clustering DNA full: {cfg.data_task.cluster_dna_full}. Clustering DNA peaks: {cfg.data_task.cluster_dna_peaks}. Clustering protein: {cfg.data_task.cluster_protein}." |
| ) |
|
|
| |
| dna_full_fasta_path = Path(root) / dna_full_cfg.fasta_path |
| dna_peaks_fasta_path = Path(root) / dna_peaks_cfg.fasta_path |
| protein_fasta_path = Path(root) / protein_cfg.fasta_path |
| os.makedirs(dna_full_fasta_path.parent, exist_ok=True) |
| os.makedirs(dna_peaks_fasta_path.parent, exist_ok=True) |
| os.makedirs(protein_fasta_path.parent, exist_ok=True) |
|
|
| |
| with open(Path(root) / dna_full_cfg.input_map_path, "r") as f: |
| dna_full_fasta_dict = json.load(f) |
|
|
| with open(Path(root) / dna_peaks_cfg.input_map_path, "r") as f: |
| dna_peaks_fasta_dict = json.load(f) |
|
|
| with open(Path(root) / protein_cfg.input_map_path, "r") as f: |
| protein_fasta_dict = json.load(f) |
|
|
| logger.info( |
| f"Loaded DNA seq dict from: {dna_full_cfg.input_map_path}. Size: {len(dna_full_fasta_dict)}" |
| ) |
| logger.info( |
| f"Loaded DNA peaks dict from: {dna_peaks_cfg.input_map_path}. Size: {len(dna_peaks_fasta_dict)}" |
| ) |
| logger.info( |
| f"Loaded TR (protein) seq dict from: {protein_cfg.input_map_path}. Size: {len(protein_fasta_dict)}" |
| ) |
|
|
| |
| dna_ids = set(df["dna_seqid"].dropna()) |
| peak_ids = set(df["peak_seqid"].dropna()) |
| tr_ids = set(df["tr_seqid"].dropna()) |
|
|
| |
| dna_full_fasta_dict = { |
| k: dna_full_fasta_dict[k] for k in (dna_full_fasta_dict.keys() & dna_ids) |
| } |
| dna_peaks_fasta_dict = { |
| k: dna_peaks_fasta_dict[k] for k in (dna_peaks_fasta_dict.keys() & peak_ids) |
| } |
| protein_fasta_dict = { |
| k: protein_fasta_dict[k] for k in (protein_fasta_dict.keys() & tr_ids) |
| } |
|
|
| logger.info( |
| f"Filtered dictionaries to only sequences in the filtered training data." |
| ) |
| logger.info( |
| f"Total DNA sequences: {len(dna_full_fasta_dict)}. Total peak sequences: {len(dna_peaks_fasta_dict)}. Total protein sequences: {len(protein_fasta_dict)}" |
| ) |
|
|
| if cfg.data_task.cluster_dna_full: |
| logger.info(f"Clustering DNA full sequences, with context") |
| cluster_molecules( |
| dna_full_fasta_dict, |
| dna_full_fasta_path, |
| mmseqs_params=dna_full_cfg.mmseqs, |
| output_dir=dna_full_cfg.output_dir, |
| path_to_mmseqs=cfg.data_task.path_to_mmseqs, |
| moltype="dna", |
| ) |
|
|
| if cfg.data_task.cluster_dna_peaks: |
| logger.info(f"Clustering DNA peak sequences") |
| cluster_molecules( |
| dna_peaks_fasta_dict, |
| dna_peaks_fasta_path, |
| mmseqs_params=dna_peaks_cfg.mmseqs, |
| output_dir=dna_peaks_cfg.output_dir, |
| path_to_mmseqs=cfg.data_task.path_to_mmseqs, |
| moltype="dna", |
| ) |
|
|
| if cfg.data_task.cluster_protein: |
| logger.info("Clustering protein sequences.") |
| cluster_molecules( |
| protein_fasta_dict, |
| protein_fasta_path, |
| mmseqs_params=protein_cfg.mmseqs, |
| output_dir=protein_cfg.output_dir, |
| path_to_mmseqs=cfg.data_task.path_to_mmseqs, |
| moltype="protein", |
| ) |
|
|
| logger.info("Clustering pipeline complete") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|