import os
from pathlib import Path

import hydra
import pandas as pd
import rootutils
from omegaconf import DictConfig

from dpacman.utils import pylogger

root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)

logger = pylogger.RankedLogger(__name__, rank_zero_only=True)


def clean_nr(nr_raw_path: Path | str) -> pd.DataFrame:
    """
    Clean the non-redundant (NR) peaks BED file.

    Assign column names, keep only the columns we need, drop duplicate rows,
    and comma-concatenate the scores of rows that are otherwise identical.
    """
    nr = pd.read_csv(nr_raw_path, sep="\t", header=None)
    nr.columns = [
        "chrom",
        "chromStart",
        "chromEnd",
        "biotypes",
        "score",
        "strand",
        "thickStart",
        "thickEnd",
        "itemRgb",
    ]

    # Confirm the belief that every "biotypes" entry holds one transcription
    # factor separated from exactly ONE biotype by a single colon.
    biotype_colon_counts = nr["biotypes"].str.count(":").unique().tolist()
    assert biotype_colon_counts == [1], "Expected exactly one ':' per biotypes entry"

    # Split the column accordingly into "tr" (transcriptional regulator) and
    # "biotypes".
    nr[["tr", "biotypes"]] = nr["biotypes"].str.split(":", expand=True)

    logger.info(
        "Keeping only the following columns: chrom, chromStart, chromEnd, "
        "biotypes, tr, score."
    )
    nr = nr[["chrom", "chromStart", "chromEnd", "biotypes", "score", "tr"]]

    # Drop rows that are duplicated across all fields.
    logger.info(f"Size of database before dropping duplicate rows: {len(nr)}")
    nr = nr.drop_duplicates().reset_index(drop=True)
    logger.info(f"Size of database after dropping duplicate rows: {len(nr)}")

    # Count rows that clearly describe the same experiment (same chrom,
    # chromStart, chromEnd, tr, biotypes) but somehow carry different scores.
    experiment_dups = len(
        nr.loc[
            nr.duplicated(subset=["chrom", "chromStart", "chromEnd", "tr", "biotypes"])
        ]
    )
    logger.info(
        f"{experiment_dups} total rows with same chrom, chromStart, chromEnd, "
        "biotypes, tr but different score."
    )
    logger.info("Grouping by everything except score, comma-concatenating unique scores")
    nr = (
        nr.groupby(["chrom", "chromStart", "chromEnd", "tr", "biotypes"])
        .agg({"score": lambda x: ",".join(map(str, sorted(set(x))))})
        .reset_index()
    )
    logger.info(f"Final database size: {len(nr)}")
    nr["chromLen"] = nr["chromEnd"] - nr["chromStart"]
    return nr
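

# Illustrative sketch (not part of the pipeline): what the colon split and the
# score-concatenating groupby in clean_nr do to a toy frame. The values below
# are made up for demonstration only.
def _demo_split_and_merge_scores() -> pd.DataFrame:
    toy = pd.DataFrame(
        {
            "chrom": ["chr1", "chr1", "chr1"],
            "chromStart": [100, 100, 200],
            "chromEnd": [150, 150, 250],
            "biotypes": ["ERG:blood", "ERG:blood", "GATA2:kidney"],
            "score": [10, 7, 3],
        }
    )
    # "ERG:blood" -> tr="ERG", biotypes="blood"
    toy[["tr", "biotypes"]] = toy["biotypes"].str.split(":", expand=True)
    # The two ERG rows differ only in score, so they collapse into a single
    # row whose score is "7,10" (unique scores, sorted, comma-joined).
    return (
        toy.groupby(["chrom", "chromStart", "chromEnd", "tr", "biotypes"])
        .agg({"score": lambda x: ",".join(map(str, sorted(set(x))))})
        .reset_index()
    )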


def clean_crm(crm_raw_path: Path | str) -> pd.DataFrame:
    """
    Clean the cis-regulatory modules (CRM) BED file.

    Assign column names, keep only the columns we need, drop duplicate rows,
    and comma-concatenate the scores of rows that are otherwise identical.
    """
    crm = pd.read_csv(crm_raw_path, sep="\t", header=None)
    crm.columns = [
        "chrom",
        "chromStart",
        "chromEnd",
        "tr",
        "score",
        "strand",
        "thickStart",
        "thickEnd",
        "reserved",
    ]

    logger.info(
        "Keeping only the following columns: chrom, chromStart, chromEnd, tr, score."
    )
    crm = crm[["chrom", "chromStart", "chromEnd", "tr", "score"]]

    # Drop rows that are duplicated across all fields.
    logger.info(f"Size of database before dropping duplicate rows: {len(crm)}")
    crm = crm.drop_duplicates().reset_index(drop=True)
    logger.info(f"Size of database after dropping duplicate rows: {len(crm)}")

    # Count rows that clearly describe the same experiment (same chrom,
    # chromStart, chromEnd, tr) but somehow carry different scores.
    experiment_dups = len(
        crm.loc[crm.duplicated(subset=["chrom", "chromStart", "chromEnd", "tr"])]
    )
    logger.info(
        f"{experiment_dups} total rows with same chrom, chromStart, chromEnd, tr "
        "but different score."
    )
    logger.info("Grouping by everything except score, comma-concatenating unique scores")
    crm = (
        crm.groupby(["chrom", "chromStart", "chromEnd", "tr"])
        .agg({"score": lambda x: ",".join(map(str, sorted(set(x))))})
        .reset_index()
    )
    logger.info(f"Final database size: {len(crm)}")
    crm["chromLen"] = crm["chromEnd"] - crm["chromStart"]
    return crm


# Assumption: this script is launched through Hydra, which supplies the
# DictConfig. The config_path/config_name below are placeholders; point them
# at the project's actual config layout.
@hydra.main(version_base="1.3", config_path="../configs", config_name="config")
def main(cfg: DictConfig) -> None:
    # Define the paths.
    nr_raw_path = Path(root) / cfg.data_task.nr_raw_path
    nr_processed_dir = Path(root) / cfg.data_task.nr_processed_dir
    nr_processed_filename = cfg.data_task.nr_processed_filename
    nr_savepath = os.path.join(nr_processed_dir, nr_processed_filename)

    crm_raw_path = Path(root) / cfg.data_task.crm_raw_path
    crm_processed_dir = Path(root) / cfg.data_task.crm_processed_dir
    crm_processed_filename = cfg.data_task.crm_processed_filename
    crm_savepath = os.path.join(crm_processed_dir, crm_processed_filename)

    os.makedirs(nr_processed_dir, exist_ok=True)
    os.makedirs(crm_processed_dir, exist_ok=True)

    # Clean and save the non-redundant peaks file, unless it already exists.
    if not os.path.exists(nr_savepath):
        nr_cleaned = clean_nr(nr_raw_path)
        nr_cleaned.to_csv(nr_savepath, sep="\t", index=False)
        logger.info(
            f"Saved cleaned non-redundant peaks (NR) database to: {nr_savepath}"
        )
    else:
        nr_cleaned = None
        logger.info(f"File already exists at {nr_savepath}. Skipping")

    # Clean and save the CRM file, unless it already exists.
    if not os.path.exists(crm_savepath):
        crm_cleaned = clean_crm(crm_raw_path)
        crm_cleaned.to_csv(crm_savepath, sep="\t", index=False)
        logger.info(
            f"Saved cleaned cis-regulatory modules (CRM) database to: {crm_savepath}"
        )
    else:
        crm_cleaned = None
        logger.info(f"File already exists at {crm_savepath}. Skipping")

    # Save example files.
    if cfg.data_task.save_example_files:
        # NR example
        example_nr_dir = nr_processed_dir / "examples"
        os.makedirs(example_nr_dir, exist_ok=True)
        example_nr_savepath = os.path.join(
            example_nr_dir, "example500_" + nr_processed_filename
        )
        if not os.path.exists(example_nr_savepath):
            if nr_cleaned is None:
                nr_cleaned = pd.read_csv(nr_savepath, sep="\t")
            nr_cleaned.sample(n=500, random_state=42).reset_index(drop=True).to_csv(
                example_nr_savepath, sep="\t", index=False
            )
            logger.info(
                f"Saved example NR file with 500 rows to: {example_nr_savepath}"
            )
        else:
            logger.info(
                f"Example file already exists at {example_nr_savepath}. Skipping"
            )

        # CRM example
        example_crm_dir = crm_processed_dir / "examples"
        os.makedirs(example_crm_dir, exist_ok=True)
        example_crm_savepath = os.path.join(
            example_crm_dir, "example500_" + crm_processed_filename
        )
        if not os.path.exists(example_crm_savepath):
            if crm_cleaned is None:
                crm_cleaned = pd.read_csv(crm_savepath, sep="\t")
            crm_cleaned.sample(n=500, random_state=42).reset_index(drop=True).to_csv(
                example_crm_savepath, sep="\t", index=False
            )
            logger.info(
                f"Saved example CRM file with 500 rows to: {example_crm_savepath}"
            )
        else:
            logger.info(
                f"Example file already exists at {example_crm_savepath}. Skipping"
            )
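
        # The per-TF example below splits the comma-separated "tr" field and
        # explodes it so that each row carries a single regulator, then keeps
        # only the ERG rows before sampling.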
        # CRM example for one transcription factor
        example_crm_tf_savepath = os.path.join(
            example_crm_dir, "example500_ERG_" + crm_processed_filename
        )
        if not os.path.exists(example_crm_tf_savepath):
            if crm_cleaned is None:
                crm_cleaned = pd.read_csv(crm_savepath, sep="\t")
            crm_example_tf_db = crm_cleaned.copy(deep=True)
            crm_example_tf_db["tr"] = crm_example_tf_db["tr"].str.split(",")
            crm_example_tf_db = crm_example_tf_db.explode("tr").reset_index(drop=True)
            crm_example_tf_db = crm_example_tf_db.loc[crm_example_tf_db["tr"] == "ERG"]
            # Sample at most 500 rows so the call also works when fewer than
            # 500 ERG rows exist.
            crm_example_tf_db = crm_example_tf_db.sample(
                n=min(500, len(crm_example_tf_db)), random_state=42
            ).reset_index(drop=True)
            crm_example_tf_db.to_csv(example_crm_tf_savepath, sep="\t", index=False)
            logger.info(
                f"Saved example CRM file for one TF (ERG) with up to 500 rows to: "
                f"{example_crm_tf_savepath}"
            )
        else:
            logger.info(
                f"Example file already exists at {example_crm_tf_savepath}. Skipping"
            )


if __name__ == "__main__":
    main()
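

# Illustrative sketch (not part of the pipeline): the cleaning steps above
# comma-join duplicate scores into a single string, so a downstream consumer
# of the saved tables has to split them again. The helper name and the policy
# of keeping the maximum score are assumptions for illustration only.
def _parse_joined_scores(cleaned: pd.DataFrame) -> pd.DataFrame:
    out = cleaned.copy()
    # "7,10" -> [7.0, 10.0] -> 10.0; single scores pass through unchanged.
    out["score"] = (
        out["score"]
        .astype(str)
        .str.split(",")
        .apply(lambda scores: max(float(s) for s in scores))
    )
    return out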