svincoff committed
Commit 1927f8f · 1 Parent(s): 3b9cde0

finished fimo pipeline

.gitignore CHANGED
@@ -14,5 +14,7 @@ dpacman/data_tasks/clean/__pycache__/
  dpacman/data_tasks/download/__pycache__/
  dpacman/data_tasks/fimo/__pycache__/
  dpacman/scripts/__pycache__/
+ dpacman/temp.py
+ dpacman/temp2.py
  logs/
  tree.txt
configs/data_task/download/genome.yaml CHANGED
@@ -1,5 +1,5 @@
  name: genome
  type: download
- output_dir: dpacman/classifier/data_files/raw/genomes
+ output_dir: dpacman/data_files/raw/genomes
  genomes:
    - hg38
configs/data_task/fimo/post_fimo.yaml CHANGED
@@ -1,6 +1,12 @@
  name: post_fimo
  type: fimo

- input_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_output.csv
- output_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_output_processed.csv
- json_dir: dpacman/data_files/raw/genomes/hg38
+ fimo_out_dir: dpacman/data_files/processed/fimo/fimo_out_q
+ unprocessed_output_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_output_q_unprocessed.csv
+ processed_output_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_output_q_processed.csv
+ json_dir: dpacman/data_files/raw/genomes/hg38
+ idmap_path: dpacman/data_files/raw/remap/idmapping_reviewed_true_2025_08_11.tsv
+
+ jaspar_boost: 100
+
+ debug: false
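Note: jaspar_boost is the additive bonus applied on top of the ChIP-seq score wherever a JASPAR motif hit overlaps the context window (see _process_one_row in post_fimo.py below). A minimal sketch of the scoring, with hypothetical coordinates:

    import numpy as np

    chipscore, jaspar_boost = 750, 100       # jaspar_boost comes from this config
    scores = np.zeros(20)                    # hypothetical 20-bp context window
    scores[5:15] = chipscore                 # positions covered by the ChIP peak
    scores[8:12] = chipscore + jaspar_boost  # positions also covered by a JASPAR hit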
configs/data_task/fimo/pre_fimo.yaml CHANGED
@@ -1,8 +1,12 @@
  name: pre_fimo
  type: fimo

- input_csv: dpacman/data_files/processed/remap/remap2022_crm_macs2_hg38_v1_0_clean.tsv
- output_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_input.csv
+ paths:
+   input_csv: dpacman/data_files/processed/remap/remap2022_crm_macs2_hg38_v1_0_clean.tsv
+   output_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_input.csv
+   chrom_output_path: dpacman/data_files/processed/fimo/chrom_inputs
+   json_dir: dpacman/data_files/raw/genomes/hg38
+
  window_total: 500

  save_example_files: true
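Note: window_total is the total flanking context added around each ChIP peak. pre_fimo.py (below) draws a random left flank and gives the remainder to the right flank; a sketch with hypothetical peak coordinates:

    import numpy as np

    window_total = 500                             # from this config
    chip_start, chip_end = 100500, 100800          # hypothetical peak
    left = np.random.randint(0, window_total + 1)  # 0..500 inclusive
    right = window_total - left                    # remainder goes right
    context_start = max(chip_start - left, 0)      # clipped at the chromosome start
    context_end = chip_end + right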
configs/data_task/fimo/run_fimo.yaml CHANGED
@@ -1,24 +1,26 @@
- name: post_fimo
+ name: run_fimo
  type: fimo

- debug: false
+ debug: true

  paths:
-   input_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_input.tsv
-   output_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_output.csv
+   input_csv: dpacman/data_files/processed/fimo/remap2022_crm_fimo_input.csv
    json_dir: dpacman/data_files/raw/genomes/hg38
+   input_fasta_outer_dir: dpacman/data_files/processed/fimo/chrom_inputs
+   fimo_outdir: dpacman/data_files/processed/fimo/fimo_out
+   seq_fasta: to_scan.fa
+   bg_model: bg_model.txt

- meme:
-   fimo_bin: dpacman/softwares/meme/bin/fimo
-   fasta_get_markov: dpacman/softwares/meme/libexec/meme-5.5.8/fasta-get-markov
-   jaspar_motif_file: dpacman/softwares/meme-5.5.8/tests/common/JASPAR_CORE_2014_vertebrates.meme
+ meme:
+   fimo_bin: /vast/projects/pranam/lab/shared/meme/bin/fimo
+   fasta_get_markov: /vast/projects/pranam/lab/shared/meme/libexec/meme-5.5.8/fasta-get-markov
+   jaspar_motif_file: /vast/projects/pranam/lab/shared/meme-5.5.8/tests/common/JASPAR_CORE_2014_vertebrates.meme

- fnames:
-   seq_fasta: to_scan.fa
-   bg_model: bg_model.txt
-   fimo_outdir: fimo_out
-
  fimo:
-   pval_thresh: 1e-4
+   thresh: 1e-2
+   thresh_mode: q
    max_stored: 1000000
-   njobs: max
+   njobs: 64
+
+ chroms: [1]
+ all_caps: true
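For orientation, run_fimo.py (below) assembles these settings into a FIMO command roughly like the following sketch (paths abbreviated; see run_fimo_chunk for the exact construction):

    # Sketch of the per-chunk FIMO call implied by this config.
    # With thresh_mode "q", --qv-thresh makes --thresh a q-value cutoff;
    # otherwise the flag is dropped and --thresh is a p-value cutoff.
    call_list = [
        "fimo",                          # meme.fimo_bin
        "--oc", "chrom1/chunk0",         # per-chunk output directory
        "--bfile", "bg_model.txt",       # background model from fasta-get-markov
        "--max-stored-scores", "1000000",
        "--thresh", "1e-2",
        "--qv-thresh",                   # present because thresh_mode: q
        "--no-pgc",
        "JASPAR_CORE_2014_vertebrates.meme",
        "to_scan_0.fa",
    ]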
dpacman/data_tasks/fimo/post_fimo.py CHANGED
@@ -1,120 +1,391 @@
  #!/usr/bin/env python3
  import os
- import json
  import uuid
- import pandas as pd
  import numpy as np

- # ─────────────────────────────────────────────────────────────────────────────
- # PATHS edit these if needed
- INPUT_CSV = "/home/a03-akrishna/DPACMAN/data_files/processed/post_fimo.csv"
- OUTPUT_CSV = "/home/a03-akrishna/DPACMAN/data_files/processed/final.csv"
- JSON_DIR = "/home/a03-svincoff/DPACMAN/dpacman/data_files/raw/genomes/hg38"
- # ─────────────────────────────────────────────────────────────────────────────


- def load_chrom_dna(chrom, cache):
-     """Load & cache the full chromosome 'dna' string from hg38_chr{chrom}.json."""
-     if chrom in cache:
-         return cache[chrom]
-     path = os.path.join(JSON_DIR, f"hg38_chr{chrom}.json")
-     if not os.path.isfile(path):
-         raise FileNotFoundError(f"Missing JSON for chr{chrom}: {path}")
-     with open(path) as f:
-         data = json.load(f)
-     cache[chrom] = data["dna"]
-     return cache[chrom]


- def sigmoid_array(arr: np.ndarray) -> np.ndarray:
-     """Elementwise logistic sigmoid → values in (0,1)."""
-     return 1.0 / (1.0 + np.exp(-arr))


- def main():
-     # 1) load post-FIMO results
-     df = pd.read_csv(INPUT_CSV)

      dna_cache = {}
      records = []

-     # 2) for each TF-peak row, extract sequence & build scores
-     for _, row in df.iterrows():
-         tfid = row["TF_id"]
-         chrom = str(row["#chrom"])
-         cstart = int(row["contextStart"])
-         cend = int(row["contextEnd"])
-         peak_s = int(row["ChIPStart"])
-         peak_e = int(row["ChIPEnd"])
-         chipscore = int(row["chipscore"])
-         jaspar = str(row["jaspar"])
-
-         # pull out the exact context sequence (including any Ns)
-         dna = load_chrom_dna(chrom, dna_cache)
-         seq = dna[cstart:cend]
-         L = len(seq)
-
-         # initialize base-resolution scores
-         scores = np.zeros(L, dtype=int)
-
-         # fill ChIP-seq peak region
-         ps = peak_s - cstart
-         pe = peak_e - cstart
-         scores[ps:pe] = chipscore
-
-         # overlay Jaspar hits (+100)
-         if jaspar.strip():
-             for hit in jaspar.split(","):
-                 hs, he = hit.split("-")
-                 hs_i = max(int(hs) - cstart, 0)
-                 he_i = min(int(he) - cstart, L)
-                 scores[hs_i:he_i] = chipscore + 100
-
-         # stringify the raw scores
-         score_str = ",".join(map(str, scores.tolist()))
-
-         # sigmoid-transform
-         sig_vals = sigmoid_array(scores.astype(float))
-         score_sig = ",".join(f"{v:.4f}" for v in sig_vals.tolist())

          records.append(
-             {
-                 "TF_id": tfid,
-                 "dna_sequence": seq,
-                 "score_str": score_str,
-                 "score_sig_r2": score_sig,
-             }
          )

-     # 3) assemble into a DataFrame
-     final_df = pd.DataFrame.from_records(records)

-     # 4) drop any exact TF+DNA duplicates
-     final_df = final_df.drop_duplicates(subset=["TF_id", "dna_sequence"]).reset_index(
-         drop=True
-     )

-     # 5) assign random IDs
-     tf_map = {tf: uuid.uuid4().hex[:8] for tf in final_df["TF_id"].unique()}
-     dna_map = {sq: uuid.uuid4().hex[:8] for sq in final_df["dna_sequence"].unique()}

-     final_df["tf_seq_id"] = final_df["TF_id"].map(tf_map)
-     final_df["dna_seq_id"] = final_df["dna_sequence"].map(dna_map)
-     final_df["ID"] = final_df["tf_seq_id"] + "_" + final_df["dna_seq_id"]

-     # 6) reorder and write out
      cols = [
-         "TF_id",
-         "tf_seq_id",
-         "dna_sequence",
-         "dna_seq_id",
-         "score_str",
-         "score_sig_r2",
-         "ID",
      ]
-     final_df[cols].to_csv(OUTPUT_CSV, index=False)
-     print(f"Wrote {len(final_df)} rows → {OUTPUT_CSV}")


  if __name__ == "__main__":
      main()

  #!/usr/bin/env python3
  import os
  import uuid
+ import logging
+ import traceback  # used by _safe_process to report worker failures
+ from pathlib import Path
+ import multiprocessing as mp
+
  import numpy as np
+ import pandas as pd
+ import math
+ import rootutils
+ import polars as pl
+ from omegaconf import DictConfig
+ from hydra.core.hydra_config import HydraConfig
+
+ from dpacman.data_tasks.fimo.pre_fimo import load_chrom_dna
+
+ root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
+ logger = logging.getLogger(__name__)
+
+ def normalize_array(arr: np.ndarray, max_chipseq_score: int = 1000, jaspar_boost: int = 100) -> np.ndarray:
+     normalization_factor = max_chipseq_score + jaspar_boost
+     return arr / normalization_factor

+ def format_sig(sig_vals, decimals=4, atol=0.0, rtol=1e-5):
+     a = np.asarray(sig_vals, dtype=float)
+     scale = 10.0 ** decimals
+     thresh = 0.5 / scale  # 0.00005 for 4 dp
+
+     # Would display as 0.0000 or 1.0000 at the given precision?
+     m0 = np.isclose(a, 0.0, atol=atol, rtol=rtol) | (np.abs(a) <= thresh)
+     m1 = np.isclose(a, 1.0, atol=atol, rtol=rtol) | (np.abs(a - 1.0) <= thresh)
+
+     out = np.char.mod(f"%.{decimals}f", a)
+     out = np.where(m0, "0", out)
+     out = np.where(m1 & ~m0, "1", out)  # don't overwrite any zeros
+     return ",".join(out.tolist())
+
+ def _safe_process(task):
+     try:
+         return ("ok", _process_one_chrom_folder(task))
+     except Exception as e:
+         return ("err", (task[0], repr(e), traceback.format_exc()))
+
+ def discover_chrom_folders(fimo_out_dir: Path) -> list[str]:
+     return sorted(
+         name for name in os.listdir(fimo_out_dir)
+         if name.startswith("chrom") and (fimo_out_dir / name / "final.csv").exists()
+     )

+ def _process_one_row(row, dna: str, jaspar_boost: int = 100) -> dict:
+     # row order: TR, chrom, cstart, cend, peak_s, peak_e, chipscore, jaspar
+     trname, chrom, cstart, cend, peak_s, peak_e, chipscore, jaspar = row

+     seq = dna[cstart:cend]
+     L = len(seq)
+     scores = np.zeros(L)

+     # ChIP peak
+     ps = peak_s - cstart
+     pe = peak_e - cstart
+     peak_seq = ""
+     if ps < L and pe > 0:
+         scores[max(ps, 0):min(pe, L)] = chipscore
+         peak_seq = seq[max(ps, 0):min(pe, L)]  # slice the context window, not the whole chromosome
+
+     # JASPAR hits (+jaspar_boost)
+     # only run if there are hits (jaspar is np.nan when there are none)
+     total_jaspar = 0
+     if isinstance(jaspar, str) and jaspar.strip():
+         for hit in jaspar.split(","):
+             total_jaspar += 1
+             hs, he = hit.split("-")
+             hs_i = max(int(hs) - cstart, 0)
+             he_i = min(int(he) - cstart, L)
+             if hs_i < he_i:
+                 scores[hs_i:he_i] = chipscore + jaspar_boost

+     score_str = ",".join(map(str, [int(x) for x in scores.tolist()]))
+     # sig_vals = normalize_array(scores.astype(np.float32))
+     # store out to 4 decimal places unless it's 0
+     # score_sig = format_sig(sig_vals)
+     return {
+         "chrom": chrom,
+         "tr_name": trname,
+         "dna_sequence": seq,
+         "peak_sequence": peak_seq,
+         "chipscore": chipscore,
+         "total_jaspar_hits": total_jaspar,
+         "scores": score_str,
+     }

+ def _process_one_chrom_folder(task) -> Path | None:
+     """Runs inside a worker process. Reads one chrom's final.csv, loads DNA once,
+     builds records, writes them to a per-chromosome CSV, and returns its path
+     (or None if there is nothing to process)."""
+     chrom_folder, fimo_out_dir_str, json_dir, jaspar_boost, output_parts_folder = task
+
+     # make unique logger for this process
+     log_dir = Path(HydraConfig.get().run.dir) / "logs"
+     log_dir.mkdir(parents=True, exist_ok=True)
+     output_parts_folder.mkdir(parents=True, exist_ok=True)

+     log_file = log_dir / f"fimo_{chrom_folder}.log"
+     wlogger = logging.getLogger(f"fimo_{chrom_folder}")
+     wlogger.setLevel(logging.DEBUG)
+     wlogger.propagate = False  # Don't double-log to root

+     if not any(isinstance(h, logging.FileHandler) for h in wlogger.handlers):
+         fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
+         fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
+         wlogger.addHandler(fh)
+
+     fimo_out_dir = Path(fimo_out_dir_str)
+     final_csv = fimo_out_dir / chrom_folder / "final.csv"
+     if not final_csv.exists():
+         return None
+
+     usecols = ["TR", "#chrom", "contextStart", "contextEnd",
+                "ChIPStart", "ChIPEnd", "chipscore", "jaspar"]
+     df = pd.read_csv(final_csv, usecols=usecols)
+
+     if df.empty:
+         return None
+
+     # Normalize dtypes up-front
+     df["#chrom"] = df["#chrom"].astype(str)
+     for col in ("contextStart", "contextEnd", "ChIPStart", "ChIPEnd", "chipscore"):
+         df[col] = pd.to_numeric(df[col], downcast="integer")
+
+     chrom = df["#chrom"].iloc[0]
      dna_cache = {}
+     dna = load_chrom_dna(str(chrom), dna_cache, json_dir).upper()  # just capitalize it for training
+     wlogger.info(f"Loaded DNA for {chrom}, length {len(dna)}")
+
      records = []
+
+     # rename to make processing easier
+     rename = {
+         "#chrom": "chrom",
+         "contextStart": "cstart",
+         "contextEnd": "cend",
+         "ChIPStart": "peak_s",
+         "ChIPEnd": "peak_e",
+         "TR": "tr_name",
+     }
+     df = df.rename(columns=rename)
+
+     # (Optional) ensure numeric dtypes; will raise if non-numeric
+     df_cols = ["cstart", "cend", "peak_s", "peak_e", "chipscore"]
+     for col in df_cols:
+         df[col] = pd.to_numeric(df[col], errors="raise")

+     total = len(df)
+     last_decile = 0

+     for i, row in enumerate(df.itertuples(index=False), start=1):
          records.append(
+             _process_one_row(
+                 (row.tr_name, row.chrom, int(row.cstart), int(row.cend),
+                  int(row.peak_s), int(row.peak_e), int(row.chipscore), row.jaspar),
+                 dna, jaspar_boost
+             )
          )

+         # progress every ~10%
+         decile = (i * 10) // max(total, 1)
+         if decile > last_decile:
+             last_decile = decile
+             wlogger.info("Progress: %d%% (%d/%d)", decile * 10, i, total)
+
+     wlogger.info(f"Completed processing {len(records)} rows for {chrom_folder}")
+
+     # make into a DataFrame and save
+     records_df = pd.DataFrame.from_records(records)
+     savepath = output_parts_folder / f"{chrom_folder}_processed.csv"
+     records_df.to_csv(savepath, index=False)
+     wlogger.info(f"Saved records to {savepath}")
+
+     return savepath

+ def build_dataset_fast_mp(fimo_out_dir: Path, json_dir: str, debug: bool, max_workers: int | None, jaspar_boost: int = 100, output_parts_folder: Path | None = None) -> list[Path]:
+     """
+     Multiprocessing to build the final dataset across chromosomes.
+     Returns the paths of the per-chromosome CSV parts written by the workers.
+     """
+     chrom_folders = discover_chrom_folders(fimo_out_dir)
+     if not chrom_folders:
+         logger.warning(f"No chrom* folders with final.csv under {fimo_out_dir}")
+         return []
+
+     if debug:
+         chrom_folders = ["chromY"] if "chromY" in chrom_folders else chrom_folders[:1]
+         logger.info(f"DEBUG MODE: considering {chrom_folders[0]} only")
+
+     tasks = [(cf, str(fimo_out_dir), json_dir, jaspar_boost, output_parts_folder) for cf in chrom_folders]
+
+     # serial path (debug/deterministic)
+     if (max_workers is not None and max_workers <= 1) or len(tasks) == 1:
+         paths = []
+         errs = []
+         for t in tasks:
+             status, payload = _safe_process(t)
+             if status == "ok" and payload is not None:  # workers return a savepath, or None if empty
+                 paths.append(payload)
+             elif status == "err":
+                 errs.append(payload)
+         if errs:
+             for chrom, msg, tb in errs:
+                 logger.error("Worker error for %s: %s\n%s", chrom, msg, tb)
+             # raise after serial run if you want hard failure
+         return [p for p in paths if os.path.exists(p)]
+
+     # parallel path
+     procs = min(max_workers or mp.cpu_count(), len(tasks))
+     logger.info(f"Using {procs} parallel workers for {len(tasks)} chrom folders")

+     paths, errs = [], []
+     with mp.Pool(processes=procs, maxtasksperchild=10) as pool:
+         for status, payload in pool.imap_unordered(_safe_process, tasks, chunksize=1):
+             if status == "ok" and payload is not None:  # same path-based check as the serial branch
+                 paths.append(payload)
+             elif status == "err":
+                 errs.append(payload)
+
+     if errs:
+         for chrom, msg, tb in errs:
+             logger.error("Worker error for %s: %s\n%s", chrom, msg, tb)
+         # optional: raise RuntimeError("One or more workers failed, see logs.")

+     return [p for p in paths if os.path.exists(p)]

+ def combine_processed_with_polars(
+     paths_to_processed_dfs: list[str],
+     idmap_path: str,  # TSV with columns: From, Entry, Sequence
+     out_path: str,  # e.g., "processed_out.parquet" or ".csv"
+ ):
+     if not paths_to_processed_dfs:
+         logger.info("No records produced; nothing to write.")
+         return
+
+     # 1) Scan all CSVs lazily (no full read)
+     lfs = [pl.scan_csv(p, infer_schema_length=0) for p in paths_to_processed_dfs]
+     lf = pl.concat(lfs, how="vertical")
+     logger.info("Scanned CSVs")
+
+     # 2) Drop duplicate occurrences of tr_name and peak_sequence, because these are the same peak
+     lf = lf.unique(subset=["tr_name", "peak_sequence"], keep="first", maintain_order=True)
+     logger.info("Dropped duplicate examples of tr_name + peak_sequence")
+
+     # 3) Join small idmap (read eagerly; it's tiny)
+     idmap = (
+         pl.read_csv(idmap_path, separator="\t", columns=["From", "Entry", "Sequence"])
+         .rename({"From": "tr_name", "Entry": "tr_uniprot", "Sequence": "tr_sequence"})
+     )
+     lf = lf.join(idmap.lazy(), on="tr_name", how="left")
+     logger.info("Merged in UniProt IDs and TR sequences from the UniProt ID mapping")
+
+     # 4) Per-chromosome unique peak index and peak_id
+     #    (dense rank over peak_sequence per chrom; if you require "first-appearance" order,
+     #    see the note below for an alternate approach.)
+     lf = lf.with_columns([
+         pl.col("peak_sequence").fill_null("").alias("peak_sequence"),
+         pl.col("chrom").cast(pl.Utf8),
+     ])
+     lf = lf.with_columns(
+         pl.col("peak_sequence")
+         .rank(method="dense")  # 1,2,3,... per group
+         .over("chrom")
+         .cast(pl.Int64)
+         .alias("chrom_peak_idx")
+     )
+     lf = lf.with_columns(
+         pl.format("chrom{}_peak{}", pl.col("chrom"), pl.col("chrom_peak_idx")).alias("peak_id")
+     )
+     logger.info("Assigned unique peak_ids per chromosome based on peak_sequence")
+
+     # 5) Build stable IDs for dna_sequence and tr_sequence based on first appearance
+     #    (do this by creating small maps with unique(..., maintain_order=True) and joining)
+     dna_map = (
+         lf.select("dna_sequence")
+         .unique(maintain_order=True)
+         .with_row_index("dna_idx", offset=1)
+         .with_columns(pl.format("dnaseq{}", pl.col("dna_idx")).alias("dna_seqid"))
+         .select("dna_sequence", "dna_seqid")
+     )
+     logger.info("Assigned dna_sequence IDs")
+     tr_map = (
+         lf.select("tr_sequence")
+         .unique(maintain_order=True)
+         .with_row_index("tr_idx", offset=1)
+         .with_columns(pl.format("trseq{}", pl.col("tr_idx")).alias("tr_seqid"))
+         .select("tr_sequence", "tr_seqid")
+     )
+     logger.info("Assigned tr_sequence IDs")
+     lf = lf.join(dna_map, on="dna_sequence", how="left").join(tr_map, on="tr_sequence", how="left")
+     logger.info("Applied dna_sequence and tr_sequence IDs to main table")
+
+     # 6) Final ID and column selection
+     lf = lf.with_columns(
+         (pl.col("tr_seqid") + pl.lit("_") + pl.col("dna_seqid")).alias("ID")
+     )
      cols = [
+         "ID", "tr_name", "peak_id", "chipscore", "total_jaspar_hits",
+         "dna_sequence", "tr_sequence", "scores"
      ]
+     lf_out = lf.select(cols)
+     # n_rows = lf_out.select(pl.len().alias("rows")).collect(streaming=True)["rows"][0]
+     logger.info("Selected final columns")
+
+     # 7) Write streaming to disk
+     out_path = str(out_path)
+     Path(out_path).parent.mkdir(parents=True, exist_ok=True)
+
+     if out_path.lower().endswith(".parquet"):
+         lf_out.sink_parquet(out_path, compression="zstd", statistics=True, row_group_size=128_000)
+         logger.info(f"Wrote parquet file to {out_path}")
+     elif out_path.lower().endswith(".csv"):
+         # NOTE: collect(streaming=True) still returns an in-memory DataFrame;
+         # prefer Parquet for very large outputs.
+         lf_out.collect(streaming=True).write_csv(out_path)
+         logger.info(f"Wrote csv file to {out_path}")
+     else:
+         # default to Parquet if no/unknown extension
+         lf_out.sink_parquet(out_path + ".parquet", compression="zstd", statistics=True)
+         logger.info(f"Wrote parquet file to {out_path}.parquet")

+     # 8) (Optional) small summary: unique peaks per chrom
+     peaks_per_chrom = (
+         lf.select("chrom", "peak_sequence")
+         .unique()
+         .group_by("chrom")
+         .len()
+         .collect(streaming=True)
+         .sort("chrom")
+     )
+     logger.info(f"Summary per chromosome:\n{peaks_per_chrom}")
+
+     logger.info("Schema:")
+     for name, dtype in lf_out.schema.items():
+         logger.info(f"  {name}: {dtype}")
+
+     # Quick preview of a few rows (safe)
+     logger.info("\nHead(5):")
+     logger.info(lf_out.head(5).collect())  # or: lf.limit(5).collect()
+
+     # Save the FIRST 1000 rows to CSV (streaming-friendly)
+     df_first = lf_out.limit(1000).collect(streaming=True)
+     example_out_path = Path(root) / "dpacman/data_files/processed/remap/examples" / "example1000_remap2022_crm_fimo_output_q_processed.csv"
+     example_out_path.parent.mkdir(parents=True, exist_ok=True)  # ensure the examples folder exists
+     df_first.write_csv(example_out_path)
+     logger.info(f"Wrote first 1000 rows to {example_out_path} as an example")
+
+ def main(cfg: DictConfig):
+     debug = bool(cfg.data_task.debug)
+     json_dir = cfg.data_task.json_dir
+     fimo_out_dir = Path(root) / cfg.data_task.fimo_out_dir
+     processed_output_csv = Path(root) / cfg.data_task.processed_output_csv
+     output_parts_folder = processed_output_csv.parent / "temp_parts"
+     max_workers = getattr(cfg.data_task, "max_workers", None)
+
+     logger.info(f"Debug: {debug}")
+     logger.info(f"Reading per-chrom final.csv under: {fimo_out_dir}")
+
+     # Build the per-chromosome parts if they don't exist yet; otherwise reuse them.
+     if not output_parts_folder.exists():
+         paths_to_processed_dfs = build_dataset_fast_mp(
+             fimo_out_dir=fimo_out_dir,
+             json_dir=json_dir,
+             debug=debug,
+             max_workers=max_workers,
+             jaspar_boost=cfg.data_task.jaspar_boost,
+             output_parts_folder=output_parts_folder,
+         )
+     else:
+         paths_to_processed_dfs = [output_parts_folder / x for x in os.listdir(output_parts_folder)]
+
+     logger.info(f"Combining {len(paths_to_processed_dfs)} processed parts with Polars")
+     combine_processed_with_polars(
+         paths_to_processed_dfs=paths_to_processed_dfs,
+         idmap_path=Path(root) / cfg.data_task.idmap_path,
+         out_path=str(processed_output_csv).replace(".csv", ".parquet"),
+     )
+
+     # Optionally delete the folder that held the temporary DFs; kept by default
+     # so reruns can skip the multiprocessing step.
+     cleanup_parts = False
+     if cleanup_parts and output_parts_folder.exists():
+         for f in output_parts_folder.glob("*.csv"):
+             f.unlink()
+         output_parts_folder.rmdir()
+         logger.info(f"Cleaned up temporary files in {output_parts_folder}")

  if __name__ == "__main__":
+     # On some clusters with older Python, 'fork' is default and fine.
+     # If you hit issues (e.g., with threads/IO), uncomment spawn:
+     # mp.set_start_method("spawn", force=True)
      main()
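Aside: step 4 of combine_processed_with_polars uses a dense rank for chrom_peak_idx and mentions a "first-appearance" alternative. That variant is not part of this commit, but mirroring the dna_map/tr_map pattern from step 5 it could look roughly like:

    import polars as pl

    # lf: the LazyFrame from combine_processed_with_polars, after step 3.
    # Number peaks 1..n per chromosome in order of first appearance.
    peak_map = (
        lf.select("chrom", "peak_sequence")
        .unique(maintain_order=True)
        .with_columns(pl.int_range(1, pl.len() + 1).over("chrom").alias("chrom_peak_idx"))
    )
    lf = lf.join(peak_map, on=["chrom", "peak_sequence"], how="left")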
dpacman/data_tasks/fimo/pre_fimo.py CHANGED
@@ -4,84 +4,229 @@ import numpy as np
  import rootutils
  import logging
  import os
  from omegaconf import DictConfig
  from pathlib import Path

  root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
  logger = logging.getLogger(__name__)

- def main(cfg: DictConfig):
-     # 1) load
-     input_path = Path(root) / cfg.data_task.input_csv
-     df = pd.read_csv(input_path, sep="\t")

-     # 2) normalize chromosomes and exclude non-whole chromosomes
-     df["chrom"] = df["chrom"].str.replace(r"^chr", "", regex=True)
-
-     valid = [str(i) for i in range(1, 23)] + ["X", "Y"]
-     df = df[df["chrom"].isin(valid)].reset_index(drop=True)
-
-     # 3) explode TF names
-     df["tr_list"] = df["tr"].str.split(",")
-     df = df.explode("tr_list").rename(columns={"tr_list": "TR"})
-     df["TR"] = df["TR"].str.strip()
-
-     # 4) draw a random left-flank between 0 and WINDOW_TOTAL,
-     #    then right-flank is whatever remains to sum to WINDOW_TOTAL
-     n = len(df)
-     df["left_context"] = np.random.randint(0, cfg.data_task.window_total + 1, size=n)
-     df["right_context"] = cfg.data_task.window_total - df["left_context"]
-
-     # 5) compute contextStart / contextEnd
-     df["contextStart"] = (
-         (df["chromStart"] - df["left_context"]).clip(lower=0).astype(int)
-     )
-     df["contextEnd"] = (df["chromEnd"] + df["right_context"]).astype(int)
-
-     # 6) assemble output
-     out = df[
-         [
-             "chrom",
-             "contextStart",
-             "chromStart",  # original ChIPStart
-             "chromEnd",  # original ChIPEnd
-             "contextEnd",
-             "score",  # original score column
-             "TR",
-         ]
-     ].rename(
-         columns={
-             "chrom": "#chrom",
-             "chromStart": "ChIPStart",
-             "chromEnd": "ChIPEnd",
-             "score": "chipscore",
-         }
-     )
-
-     # 7) make folder for tsv
-     output_path = Path(root) / cfg.data_task.output_csv
      os.makedirs(output_path.parent, exist_ok=True)

-     # 8) write csv
-     out.to_csv(output_path, index=False)
-     logger.info(f"Wrote {len(out)} rows to {output_path}")

      # 9) write example csv if necessary
-     if cfg.data_task.save_example_files:
          example_dir = output_path.parent / "examples"
          os.makedirs(example_dir, exist_ok=True)
-         output_csv_name = cfg.data_task.output_csv.split("/")[-1]
          example_savepath = os.path.join(
              example_dir, "example500_" + output_csv_name
          )

          if not (os.path.exists(example_savepath)):
              out.sample(n=500, random_state=42).reset_index(drop=True).to_csv(
-                 example_savepath, sep="\t", index=False
              )
              logger.info(
                  f"Saved example FIMO input file with 500 rows to: {example_savepath}"
              )


  if __name__ == "__main__":

  import rootutils
  import logging
  import os
+ import json
+ import multiprocessing as mp
+ from multiprocessing import Pool, cpu_count
  from omegaconf import DictConfig
  from pathlib import Path
+ from hydra.core.hydra_config import HydraConfig

  root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
  logger = logging.getLogger(__name__)

+ def init_worker(log_file, logger_name):
+     """Initialize a logger in each worker."""
+     wlogger = logging.getLogger(logger_name)
+     wlogger.setLevel(logging.INFO)
+     wlogger.propagate = False  # Don't double-log to root
+
+     # Avoid re-adding handlers if this logger is reused
+     if not wlogger.handlers:
+         handler = logging.FileHandler(log_file)
+         formatter = logging.Formatter("%(asctime)s - %(message)s")
+         handler.setFormatter(formatter)
+         wlogger.addHandler(handler)
+
+     return wlogger
+
+ def process_chromosome(chrom, df, json_dir, output_path, example_dir, save_example_files, log_dir):
+     log_file = Path(log_dir) / f"chrom_{chrom}.log"
+     logger_name = f"logger_chrom_{chrom}"
+     wlogger = init_worker(log_file, logger_name)
+
+     wlogger.info(f"Processing chromosome {chrom}")
+
+     sub_df = df[df["#chrom"] == chrom].reset_index(drop=True)
+     sub_out_dir = Path(output_path) / f"chr{chrom}"
+     os.makedirs(sub_out_dir, exist_ok=True)
+
+     seq_fasta_path = sub_out_dir / "to_scan.fa"
+     extract_sequences(sub_df, seq_fasta_path, json_dir, wlogger)
+
+     if save_example_files:
+         example_out = Path(example_dir) / f"example_chr{chrom}_to_scan.fa"
+         if not example_out.exists():
+             with open(seq_fasta_path, "r") as f:
+                 lines = f.readlines()
+             with open(example_out, "w") as f:
+                 f.write("".join(lines[:50]))
+             wlogger.info(f"Saved example: {example_out}")

+ def assemble_main_input(input_csv: str, window_total: int, output_csv: str, save_example_files: bool):
+     """
+     Assemble the main input DataFrame for FIMO.
+
+     Args:
+     - input_csv: path to the input CSV that is converted into the FIMO input file
+     - window_total: total number of non-ChIP-seq-peak nucleotides included in a datapoint
+     - output_csv: where the processed file will be saved
+     - save_example_files: whether to save small example files that can be easily viewed
+     """
+     # 1) make input and output paths
+     input_path = Path(root) / input_csv
+     df = pd.read_csv(input_path, sep="\t")
+     out = None  # initialize out
+
+     output_path = Path(root) / output_csv
      os.makedirs(output_path.parent, exist_ok=True)

+     if not os.path.exists(output_path):
+         # 2) normalize chromosomes and exclude non-whole chromosomes
+         df["chrom"] = df["chrom"].str.replace(r"^chr", "", regex=True)
+
+         valid = [str(i) for i in range(1, 23)] + ["X", "Y"]
+         df = df[df["chrom"].isin(valid)].reset_index(drop=True)
+
+         # 3) explode TF names
+         df["tr_list"] = df["tr"].str.split(",")
+         df = df.explode("tr_list").rename(columns={"tr_list": "TR"})
+         df["TR"] = df["TR"].str.strip()
+
+         # 4) draw a random left-flank between 0 and window_total,
+         #    then right-flank is whatever remains to sum to window_total
+         n = len(df)
+         df["left_context"] = np.random.randint(0, window_total + 1, size=n)
+         df["right_context"] = window_total - df["left_context"]
+
+         # 5) compute contextStart / contextEnd
+         df["contextStart"] = (
+             (df["chromStart"] - df["left_context"]).clip(lower=0).astype(int)
+         )
+         df["contextEnd"] = (df["chromEnd"] + df["right_context"]).astype(int)
+
+         # 6) assemble output
+         out = df[
+             [
+                 "chrom",
+                 "contextStart",
+                 "chromStart",  # original ChIPStart
+                 "chromEnd",  # original ChIPEnd
+                 "contextEnd",
+                 "score",  # original score column
+                 "TR",
+             ]
+         ].rename(
+             columns={
+                 "chrom": "#chrom",
+                 "chromStart": "ChIPStart",
+                 "chromEnd": "ChIPEnd",
+                 "score": "chipscore",
+             }
+         )
+
+         # 8) write csv
+         out.to_csv(output_path, index=False)
+         logger.info(f"Wrote {len(out)} rows to {output_path}")

+     # Load the DF if we need to
+     if out is None:
+         out = pd.read_csv(output_path)
+
      # 9) write example csv if necessary
+     if save_example_files:
          example_dir = output_path.parent / "examples"
          os.makedirs(example_dir, exist_ok=True)
+         output_csv_name = output_csv.split("/")[-1]
          example_savepath = os.path.join(
              example_dir, "example500_" + output_csv_name
          )

          if not (os.path.exists(example_savepath)):
              out.sample(n=500, random_state=42).reset_index(drop=True).to_csv(
+                 example_savepath, index=False
              )
              logger.info(
                  f"Saved example FIMO input file with 500 rows to: {example_savepath}"
              )
+
+     return out
+
+ def load_chrom_dna(chrom, cache, json_dir):
+     """
+     Load DNA for a chromosome from the pre-downloaded JSON.
+     """
+     json_dir = Path(root) / json_dir
+     if chrom in cache:
+         return cache[chrom]
+     fname = os.path.join(json_dir, f"hg38_chr{chrom}.json")
+     if not os.path.isfile(fname):
+         raise FileNotFoundError(f"Chrom JSON not found: {fname}")
+     with open(fname) as f:
+         cache[chrom] = json.load(f)["dna"]
+     return cache[chrom]
+
+ def parallel_make_all_fasta_inputs(df, json_dir, output_path, example_dir, save_example_files=True, max_workers=8):
+     df["#chrom"] = df["#chrom"].astype(str)
+     chromosomes = df["#chrom"].unique().tolist()
+
+     log_dir = Path(HydraConfig.get().run.dir) / "logs"
+
+     os.makedirs(log_dir, exist_ok=True)
+     logger.info(f"Created {log_dir} for storing logs for subprocesses.")
+
+     os.makedirs(example_dir, exist_ok=True)
+     logger.info(f"Created {example_dir} for storing example inputs")
+
+     args = [
+         (chrom, df, json_dir, output_path, example_dir, save_example_files, log_dir)
+         for chrom in chromosomes
+     ]
+
+     with mp.Pool(processes=max_workers) as pool:
+         pool.starmap(process_chromosome, args)
+
+ def extract_sequences(df, seq_fasta, json_dir, wlogger):
+     """
+     Make the main sequence FASTA for this chromosome. Used for building the background model.
+     """
+     dna_cache = {}
+     n_rows = len(df)
+     checkpoints = set(int(n_rows * i / 100) for i in range(1, 101))  # 1% to 100%
+
+     wlogger.info(f"Writing to {seq_fasta}")
+     if not os.path.exists(seq_fasta):
+         with open(seq_fasta, "w") as fa:
+             for idx, row in df.iterrows():
+                 chrom = str(row["#chrom"])
+                 tr = str(row["TR"])
+                 dna = load_chrom_dna(chrom, dna_cache, json_dir)
+                 start = int(row["contextStart"])
+                 end = int(row["contextEnd"])
+                 seq = dna[start:end]  # end index is not included in ChIP-seq peaks
+                 header = f"{idx}_chr{chrom}_{tr}_{start}_{end}"
+                 fa.write(f">{header}\n{seq}\n")
+
+                 # log every 1%
+                 if idx in checkpoints:
+                     wlogger.info(f"  Reached {idx / n_rows:.0%} of the DataFrame (index {idx})")

+ def main(cfg: DictConfig):
+     # 1) make the full input CSV
+     paths = cfg.data_task.paths
+     df = assemble_main_input(input_csv=paths.input_csv,
+                              window_total=cfg.data_task.window_total,
+                              output_csv=paths.output_csv,
+                              save_example_files=cfg.data_task.save_example_files)
+
+     # Make example dir to use in future methods
+     example_dir = Path(root) / paths.output_csv
+     example_dir = example_dir.parent / "examples"
+     os.makedirs(example_dir, exist_ok=True)
+
+     # 2) Make individual input files per chromosome
+     total_chroms = len(df["#chrom"].unique().tolist())
+     max_workers = cpu_count() - 1
+     logger.info(f"Max workers available (cpu_count - 1): {max_workers}")
+     max_workers = min(max_workers, total_chroms)
+     logger.info(f"min(max_workers, total_chroms) = {max_workers}")
+
+     parallel_make_all_fasta_inputs(df,
+                                    json_dir=paths.json_dir,
+                                    output_path=Path(root) / paths.chrom_output_path,
+                                    example_dir=example_dir,
+                                    save_example_files=cfg.data_task.save_example_files,
+                                    max_workers=max_workers)
+


  if __name__ == "__main__":
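Note: the FASTA headers written by extract_sequences start with the row index, and run_fimo.py's split_fasta (below) uses that index for round-robin chunk assignment. A worked example with a hypothetical record:

    header = ">1234_chr1_CTCF_100500_101100\n"      # line from to_scan.fa
    n_chunks = 64
    idx = int(header[1:].split("_")[0]) % n_chunks  # 1234 % 64 = 18 -> chunk 18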
dpacman/data_tasks/fimo/run_fimo.py CHANGED
@@ -9,34 +9,13 @@ import rootutils
  import logging
  from omegaconf import DictConfig
  from pathlib import Path

  root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
  logger = logging.getLogger(__name__)

-
- def load_chrom_dna(chrom, cache, json_dir):
-     if chrom in cache:
-         return cache[chrom]
-     fname = os.path.join(json_dir, f"hg38_chr{chrom}.json")
-     if not os.path.isfile(fname):
-         raise FileNotFoundError(f"Chrom JSON not found: {fname}")
-     with open(fname) as f:
-         cache[chrom] = json.load(f)["dna"]
-     return cache[chrom]
-
-
- def extract_sequences(df, seq_fasta, json_dir):
-     dna_cache = {}
-     with open(seq_fasta, "w") as fa:
-         for idx, row in df.iterrows():
-             chrom = str(row["#chrom"])
-             dna = load_chrom_dna(chrom, dna_cache, json_dir)
-             start = int(row["contextStart"])
-             end = int(row["contextEnd"])
-             seq = dna[start:end]
-             fa.write(f">{idx}\n{seq}\n")
-
-
  def run_markov(fasta_get_markov, seq_fasta, bg_model):
      subprocess.check_call(
          [fasta_get_markov, seq_fasta, bg_model],
@@ -44,31 +23,55 @@ def run_markov(fasta_get_markov, seq_fasta, bg_model):
          stderr=subprocess.DEVNULL,
      )


- def split_fasta(n_chunks, seq_fasta):
-     """Round-robin split SEQ_FASTA into chunked FASTA files."""
-     out_handles = [open(f"to_scan_{i}.fa", "w") for i in range(n_chunks)]
-     with open(seq_fasta) as inf:
          header = None
          seq_lines = []
          for line in inf:
              if line.startswith(">"):
                  if header is not None:
-                     idx = int(header[1:].split()[0]) % n_chunks
-                     out_handles[idx].write(header)
-                     out_handles[idx].write("".join(seq_lines))
                  header = line
                  seq_lines = []
              else:
                  seq_lines.append(line)
          # last record
          if header is not None:
-             idx = int(header[1:].split()[0]) % n_chunks
-             out_handles[idx].write(header)
-             out_handles[idx].write("".join(seq_lines))
      for o in out_handles:
          o.close()
-     return [f"to_scan_{i}.fa" for i in range(n_chunks)]


  def run_fimo_chunk(cfg):
@@ -83,41 +86,80 @@ def run_fimo_chunk(cfg):
      - bg_model
      - max_stored
      - motif_file
-     - pval_thresh
      """
-     outdir = f"{cfg['fimo_outdir']}_{cfg['chunk_id']}"
      os.makedirs(outdir, exist_ok=True)
-     logger.info(f"Chunk {cfg['chunk_id']} starting FIMO")
-     subprocess.check_call(
-         [
-             cfg["fimo_bin"],
-             "--oc",
-             outdir,
-             "--bgfile",
-             cfg["bg_model"],
-             "--max-stored-scores",
-             str(cfg["max_stored"]),
-             "--thresh",
-             str(cfg["pval_thresh"]),
-             cfg["motif_file"],
-             cfg["fasta_path"],
-         ]
-     )
-     logger.info(f"Chunk {cfg['chunk_id']} finished")
-     return os.path.join(outdir, "fimo.tsv")


- def annotate_with_fimo(df, fimo_tsv):
-     fdf = pd.read_csv(fimo_tsv, sep="\t", comment="#")
-     fdf["idx"] = fdf["sequence_name"].astype(int)
-     fdf = fdf.merge(df[["idx", "contextStart"]], on="idx", how="left")
      fdf["genomic_start"] = fdf["contextStart"] + fdf["start"] - 1
      fdf["genomic_end"] = fdf["contextStart"] + fdf["stop"]
      fdf["coord"] = (
          fdf["genomic_start"].astype(str) + "-" + fdf["genomic_end"].astype(str)
      )
-     agg = fdf.groupby("idx")["coord"].agg(lambda hits: ",".join(hits))
-     df["jaspar"] = df["idx"].map(agg).fillna("")
      return df


@@ -128,81 +170,116 @@ def main(cfg: DictConfig):
      # 0) configs
      paths = cfg.data_task.paths
      fimo = cfg.data_task.fimo
-     fnames = cfg.data_task.fnames
      meme = cfg.data_task.meme
-
      # set njobs to max or whatever # is specified by user
      njobs = fimo.njobs
      if njobs == "max":
-         njobs = cpu_count() - 1
      else:
-         njobs = min(cpu_count() - 1, int(njobs))
-
-     # 1) load & explode
-     input_csv_path = Path(root) / paths.input_csv
-     df = pd.read_csv(input_csv_path, low_memory=False)
-     df = df.reset_index().rename(columns={"index": "idx"})
-     df["TF_occurrence"] = df.groupby("TF").cumcount() + 1
-     df["TF_id"] = df["TF"] + "_seq" + df["TF_occurrence"].astype(str)

      # 2) extract sequences & build BG model
-     extract_sequences(df, fnames.seq_fasta, paths.json_dir)
-     logger.info("Building background model…")
-     run_markov(meme.fasta_get_markov, fnames.seq_fasta, fnames.bg_model)
-
-     # 3) chunk FASTA and run FIMO in parallel
-     chunks = split_fasta(njobs)
-     chunk_cfgs = [
-         dict(
-             chunk_id=i,
-             fasta_path=chunk,
-             fimo_outdir=fnames.fimo_outdir,
-             fimo_bin=paths.fimo_bin,
-             bg_model=fnames.bg_model,
-             max_stored=fimo.max_stored,
-             motif_file=meme.jaspar_motif_file,
-             pval_thresh=fimo.pval_thresh,
-         )
-         for i, chunk in enumerate(chunks)
-     ]
-     logger.info(f"Running FIMO in parallel ({njobs} jobs)…")
-     with Pool(njobs) as pool:
-         tsv_paths = list(
-             tqdm(
-                 pool.imap(run_fimo_chunk, chunk_cfgs),
-                 total=len(chunks),
-                 desc="FIMO chunks",
-                 leave=True,
-             )
-         )

-     # 4) merge chunked TSVs
-     combined = pd.concat(
-         [pd.read_csv(tsv, sep="\t", comment="#") for tsv in tsv_paths],
-         ignore_index=True,
-     )
-     merged_tsv = "fimo_combined.tsv"
-     combined.to_csv(merged_tsv, sep="\t", index=False)
-
-     # 5) annotate & write final CSV
-     df = annotate_with_fimo(df, merged_tsv)
-     final = df[
-         [
-             "#chrom",
-             "contextStart",
-             "ChIPStart",
-             "ChIPEnd",
-             "contextEnd",
-             "chipscore",
-             "TF",
-             "TF_id",
-             "jaspar",
          ]
-     ]
-     output_csv_path = Path(root) / paths.output_csv
-     final.to_csv(output_csv_path, index=False)
-     logger.info(f"Wrote {len(final)} rows → {output_csv_path}")


  if __name__ == "__main__":
      main()

  import logging
  from omegaconf import DictConfig
  from pathlib import Path
+ import time
+ import shutil
+ from hydra.core.hydra_config import HydraConfig

  root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
  logger = logging.getLogger(__name__)

  def run_markov(fasta_get_markov, seq_fasta, bg_model):
      subprocess.check_call(
          [fasta_get_markov, seq_fasta, bg_model],
          stderr=subprocess.DEVNULL,
      )

+ def split_fasta(n_chunks, input_file, output_dir, debug=False, debug_n=1000, all_caps=True):
+     """
+     Round-robin split the input FASTA into chunked FASTA files.
+     If in debug mode, only keep the first debug_n entries for each chunk.
+     """
+     output_dir = Path(root) / output_dir
+     out_names = [os.path.join(output_dir, f"to_scan_{i}.fa") for i in range(n_chunks)]
+     out_handles = [open(out_names[i], "w") for i in range(n_chunks)]
+     chunk_counts = [0] * n_chunks  # Count sequences per chunk

+     logger.info(f"ALL CAPS mode: {all_caps}")
+
+     with open(input_file) as inf:
          header = None
          seq_lines = []
+
          for line in inf:
              if line.startswith(">"):
                  if header is not None:
+                     idx = int(header[1:].split("_")[0]) % n_chunks
+                     if not debug or chunk_counts[idx] < debug_n:
+                         out_handles[idx].write(header)
+                         seqj = "".join(seq_lines)
+                         if all_caps: seqj = seqj.upper()
+                         out_handles[idx].write(seqj)
+                         chunk_counts[idx] += 1
                  header = line
                  seq_lines = []
              else:
                  seq_lines.append(line)
+
          # last record
          if header is not None:
+             idx = int(header[1:].split("_")[0]) % n_chunks
+             if not debug or chunk_counts[idx] < debug_n:
+                 out_handles[idx].write(header)
+                 seqj = "".join(seq_lines)
+                 if all_caps: seqj = seqj.upper()
+                 out_handles[idx].write(seqj)
+                 chunk_counts[idx] += 1
+
      for o in out_handles:
          o.close()
+
+     # Log chunk sizes
+     for i, count in enumerate(chunk_counts):
+         logger.info(f"Chunk {i}: {count} sequences")
+
+     return out_names


  def run_fimo_chunk(cfg):
      - bg_model
      - max_stored
      - motif_file
+     - thresh
+     - thresh_mode
+     - outdir
      """
+     chunk_id = cfg["chunk_id"]
+     log_dir = Path(HydraConfig.get().run.dir) / "logs"
+     log_dir.mkdir(parents=True, exist_ok=True)
+
+     log_file = log_dir / f"fimo_chunk_{chunk_id}.log"
+     wlogger = logging.getLogger(f"fimo_chunk_{chunk_id}")
+     wlogger.setLevel(logging.DEBUG)
+     wlogger.propagate = False  # Don't double-log to root
+
+     outdir = Path(cfg["outdir"])
      os.makedirs(outdir, exist_ok=True)

+     if not any(isinstance(h, logging.FileHandler) for h in wlogger.handlers):
+         fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
+         fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
+         wlogger.addHandler(fh)
+
+     # make an output directory for this chromosome
+     wlogger.info(f"Chunk {cfg['chunk_id']} starting FIMO")
+     wlogger.info(f"Threshold mode: {cfg['thresh_mode']}")
+
+     try:
+         call_list = [
+             cfg["fimo_bin"],
+             "--oc",
+             outdir,
+             "--bfile",
+             cfg["bg_model"],
+             "--max-stored-scores",
+             str(cfg["max_stored"]),
+             "--thresh",
+             str(cfg["thresh"]),
+             "--qv-thresh",  # threshold on q-value
+             "--no-pgc",  # suppress parsing of genomic coordinates in FASTA sequence header
+             cfg["motif_file"],
+             cfg["fasta_path"],
+         ]
+         if cfg["thresh_mode"] != "q":
+             call_list = [x for x in call_list if x != "--qv-thresh"]
+             assert "--qv-thresh" not in call_list
+         with open(log_file, "a") as log_fh:
+             subprocess.check_call(
+                 call_list,
+                 stdout=log_fh,
+                 stderr=log_fh,
+             )
+         wlogger.info(f"\tChunk {cfg['chunk_id']} finished")
+
+         # Delete the chunk FASTA - gotta save space!
+         file_path = Path(cfg["fasta_path"])
+         if file_path.exists() and file_path.is_file():
+             file_path.unlink()
+             wlogger.info(f"\tDeleted file: {file_path}")
+
+     except subprocess.CalledProcessError as e:
+         wlogger.error(f"\tChunk {chunk_id}: FIMO failed with error code {e.returncode}")
+         raise
+     return os.path.join(outdir, "fimo.tsv")

+ def annotate_with_fimo(df, fdf):
+     df = df.reset_index().rename(columns={"index": "idx"})
+     # construct it the same way as the FASTA headers
+     df["sequence_name"] = (
+         df["idx"].astype(str) + "_chr" + df["#chrom"] + "_" + df["TR"] + "_"
+         + df["contextStart"].astype(str) + "_" + df["contextEnd"].astype(str)
+     )
+     fdf = fdf.merge(df[["sequence_name", "contextStart"]], on="sequence_name", how="left")
      fdf["genomic_start"] = fdf["contextStart"] + fdf["start"] - 1
      fdf["genomic_end"] = fdf["contextStart"] + fdf["stop"]
      fdf["coord"] = (
          fdf["genomic_start"].astype(str) + "-" + fdf["genomic_end"].astype(str)
      )
+     agg = fdf.groupby("sequence_name")["coord"].agg(lambda hits: ",".join(hits))
+     df["jaspar"] = df["sequence_name"].map(agg).fillna("")
      return df


      # 0) configs
      paths = cfg.data_task.paths
      fimo = cfg.data_task.fimo
      meme = cfg.data_task.meme
+
      # set njobs to max or whatever # is specified by user
      njobs = fimo.njobs
      if njobs == "max":
+         njobs = cpu_count()
      else:
+         njobs = min(cpu_count(), int(njobs))

+     # 1) Optionally, activate test mode
+     # ── TEST MODE: extract just one chromosome to benchmark a smaller job ──
+     chroms = [str(x) for x in cfg.data_task.chroms]
+     logger.info(f"Debug setting: {cfg.data_task.debug}")
+     if cfg.data_task.debug:
+         chroms = chroms[0:1]
+         logger.info(f"DEBUG MODE: running on only one chromosome: {chroms}")
+
      # 2) extract sequences & build BG model
+     for chrom in chroms:
+         path_to_fasta = Path(root) / Path(paths.input_fasta_outer_dir) / f"chr{chrom}" / paths.seq_fasta
+         path_to_bg = Path(root) / Path(paths.input_fasta_outer_dir) / f"chr{chrom}" / paths.bg_model
+         logger.info(f"Path to fasta file: {path_to_fasta}")
+         logger.info(f"Building background model at {path_to_bg}…")
+         run_markov(Path(root) / meme.fasta_get_markov, path_to_fasta, Path(root) / path_to_bg)

+         # 3) chunk FASTA and run FIMO in parallel
+         # make a folder to store the split fastas
+         chunk_folder = Path(path_to_fasta.parent) / "chunks"
+         os.makedirs(chunk_folder, exist_ok=True)
+         logger.info(f"Made directory {chunk_folder} to store {njobs} chunked fastas")
+         chunks = split_fasta(njobs, input_file=path_to_fasta, output_dir=chunk_folder, debug=cfg.data_task.debug, all_caps=cfg.data_task.all_caps)
+
+         chrom_outdir = Path(root) / paths.fimo_outdir / f"chrom{chrom}"
+         os.makedirs(chrom_outdir, exist_ok=True)
+
+         chunk_cfgs = [
+             dict(
+                 chunk_id=i,
+                 fasta_path=chunk,
+                 fimo_outdir=Path(root) / paths.fimo_outdir,
+                 fimo_bin=Path(root) / meme.fimo_bin,
+                 bg_model=path_to_bg,
+                 max_stored=fimo.max_stored,
+                 motif_file=Path(root) / meme.jaspar_motif_file,
+                 thresh=fimo.thresh,
+                 thresh_mode=fimo.thresh_mode,
+                 outdir=Path(chrom_outdir) / f"chunk{i}",
+             )
+             for i, chunk in enumerate(chunks)
          ]
+         logger.info(f"Running FIMO in parallel ({njobs} jobs)…")
+         start_time = time.time()
+         # Call the parallel jobs and get back a list of tsv paths
+         with Pool(njobs) as pool:
+             tsv_paths = pool.map(run_fimo_chunk, chunk_cfgs)
+         end_time = time.time()
+         logger.info(f"COMPLETED FIMO ({njobs} parallel jobs) in {end_time - start_time:.2f}s")
+         # cleanup! delete the chunked input files
+         if not any(chunk_folder.iterdir()):  # Empty folder
+             chunk_folder.rmdir()
+             logger.info(f"Deleted empty folder: {chunk_folder}")
+
+         # 4) merge chunked TSVs. Some may be empty, so we can't do a simple loop;
+         #    delete intermediate folders as we go
+         dfs = []
+         for tsv in tsv_paths:
+             try:
+                 df = pd.read_csv(tsv, sep="\t", comment="#")
+                 if not df.empty:
+                     dfs.append(df)
+             except pd.errors.EmptyDataError:
+                 logger.warning(f"Skipped empty TSV (only comments or blank): {tsv}")
+             except Exception as e:
+                 logger.error(f"Error reading {tsv}: {e}")
+                 raise  # Or continue, depending on your needs
+
+             # delete this folder to save storage
+             chunk_dir = Path(tsv).parent
+             try:
+                 shutil.rmtree(chunk_dir)
+                 logger.info(f"Deleted chunk directory: {chunk_dir}")
+             except Exception as e:
+                 logger.warning(f"Could not delete chunk dir {chunk_dir}: {e}")

+         combined = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
+
+         # 5) annotate & write final CSV
+         df = pd.read_csv(Path(root) / paths.input_csv, low_memory=False)
+         df["#chrom"] = df["#chrom"].astype(str)
+         df = df.loc[df["#chrom"] == chrom].reset_index(drop=True)
+         output_full_csv_path = Path(root) / chrom_outdir / "fimo_annotations.csv"
+         combined.to_csv(output_full_csv_path, index=False)
+         logger.info(f"Merging FIMO results into input DataFrame, which has {len(df)} rows for chromosome {chrom}")
+         df = annotate_with_fimo(df, combined)
+
+         final = df[
+             [
+                 "#chrom",
+                 "contextStart",
+                 "ChIPStart",
+                 "ChIPEnd",
+                 "contextEnd",
+                 "chipscore",
+                 "TR",
+                 "jaspar",
+             ]
+         ]
+         output_csv_path = Path(root) / chrom_outdir / "final.csv"
+         final.to_csv(output_csv_path, index=False)
+         logger.info(f"Wrote {len(final)} rows to {output_csv_path}")

  if __name__ == "__main__":
      main()
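Note on the coordinate math in annotate_with_fimo: FIMO reports motif hits with 1-based start/stop positions inside the scanned sequence, so mapping a hit back onto the chromosome offsets it by contextStart. A worked example with hypothetical numbers:

    context_start = 100500           # 0-based start of the context window
    fimo_start, fimo_stop = 11, 25   # 1-based hit coordinates from fimo.tsv
    genomic_start = context_start + fimo_start - 1  # 100510
    genomic_end = context_start + fimo_stop         # 100525
    coord = f"{genomic_start}-{genomic_end}"        # stored in the "jaspar" column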
dpacman/scripts/run_download.sh CHANGED
@@ -2,7 +2,7 @@

  # Manually specify values used in the config
  main_task="preprocess"
- data_task_type="download"
+ data_task_type="clean"
  timestamp=$(date "+%Y-%m-%d_%H-%M-%S")

  run_dir="$HOME/DPACMAN/logs/${main_task}/${data_task_type}/runs/${timestamp}"
dpacman/scripts/run_fimo.sh CHANGED
@@ -8,9 +8,10 @@ timestamp=$(date "+%Y-%m-%d_%H-%M-%S")
  run_dir="$HOME/DPACMAN/logs/${main_task}/${data_task_type}/runs/${timestamp}"
  mkdir -p "$run_dir"

- CUDA_VISIBLE_DEVICES=0 nohup python -u -m scripts.preprocess \
+ nohup python -u -m scripts.preprocess \
    hydra.run.dir="${run_dir}" \
-   data_task=${data_task_type}/pre_fimo \
+   data_task=${data_task_type}/post_fimo \
+   data_task.debug="false" \
    > "${run_dir}/run.log" 2>&1 &

  echo $! > "${run_dir}/pid.txt"
dpacman/scripts/run_fimo_batch.sh ADDED
@@ -0,0 +1,59 @@
+ #!/bin/bash
+
+ # -------------------------
+ # Slurm + Hydra Configuration
+ # -------------------------
+
+ main_task="preprocess"
+ data_task_type="fimo"
+ fimo_thresh="1e-2"
+ fimo_thresh_mode="q"
+ max_stored="100000"
+ fimo_outdir="dpacman/data_files/processed/fimo/fimo_out_q"
+ debug="false"
+
+ # Chromosomes to run
+ #chromosomes=('1' '10' '11' '12' '13' '14' '15' '16' '17' '18' '19' '2' '20' '21' '22' '3' '4' '5' '6' '7' '8' '9' 'X' 'Y')
+ chromosomes=('3')
+ # -------------------------
+ # Slurm Job Submission
+ # -------------------------
+
+ for chrom in "${chromosomes[@]}"; do
+   timestamp=$(date "+%Y-%m-%d_%H-%M-%S")
+   run_dir="$HOME/DPACMAN/logs/${main_task}/${data_task_type}/runs/${timestamp}_chr${chrom}"
+   mkdir -p "$run_dir"
+
+   sbatch <<EOF
+ #!/bin/bash
+ #SBATCH --job-name=fimo${chrom}
+ #SBATCH --partition=genoa-lrg-mem
+ #SBATCH -N 1                    ## Number of nodes
+ #SBATCH --mem=0
+ #SBATCH --ntasks-per-node=64
+ #SBATCH --output=${run_dir}/run.log
+ #SBATCH --error=${run_dir}/run.log
+
+ echo "Running FIMO for chromosome ${chrom} at \$(date)"
+ cd /vast/projects/pranam/lab/sophie/DPACMAN/dpacman
+
+ # Load environment
+ source /vast/projects/pranam/lab/shared/miniconda3/etc/profile.d/conda.sh
+ conda activate dpacman
+
+ # Run Hydra-based script
+ python -u -m scripts.preprocess \\
+   hydra.run.dir="${run_dir}" \\
+   data_task=${data_task_type}/run_fimo \\
+   data_task.chroms=["${chrom}"] \\
+   data_task.fimo.thresh=${fimo_thresh} \\
+   data_task.fimo.thresh_mode=${fimo_thresh_mode} \\
+   data_task.paths.fimo_outdir=${fimo_outdir} \\
+   data_task.fimo.max_stored=${max_stored} \\
+   data_task.debug=${debug}
+
+ # Save SLURM job ID
+ echo \$SLURM_JOB_ID > "${run_dir}/pid.txt"
+ EOF
+
+ done
environment.yaml CHANGED
@@ -9,6 +9,7 @@
  name: dpacman

  channels:
+   - bioconda
    - conda-forge
    - defaults

@@ -24,8 +25,10 @@ dependencies:
    - python=3.10
    - dask[complete]
    - pip>=23
+   - ghostscript=9.18
    - pip:
        - rootutils==1.0.7
+       - polars==1.32.2
        - hydra-core==1.3.2 # Hydra for config management
        - hydra-colorlog==1.2.0 # Allow colorful logging in Hydra
        - omegaconf==2.3.0 # Required by hydra-core