svincoff commited on
Commit
dca7b00
·
1 Parent(s): fc82077

download working

Browse files
Files changed (3) hide show
  1. .gitignore +2 -1
  2. README.md +48 -18
  3. dpacman/data/tfclust/download.py +316 -214
.gitignore CHANGED
@@ -1,2 +1,3 @@
1
  dpacman/data_files
2
- dpacman/data/tfclust/*.log
 
 
1
  dpacman/data_files
2
+ dpacman/data/tfclust/*.log
3
+ dpacman/data/tfclust/temp.py
README.md CHANGED
@@ -8,22 +8,52 @@ license: cc-by-nc-nd-4.0
8
  .
9
  ├── README.md
10
  ├── dpacman
11
- ├── data
12
- ├── README.md
13
- ├── chip_atlas
14
- ├── full_data_loading.py
15
- └── smaller_data_loading.py
16
- └── tfclust
17
- ── download.py
18
- ── data_files
19
- ├── processed
20
- ── tfclust
21
- └── raw
22
- ── chip_atlas
23
- │ └── experimentList.tab
24
- └── tfclust
25
- ├── encRegTfbsClusteredWithCells.hg19.bed
26
- ── encRegTfbsClusteredWithCells.hg38.bed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  ├── environment.yaml
28
- ── setup.py
29
- ```
 
 
 
 
 
8
  .
9
  ├── README.md
10
  ├── dpacman
11
+    ├── data
12
+       ├── README.md
13
+       ├── chip_atlas
14
+          ├── full_data_loading.py
15
+          └── smaller_data_loading.py
16
+       └── tfclust
17
+       ├── api_download.py
18
+    │   ├── download.log
19
+    │   ├── download.py
20
+       ├── hg38_success_download.log
21
+    │   └── temp.py
22
+    └── data_files
23
+    ├── processed
24
+    │   └── tfclust
25
+    │   ├── hg19
26
+    │   │   ├── encRegTfbsClustered_hg19_chr1.csv
27
+ │   │   │   └── logs
28
+ │   │   │   ├── completed.txt
29
+ │   │   │   ├── completed_worker_0.txt
30
+ │   │   │   ├── worker_0.log
31
+ │   │   └── hg38
32
+ │   │   ├── encRegTfbsClustered_hg38_chr1.csv
33
+ │   │   └── logs
34
+ │   │   ├── completed.txt
35
+ │   │   ├── completed_worker_0.txt
36
+ │   │   ├── worker_0.log
37
+ │   └── raw
38
+ │   ├── chip_atlas
39
+ │   │   └── experimentList.tab
40
+ │   ├── genomes
41
+ │   │   ├── hg19
42
+ │   │   │   ├── hg19_chr1.json
43
+ │   │   └── hg38
44
+ │   │   ├── hg38_chr1.json
45
+ │   └── tfclust
46
+ │   ├── encRegTfbsClusteredWithCells.hg19.bed
47
+ │   ├── encRegTfbsClusteredWithCells.hg38.bed
48
+ │   └── encRegTfbsClustered_data
49
+ │   ├── hg19
50
+ │   │   ├── hg19_encRegTfbsClustered_chr1.json
51
+ │   └── hg38
52
+ │   ├── hg38_encRegTfbsClustered_chr1.json
53
  ├── environment.yaml
54
+ ── setup.py
55
+ └── tree_output.txt
56
+ ```
57
+ 20 directories, 3089 files
58
+
59
+ In `data_files` subfolders, only representative files for certain chromosomes are shown. In reality, any file that contains the substring "_chr" exists for every chromosome in that genome. Genome hg38 has 711 chromosomes. Genome hg19 has 298 chromosomes.
dpacman/data/tfclust/download.py CHANGED
@@ -1,50 +1,90 @@
 
 
1
  import requests
2
- from time import sleep
3
  import json
4
- import logging
5
  import multiprocessing
6
- from concurrent.futures import ThreadPoolExecutor, as_completed
7
- import os
8
- import pandas as pd
9
-
10
 
11
  def get_all_tfs(genome: str = "hg38"):
12
- """
13
- Get all the transcription factors from the appropriate encRegTfbsClusteredWithCells.genome.bed file.
14
- Available in data_files/raw/tfclust for genomes hg38 and hg19
15
- """
16
- # Read raw file
17
  raw_data = pd.read_csv(
18
- "../data_files/encode3TfbsClusteredWithCells.bed", sep="\t", header=None
19
  )
20
  raw_data.columns = ["chrom", "start", "end", "tf_name", "score", "cell_line"]
21
-
22
- # Extract all unique TF names
23
- all_tfs = encode_raw["tf_name"].unique().tolist()
24
  logging.info(f"Found {len(all_tfs)} transcription factors in genome {genome}.")
25
-
26
  return all_tfs
27
 
28
-
29
- def get_all_chroms(genome: str = "hg38"):
30
  """
31
  Fetch all chromosome names for a genome.
32
  Note: some chromosomes are in unexpected formats (e.g. there is 'chr15', but also 'chr15_ML143371v1_fix')
33
  """
 
 
 
34
  url = f"https://api.genome.ucsc.edu/list/chromosomes?genome={genome}"
35
  try:
36
  r = requests.get(url)
37
  r.raise_for_status()
38
  except:
39
- raise ValueError(f"Failed to fetch all chromosomes for genome {genome}")
40
-
41
- all_chroms = [chrom for chrom in r.json()["chromosomes"]]
42
- logging.info(f"Found {len(all_chroms)} chromosomes in genome {genome}.")
 
 
 
 
 
 
 
 
 
 
43
 
44
  return all_chroms
45
 
46
-
47
- def fetch_tfbs_track(chrom: str, genome: str = "hg38"):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  """
49
  Fetch raw data from the track encRegTfbsClustered.
50
  Returns json data for the specified chromosome, where key information appears as follows:
@@ -63,87 +103,106 @@ def fetch_tfbs_track(chrom: str, genome: str = "hg38"):
63
  ]
64
 
65
  """
 
 
 
66
  params = {"genome": genome, "track": "encRegTfbsClustered", "chrom": chrom}
67
- url = f"https://api.genome.ucsc.edu/getData/track?genome={params['genome']};track={params['track']};chrom={params['chrom']}"
68
- try:
69
- r = requests.get(url)
70
- r.raise_for_status()
71
- except:
72
- raise ValueError(
73
- f"Failed to fetch encRegTfbsClustered for {chrom} in genome {genome}"
74
- )
75
-
76
- # Extract the output and save it
77
- json_out_dir = f"../data_files/raw/tfclust/encRegTfbsClustered_data/{genome}"
78
- os.makedirs(json_out_dir, exist_ok=True)
79
-
80
- # Save it
81
- json_output = r.json()
82
- with open(
83
- f"{json_out_dir}/{params['genome']}_{params['track']}_{params['chrom']}.json",
84
- "w",
85
- ) as f:
86
- json.dump(json_output, f, indent=4)
 
 
 
 
 
 
 
87
 
88
- logging.info(
89
- f"Saved to {json_out_dir}/{params['genome']}_{params['track']}_{params['chrom']}.json"
90
- )
91
  return json_output
92
 
93
-
94
  def get_sequence(
95
- chrom: str,
96
  start: int,
97
  end: int,
98
  flank5: int = 0,
99
  flank3: int = 0,
100
  genome: str = "hg38",
 
101
  ):
102
  """
103
- Given genome, start position, end position, chromosome, and desired flank size, extract the raw DNA sequence
104
- """
105
- new_start = max(0, start - flank)
106
- new_end = end + flank
107
- region = f"{chrom}:{new_start}-{new_end}"
108
- url = f"https://api.genome.ucsc.edu/getData/sequence?genome={genome};chrom={chrom};start={new_start};end={new_end}"
109
- try:
110
- r = requests.get(url)
111
- r.raise_for_status()
112
- except:
113
- raise ValueError(f"Failed to fetch sequence for {region} in genome {genome}")
114
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
  results_dict = {
116
  "chromStart": new_start,
117
  "chromEnd": new_end,
118
- "seq": r.json()["dna"],
119
  }
120
  return results_dict
121
 
122
-
123
  def extract_tfbs_with_context(
124
- genome: str = "hg38",
125
- flank5: int = 500,
126
- flank3: int = 500,
 
 
127
  control_run: bool = True, # if there's a flank, whether to also run without flank
128
- out_dir: str = "../data_files/processed/tfclust",
129
- allowed_tfs: list = None, # e.g., ['CTCF', 'MAX']
130
  chroms: list = None,
 
 
 
131
  ):
132
  """
133
- Loop through raw downloads and extract TF binding sites (bs) with flanks
134
- Builds a DataFrame with all the available data for each TF. Columns = ["bin", "chrom", "chromStart", "chromEnd", "name", "score", "scoreCount", "sourceIds", "sourceScores", "seq", "seq_flanked", "chromStart_flanked", "chromEnd_flanked"]
135
  """
136
- # Prepare to save output
137
- os.makedirs(out_dir, exist_ok=True)
138
-
139
- # Get chromosomes
140
  if chroms is None:
141
- logging.info(
142
- "No chromosomes provided, fetching all chromosomes for the given genome..."
 
 
 
 
143
  )
144
- chroms = get_all_chroms(genome)
145
- count = 0
146
-
147
  # Initialize the final DF
148
  results_cols = [
149
  "bin",
@@ -163,7 +222,7 @@ def extract_tfbs_with_context(
163
  "flank3",
164
  ]
165
  results_init = pd.DataFrame(columns=results_cols)
166
-
167
  # Make a list of the types of runs we need
168
  queries = [{"flank5": flank5, "flank3": flank3}]
169
  if not ((flank5 == 0) and (flank3 == 0) and control_run):
@@ -171,22 +230,51 @@ def extract_tfbs_with_context(
171
  queries[0]["type"] = "flank"
172
  elif (flank5 == 0) and (flank3 == 0):
173
  queries[0]["type"] = "control"
174
-
175
- # For each chromosome, download the encRegTfbsClustered track, extract the features, and fetch the sequences
176
- # Loop through chroms
177
- for chrom in chroms:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
  results_init.to_csv(
179
- f"{out_dir}/encRegTfbsClustered_{genome}_{chrom}.csv", index=False
180
  )
181
- logging.info(f"Fetching {chrom}...")
 
182
  # Fetch the data json (has start and end positions in the chrom, but not the sequence)
183
  try:
184
- data = fetch_tfbs_track(chrom, genome=genome)
185
- logging.info(f" → Fetched {chrom} successfully")
186
  features = data.get("encRegTfbsClustered", {})
187
- logging.info(f" → Found {len(features)} features")
188
  except Exception as e:
189
- logging.info(f" Failed to fetch {chrom}: {e}")
190
  continue
191
 
192
  # Get the sequences of the DNA binding sites
@@ -197,9 +285,8 @@ def extract_tfbs_with_context(
197
  # Check if tf is valid
198
  tf_name = feature.get("name", "UnknownTF")
199
  if allowed_tfs and tf_name not in allowed_tfs:
 
200
  continue
201
- else:
202
- logging.warning(f"TF name {tf_name} not in allowed_tfs. Skipping.")
203
  # Make sure the chromosomes match and we have the right sequence!
204
  assert (
205
  feature["chrom"] == chrom
@@ -215,146 +302,161 @@ def extract_tfbs_with_context(
215
  end = feature["chromEnd"]
216
 
217
  for query in queries:
218
- try:
219
- results_dict = get_sequence(
220
- chrom,
221
- start,
222
- end,
223
- flank5=query["flank5"],
224
- flank3=query["flank3"],
225
- genome=genome,
226
- )
227
- logging.info(
228
- f" Success on feat. {feature_no} {chrom}:{start}-{end}, type {query['type']}"
229
- )
230
- # Add the returned info
231
- if type == "control":
232
- new_row["seq"] = results_dict["seq"]
233
- else:
234
- new_row["seq_flanked"] = results_dict["seq"]
235
- new_row["chromStart_flanked"] = results_dict["chromStart"]
236
- new_row["chromEnd_flanked"] = results_dict["chromEnd"]
237
- new_row["flank5"] = flank5
238
- new_row["flank3"] = flank3
239
- count += 1
240
- except Exception as e:
241
- logging.info(
242
- f" Skipped feat. {feature_no} {chrom}:{start}-{end} due to error: {e}"
243
- )
244
- continue
245
-
246
- sleep(0.05) # Stay within UCSC's 20 req/sec rate limit
247
-
248
- # Fill out any blank columns
249
- for c in results_cols:
250
- if c not in new_row:
251
- new_row[c] = None
252
-
253
- new_row_df = pd.DataFrame(data=new_row, columns=results_cols)
254
- if new_row_df["seq"] is not None:
255
- new_row_df.to_csv(
256
- f"{out_dir}/encRegTfbsClustered_{chrom}.csv",
257
- mode="a",
258
- index=False,
259
- header=False,
260
- )
261
- logging.info(
262
- f"Wrote new row to {out_dir}/encRegTfbsClustered_{chrom}.csv"
263
  )
 
 
 
 
 
 
 
 
 
264
 
265
- logging.info(f"Done. Wrote {count} sequences to {output}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
266
 
 
 
 
 
 
 
267
 
268
- # Thread function for one chromosome
269
- def process_chrom(
270
- chrom: str = "chr1",
271
- genome: str = "hg38",
272
- flank5: int = 500,
273
- flank3: int = 500,
274
- control_run: bool = True,
275
- out_dir: str = "../data_files/processed/tfclust",
276
- allowed_tfs: list = None,
277
- max_cpu_frac: float = None,
278
- ):
279
  """
280
- Called within parallel method to strat a thread
281
  """
282
- logging.info(f"Starting thread for {chrom}")
283
- try:
284
- extract_tfbs_with_context(
285
- genome=genome,
286
- flank5=flank5,
287
- flank3=flank3,
288
- control_run=control_run,
289
- out_dir=out_dir,
290
- allowed_tfs=allowed_tfs,
291
- chroms=[chrom], # important: wrap in list
292
- )
293
- logging.info(f"Finished {chrom}")
294
- except Exception as e:
295
- logging.error(f"Error processing {chrom}: {e}")
296
 
 
 
297
 
298
- def parallel_extract_tfbs_with_context(
299
- genome: str = "hg38",
300
- flank5: int = 500,
301
- flank3: int = 500,
302
- control_run: bool = True,
303
- out_dir: str = "../data_files/processed/tfclust",
304
- allowed_tfs: list = None,
305
- chroms: list = None,
306
- max_cpu_frac: float = None,
307
- ):
 
308
  """
309
- Call extract_tfbs_with_context() using multithreading, one thread per chromosome.
310
  """
311
- # Get all chromosomes if not supplied
312
- if chroms is None:
313
- chroms = get_all_chroms(genome=genome)
314
-
315
- # Determine max workers
316
- max_workers = len(chroms)
317
- max_available = int(multiprocessing.cpu_count())
318
- if max_cpu_frac is not None:
319
- max_available = int(multiprocessing.cpu_count() * max_cpu_frac)
320
- max_workers = min(max_workers, max_available)
321
- logging.info(
322
- f"{max_available} CPU cores available. Using {max_workers} threads for genome {genome}..."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
323
  )
324
 
325
- # Launch threads
326
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
327
- futures = {executor.submit(process_chrom, chrom): chrom for chrom in chroms}
328
- for future in as_completed(futures):
329
- chrom = futures[future]
330
- try:
331
- future.result()
332
- except Exception as e:
333
- logging.error(f"Chromosome {chrom} raised an exception: {e}")
334
 
335
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
336
  def main():
337
  genomes = ["hg38", "hg19"]
338
- frac_per_genome = round(1 / len(genomes), 1)
 
 
 
339
  for genome in genomes:
340
- all_chroms = get_all_chroms(genome=genome)
341
- parallel_extract_tfbs_with_context(
342
- genome=genome,
343
- flank5=500,
344
- flank3=500,
345
- control_run=True, # if there's a flank, whether to also run without flank
346
- out_dir=f"../data_files/processed/tfclust/{genome}",
347
- allowed_tfs=None, # e.g., ['CTCF', 'MAX']
348
- chroms=None,
349
- max_cpu_frac=frac_per_genome,
350
- )
351
-
352
 
353
  if __name__ == "__main__":
 
354
  logger = logging.getLogger(__name__)
355
- logging.basicConfig(
356
- filename="download.log",
357
- encoding="utf-8",
358
- level=logging.DEBUG,
359
- filemode="w",
360
- )
 
1
+ import os
2
+ import logging
3
  import requests
4
+ import pandas as pd
5
  import json
 
6
  import multiprocessing
7
+ from math import ceil
8
+ from datetime import datetime
 
 
9
 
10
  def get_all_tfs(genome: str = "hg38"):
 
 
 
 
 
11
  raw_data = pd.read_csv(
12
+ f"../../data_files/encode3TfbsClusteredWithCells.bed", sep="\t", header=None
13
  )
14
  raw_data.columns = ["chrom", "start", "end", "tf_name", "score", "cell_line"]
15
+ all_tfs = raw_data["tf_name"].unique().tolist()
 
 
16
  logging.info(f"Found {len(all_tfs)} transcription factors in genome {genome}.")
 
17
  return all_tfs
18
 
19
+ def get_all_chroms(genome: str = "hg38", exclude: list=None, include: list=None, logger: logging.Logger=None):
 
20
  """
21
  Fetch all chromosome names for a genome.
22
  Note: some chromosomes are in unexpected formats (e.g. there is 'chr15', but also 'chr15_ML143371v1_fix')
23
  """
24
+ if logger is None:
25
+ logger = logging.getLogger(__name__)
26
+
27
  url = f"https://api.genome.ucsc.edu/list/chromosomes?genome={genome}"
28
  try:
29
  r = requests.get(url)
30
  r.raise_for_status()
31
  except:
32
+ logger.error(f"Failed to fetch all chromosomes for genome {genome}")
33
+
34
+ if include is not None and exclude is not None:
35
+ raise ValueError(f"Must pass EITHER exclude or include. Cannot pass both.")
36
+
37
+ all_chroms = list(r.json()["chromosomes"].keys())
38
+ if include is not None:
39
+ logger.info(f"Including only the following chromosomes: {include}")
40
+ all_chroms = [chrom for chrom in all_chroms if chrom in include]
41
+ if exclude is not None:
42
+ logger.info(f"Excluding the following chromosomes: {exclude}")
43
+ all_chroms = [chrom for chrom in all_chroms if not(chrom in exclude)]
44
+
45
+ logger.info(f"Found {len(all_chroms)} chromosomes in genome {genome}.")
46
 
47
  return all_chroms
48
 
49
+ def get_all_chrom_fasta_files(genome: str = "hg38", exclude: list=None, include: list=None, logger: logging.Logger=None, out_dir="../../data_files/raw/genomes"):
50
+ """
51
+ Get FASTA files for each chromosome for a current genome
52
+ """
53
+ if logger is None:
54
+ logger = logging.getLogger(__name__)
55
+
56
+ if include is not None and exclude is not None:
57
+ raise ValueError(f"Must pass EITHER exclude or include. Cannot pass both.")
58
+
59
+ chroms = get_all_chroms(genome=genome, exclude=exclude, include=include, logger=logger)
60
+
61
+ genome_out_dir = os.path.join(out_dir,genome)
62
+ os.makedirs(genome_out_dir, exist_ok=True)
63
+
64
+ for chrom in chroms:
65
+ chrom_save_path = os.path.join(genome_out_dir,f"{genome}_{chrom}.json")
66
+ if not(os.path.exists(chrom_save_path)):
67
+ url = f"https://api.genome.ucsc.edu/getData/sequence?genome={genome};chrom={chrom}"
68
+ try:
69
+ r = requests.get(url)
70
+ r.raise_for_status()
71
+ json_output = r.json()
72
+
73
+ with open(chrom_save_path, "w") as f:
74
+ json.dump(json_output, f, indent=4)
75
+
76
+ logger.info(f"Downloaded {chrom} in genome {genome}.")
77
+
78
+ except:
79
+ logger.error(f"Failed to fetch all {chrom} for genome {genome}")
80
+ else:
81
+ logger.info(f"Already downloaded {chrom} in genome {genome}. Skipping.")
82
+
83
+ logger.info(f"Downloaded {len(chroms)} chromosomes in genome {genome}.")
84
+
85
+ return chroms
86
+
87
+ def fetch_tfbs_track(chrom: str, genome: str = "hg38", logger:logging.Logger=None):
88
  """
89
  Fetch raw data from the track encRegTfbsClustered.
90
  Returns json data for the specified chromosome, where key information appears as follows:
 
103
  ]
104
 
105
  """
106
+ if logger is None:
107
+ logger = logging.getLogger(__name__)
108
+
109
  params = {"genome": genome, "track": "encRegTfbsClustered", "chrom": chrom}
110
+ json_out_dir = os.path.join("../../data_files/raw/tfclust/encRegTfbsClustered_data", genome)
111
+ json_out_path = os.path.join(json_out_dir, f"{params['genome']}_{params['track']}_{params['chrom']}.json")
112
+ if not(os.path.exists(json_out_path)):
113
+ url = f"https://api.genome.ucsc.edu/getData/track?genome={params['genome']};track={params['track']};chrom={params['chrom']}"
114
+ try:
115
+ r = requests.get(url)
116
+ r.raise_for_status()
117
+
118
+ # Extract the output and save it
119
+ os.makedirs(json_out_dir, exist_ok=True)
120
+
121
+ # Save it
122
+ json_output = r.json()
123
+ with open(json_out_path, "w") as f:
124
+ json.dump(json_output, f, indent=4)
125
+
126
+ logger.info(
127
+ f"Saved to {json_out_path}"
128
+ )
129
+ except:
130
+ logger.error(
131
+ f"Failed to fetch encRegTfbsClustered for {chrom} in genome {genome}"
132
+ )
133
+ else:
134
+ logging.info(f"Already downloaded encRegTfbsClustered for {chrom} in {genome}. Skipping download.")
135
+ with open(json_out_path, "r") as f:
136
+ json_output = json.load(f)
137
 
 
 
 
138
  return json_output
139
 
 
140
  def get_sequence(
141
+ chrom_json: dict,
142
  start: int,
143
  end: int,
144
  flank5: int = 0,
145
  flank3: int = 0,
146
  genome: str = "hg38",
147
+ logger: logging.Logger=None
148
  ):
149
  """
150
+ Given genome, start position, end position, chromosome json, and desired flank size, extract the raw DNA sequence
151
+
152
+ chrom_json has keys: "downloadTime", "downloadTimeStamp","genome", "chrom", "start", "end", "dna"
 
 
 
 
 
 
 
 
153
 
154
+ """
155
+ if logger is None:
156
+ logger = logging.getLogger(__name__)
157
+
158
+ chrom_seq = chrom_json["dna"]
159
+ chrom = chrom_json["chrom"]
160
+ if chrom_json["start"] != 0:
161
+ logger.warning(f"Start position of chromosome is not 0. Start position: {chrom_json['start']}")
162
+
163
+ # Calculate new start and end indices
164
+ new_start = max(0, start - flank5)
165
+ new_end = end + flank3
166
+ if new_end > chrom_json["end"]:
167
+ logger.warning(f"Attempting to query {chrom} from {new_start} to {new_end}, but last index is {chrom_json['end']}. Manually setting last index to {chrom_json['end']}")
168
+ new_end = chrom_json['end']
169
+
170
  results_dict = {
171
  "chromStart": new_start,
172
  "chromEnd": new_end,
173
+ "seq": chrom_seq[new_start:new_end+1]
174
  }
175
  return results_dict
176
 
 
177
  def extract_tfbs_with_context(
178
+ genome: str,
179
+ flank5: int=500,
180
+ flank3: int=500,
181
+ allowed_tfs: list=None,
182
+ out_dir: str="../../data_files/processed/tfclust",
183
  control_run: bool = True, # if there's a flank, whether to also run without flank
 
 
184
  chroms: list = None,
185
+ logger: logging.Logger=None,
186
+ redo: bool = False, # whether to redo even if we've already processed this
187
+ idx: int=0 # index of worker
188
  ):
189
  """
190
+ Main method for a genome. By calling helpers, gets all chromosomes and their sequences, gets encRegTfbsClustered, and queries the feature indices in encRegTfbsClustered against chrom seqs for binding site sequences.
 
191
  """
192
+ if logger is None:
193
+ logger = logging.getLogger(__name__)
194
+
195
+ # Get all chromosomes for the current genome, including downloading thier sequences
196
  if chroms is None:
197
+ all_chroms = get_all_chrom_fasta_files(genome=genome, logger=logger)
198
+ else:
199
+ all_chroms = get_all_chrom_fasta_files(
200
+ genome=genome,
201
+ exclude=[c for c in get_all_chroms(genome) if c not in chroms],
202
+ logger=logger
203
  )
204
+
205
+ # For each chrom, (1) download full fasta sequence, (2) download encRegTfbsClustered, (3) query features from [2] through [1]
 
206
  # Initialize the final DF
207
  results_cols = [
208
  "bin",
 
222
  "flank3",
223
  ]
224
  results_init = pd.DataFrame(columns=results_cols)
225
+
226
  # Make a list of the types of runs we need
227
  queries = [{"flank5": flank5, "flank3": flank3}]
228
  if not ((flank5 == 0) and (flank3 == 0) and control_run):
 
230
  queries[0]["type"] = "flank"
231
  elif (flank5 == 0) and (flank3 == 0):
232
  queries[0]["type"] = "control"
233
+
234
+ merged_done_txt_path = os.path.join("../../data_files/processed/tfclust", genome, "logs", f"completed.txt")
235
+ done_txt_path = os.path.join("../../data_files/processed/tfclust", genome, "logs", f"completed_worker_{idx}.txt")
236
+ if os.path.exists(merged_done_txt_path):
237
+ completed_chroms = pd.read_csv(merged_done_txt_path, sep="\t")
238
+ completed_chroms = list(completed_chroms["chrom"])
239
+ else:
240
+ completed_chroms = []
241
+
242
+ with open(done_txt_path, "w") as f:
243
+ f.write("chrom\trow_count\n")
244
+
245
+ logger.info(f"{len(completed_chroms)} already complete: {','.join(completed_chroms)}")
246
+
247
+ count = 0
248
+ # Iterate through chromosomes (1) download encRegTfbsClustered, (2) query each feature in the chrom sequence
249
+ for chrom in all_chroms:
250
+ chrom_write_count = 0
251
+ chrom_output_fname = os.path.join("../../data_files/processed/tfclust", genome, f"encRegTfbsClustered_{genome}_{chrom}.csv")
252
+
253
+ # If we've already done it, no need
254
+ if chrom in completed_chroms and not(redo):
255
+ chrom_write_count = len(pd.read_csv(chrom_output_fname))
256
+ with open(done_txt_path, "a") as f:
257
+ f.write(f"{chrom}\t{chrom_write_count}\n")
258
+ continue
259
+
260
+ #### If we ARE processing this, process it!
261
+ # Load chromosome sequence
262
+ with open(os.path.join("../../data_files/raw/genomes",genome,f"{genome}_{chrom}.json"), "r") as f:
263
+ chrom_json = json.load(f)
264
+
265
  results_init.to_csv(
266
+ chrom_output_fname, index=False
267
  )
268
+ logger.info(f"Fetching {chrom}...")
269
+
270
  # Fetch the data json (has start and end positions in the chrom, but not the sequence)
271
  try:
272
+ data = fetch_tfbs_track(chrom, genome=genome, logger=logger)
273
+ logger.info(f" → Fetched {chrom} successfully")
274
  features = data.get("encRegTfbsClustered", {})
275
+ logger.info(f" → Found {len(features)} features")
276
  except Exception as e:
277
+ logger.info(f" Failed to fetch {chrom}: {e}")
278
  continue
279
 
280
  # Get the sequences of the DNA binding sites
 
285
  # Check if tf is valid
286
  tf_name = feature.get("name", "UnknownTF")
287
  if allowed_tfs and tf_name not in allowed_tfs:
288
+ logger.warning(f"TF name {tf_name} not in allowed_tfs. Skipping.")
289
  continue
 
 
290
  # Make sure the chromosomes match and we have the right sequence!
291
  assert (
292
  feature["chrom"] == chrom
 
302
  end = feature["chromEnd"]
303
 
304
  for query in queries:
305
+ results_dict = get_sequence(
306
+ chrom_json,
307
+ start,
308
+ end,
309
+ flank5=query["flank5"],
310
+ flank3=query["flank3"],
311
+ genome=genome,
312
+ logger = logger
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
313
  )
314
+ # Add the returned info
315
+ if query["type"] == "control":
316
+ new_row["seq"] = results_dict["seq"] # note: these sequences will have soft-masked repeats!
317
+ elif query["type"] == "flank":
318
+ new_row["seq_flanked"] = results_dict["seq"]
319
+ new_row["chromStart_flanked"] = results_dict["chromStart"]
320
+ new_row["chromEnd_flanked"] = results_dict["chromEnd"]
321
+ new_row["flank5"] = flank5
322
+ new_row["flank3"] = flank3
323
 
324
+ # Fill out any blank columns
325
+ try:
326
+ for c in results_cols:
327
+ if c not in new_row:
328
+ new_row[c] = None
329
+
330
+ new_row_df = pd.DataFrame(data=new_row, index=[0])
331
+ new_row_df = new_row_df[results_cols] # assert the right column order
332
+ if new_row_df["seq"] is not None:
333
+ new_row_df.to_csv(
334
+ chrom_output_fname,
335
+ mode="a",
336
+ index=False,
337
+ header=False,
338
+ )
339
+ logger.info(
340
+ f"Wrote new row to {out_dir}/encRegTfbsClustered_{chrom}.csv"
341
+ )
342
+ chrom_write_count += 1
343
+ else:
344
+ logger.info(f"Did not write new row. {new_row}")
345
+ except Exception as e:
346
+ logger.error(F"Failed to write new row to {out_dir}/encRegTfbsClustered_{chrom}.csv: error {e}")
347
 
348
+ logger.info(f"Done. Wrote {chrom_write_count} sequences to {out_dir}/encRegTfbsClustered_{chrom}.csv")
349
+ with open(done_txt_path, "a") as f:
350
+ f.write(f"{chrom}\t{chrom_write_count}\n")
351
+ count += chrom_write_count
352
+
353
+ logger.info(f"Done with all chroms. Wrote {count} sequences to {out_dir}.")
354
 
355
+ def merge_completed_files(genome: str):
 
 
 
 
 
 
 
 
 
 
356
  """
357
+ Merge all completed_worker_*.txt files into a single completed.txt file
358
  """
359
+ logs_dir = os.path.join("../../data_files/processed/tfclust", genome, "logs")
360
+ merged_path = os.path.join(logs_dir, "completed.txt")
 
 
 
 
 
 
 
 
 
 
 
 
361
 
362
+ with open(merged_path, "w") as outfile:
363
+ outfile.write("chrom\trow_count\n") # header
364
 
365
+ for fname in os.listdir(logs_dir):
366
+ if fname.startswith("completed_worker_") and fname.endswith(".txt"):
367
+ with open(os.path.join(logs_dir, fname), "r") as infile:
368
+ for line in infile:
369
+ if line.startswith("chrom"): # skip header lines
370
+ continue
371
+ outfile.write(line)
372
+
373
+ print(f"Merged completed_worker_*.txt into {merged_path}")
374
+
375
+ def worker(args):
376
  """
377
+ Worker function for parallel processing
378
  """
379
+ # Extract args
380
+ chrom_group, idx, genome, flank5, flank3, logs_dir = args
381
+ os.makedirs(logs_dir, exist_ok=True)
382
+
383
+ # Define logger
384
+ logger = logging.getLogger(f"worker_{idx}")
385
+ logger.setLevel(logging.DEBUG)
386
+ logger.propagate = False
387
+
388
+ log_file = os.path.join(logs_dir, f"worker_{idx}.log")
389
+ fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
390
+ fh.setLevel(logging.DEBUG)
391
+ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
392
+ fh.setFormatter(formatter)
393
+ logger.addHandler(fh)
394
+
395
+ logger.info(f"Starting worker {idx} for chromosomes: {chrom_group}")
396
+
397
+ extract_tfbs_with_context(
398
+ genome=genome,
399
+ flank5=flank5,
400
+ flank3=flank3,
401
+ allowed_tfs=None,
402
+ out_dir=f"../../data_files/processed/tfclust",
403
+ control_run=True,
404
+ chroms=chrom_group,
405
+ logger=logger,
406
+ idx=idx
407
  )
408
 
409
+ logger.info(f"Finished worker {idx}")
 
 
 
 
 
 
 
 
410
 
411
 
412
+ def parallel_extract(genome: str, flank5: int, flank3: int):
413
+ """
414
+ Run extract_tfbs_with_context in parallel for groups of chromosomes in the genome to speed up processing.
415
+ """
416
+ chroms = get_all_chroms(genome)
417
+ num_cores = multiprocessing.cpu_count()
418
+
419
+ # Separate primary vs accessory chromosomes
420
+ primary_chroms = [c for c in chroms if "_" not in c]
421
+ accessory_chroms = [c for c in chroms if "_" in c]
422
+
423
+ # Distribute primary chromosomes round-robin across workers
424
+ chunks = [[] for _ in range(num_cores)]
425
+ for i, chrom in enumerate(primary_chroms):
426
+ chunks[i % num_cores].append(chrom)
427
+
428
+ # Now add accessory chromosomes to the least-loaded chunk (by count)
429
+ for chrom in accessory_chroms:
430
+ min_idx = min(range(num_cores), key=lambda i: len(chunks[i]))
431
+ chunks[min_idx].append(chrom)
432
+
433
+ # Log how we split it up - want to see which chromosomes are in which chunks.
434
+ logging.info(f"{num_cores} CPU cores available. Primary chromosomes distributed round-robin.")
435
+ for chunk_no, chunk in enumerate(chunks):
436
+ logging.info(f"Chunk {chunk_no}. Chromosomes = {','.join(chunk)}")
437
+
438
+ logs_dir = os.path.join("../../data_files/processed/tfclust", genome, "logs")
439
+ os.makedirs(logs_dir, exist_ok=True)
440
+
441
+ args_list = [(chunk, i, genome, flank5, flank3, logs_dir) for i, chunk in enumerate(chunks)]
442
+
443
+ with multiprocessing.Pool(processes=num_cores) as pool:
444
+ pool.map(worker, args_list)
445
+
446
+ merge_completed_files(genome)
447
+
448
  def main():
449
  genomes = ["hg38", "hg19"]
450
+ flank5 = 1000
451
+ flank3 = 1000
452
+
453
+ # Iterate through genomes
454
  for genome in genomes:
455
+ # Extract TF binding sites from bed - 500 flank
456
+ parallel_extract(genome, flank5, flank3)
 
 
 
 
 
 
 
 
 
 
457
 
458
  if __name__ == "__main__":
459
+ logging.basicConfig(filename="download.log", encoding="utf-8", level=logging.DEBUG, filemode="w")
460
  logger = logging.getLogger(__name__)
461
+
462
+ main()