svincoff commited on
Commit
fc82077
·
1 Parent(s): ad760ef

api_download

Browse files
Files changed (2) hide show
  1. .gitignore +2 -1
  2. dpacman/data/tfclust/api_download.py +448 -0
.gitignore CHANGED
@@ -1 +1,2 @@
1
- dpacman/data_files
 
 
1
+ dpacman/data_files
2
+ dpacman/data/tfclust/*.log
dpacman/data/tfclust/api_download.py ADDED
@@ -0,0 +1,448 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ from time import sleep
3
+ import json
4
+ import logging
5
+ import multiprocessing
6
+ from concurrent.futures import ThreadPoolExecutor, as_completed
7
+ import os
8
+ import pandas as pd
9
+
10
+ def get_all_tfs(genome: str = "hg38"):
11
+ """
12
+ Get all the transcription factors from the appropriate encRegTfbsClusteredWithCells.genome.bed file.
13
+ Available in data_files/raw/tfclust for genomes hg38 and hg19
14
+ """
15
+ # Read raw file
16
+ raw_data = pd.read_csv(
17
+ "../../data_files/encode3TfbsClusteredWithCells.bed", sep="\t", header=None
18
+ )
19
+ raw_data.columns = ["chrom", "start", "end", "tf_name", "score", "cell_line"]
20
+
21
+ # Extract all unique TF names
22
+ all_tfs = encode_raw["tf_name"].unique().tolist()
23
+ logging.info(f"Found {len(all_tfs)} transcription factors in genome {genome}.")
24
+
25
+ return all_tfs
26
+
27
+
28
+ def get_all_chroms(genome: str = "hg38", exclude: list=None, include: list=None, logger: logging.Logger=None):
29
+ """
30
+ Fetch all chromosome names for a genome.
31
+ Note: some chromosomes are in unexpected formats (e.g. there is 'chr15', but also 'chr15_ML143371v1_fix')
32
+ """
33
+ if logger is None:
34
+ logger = logging.getLogger(__name__)
35
+
36
+ url = f"https://api.genome.ucsc.edu/list/chromosomes?genome={genome}"
37
+ try:
38
+ r = requests.get(url)
39
+ r.raise_for_status()
40
+ except:
41
+ raise ValueError(f"Failed to fetch all chromosomes for genome {genome}")
42
+
43
+ if include is not None and exclude is not None:
44
+ raise ValueError(f"Must pass EITHER exclude or include. Cannot pass both.")
45
+
46
+ all_chroms = [chrom for chrom in r.json()["chromosomes"]]
47
+ if include:
48
+ logger.info(f"Including only the following chromosomes: {include}")
49
+ all_chroms = [chrom for chrom in all_chroms if chrom in include]
50
+ if exclude:
51
+ logger.info(f"Excluding the following chromosomes: {exclude}")
52
+ all_chroms = [chrom for chrom in all_chroms if not(chrom in exclude)]
53
+
54
+ logger.info(f"Found {len(all_chroms)} chromosomes in genome {genome}.")
55
+
56
+ return all_chroms
57
+
58
+ def fetch_tfbs_track(chrom: str, genome: str = "hg38", logger:logging.Logger=None):
59
+ """
60
+ Fetch raw data from the track encRegTfbsClustered.
61
+ Returns json data for the specified chromosome, where key information appears as follows:
62
+ "encRegTfbsClustered": [
63
+ {
64
+ "bin": 585,
65
+ "chrom": "chr1",
66
+ "chromStart": 9917,
67
+ "chromEnd": 10247,
68
+ "name": "NUFIP1",
69
+ "score": 680,
70
+ "sourceCount": 1,
71
+ "sourceIds": "1063",
72
+ "sourceScores": "680"
73
+ },...
74
+ ]
75
+
76
+ """
77
+ if logger is None:
78
+ logger = logging.getLogger(__name__)
79
+
80
+ params = {"genome": genome, "track": "encRegTfbsClustered", "chrom": chrom}
81
+ url = f"https://api.genome.ucsc.edu/getData/track?genome={params['genome']};track={params['track']};chrom={params['chrom']}"
82
+ try:
83
+ r = requests.get(url)
84
+ r.raise_for_status()
85
+ except:
86
+ raise ValueError(
87
+ f"Failed to fetch encRegTfbsClustered for {chrom} in genome {genome}"
88
+ )
89
+
90
+ # Extract the output and save it
91
+ json_out_dir = f"../../data_files/raw/tfclust/encRegTfbsClustered_data/{genome}"
92
+ os.makedirs(json_out_dir, exist_ok=True)
93
+
94
+ # Save it
95
+ json_output = r.json()
96
+ with open(
97
+ f"{json_out_dir}/{params['genome']}_{params['track']}_{params['chrom']}.json",
98
+ "w",
99
+ ) as f:
100
+ json.dump(json_output, f, indent=4)
101
+
102
+ logger.info(
103
+ f"Saved to {json_out_dir}/{params['genome']}_{params['track']}_{params['chrom']}.json"
104
+ )
105
+ return json_output
106
+
107
+
108
+ def get_sequence(
109
+ chrom: str,
110
+ start: int,
111
+ end: int,
112
+ flank5: int = 0,
113
+ flank3: int = 0,
114
+ genome: str = "hg38",
115
+ logger: logging.Logger=None
116
+ ):
117
+ """
118
+ Given genome, start position, end position, chromosome, and desired flank size, extract the raw DNA sequence
119
+ """
120
+ if logger is None:
121
+ logger = logging.getLogger(__name__)
122
+
123
+ new_start = max(0, start - flank5)
124
+ new_end = end + flank3
125
+ region = f"{chrom}:{new_start}-{new_end}"
126
+ url = f"https://api.genome.ucsc.edu/getData/sequence?genome={genome};chrom={chrom};start={new_start};end={new_end}"
127
+
128
+ try:
129
+ r = requests.get(url)
130
+ r.raise_for_status()
131
+ except:
132
+ raise ValueError(f"Failed to fetch sequence for {region} in genome {genome}")
133
+
134
+ results_dict = {
135
+ "chromStart": new_start,
136
+ "chromEnd": new_end,
137
+ "seq": r.json()["dna"],
138
+ }
139
+ return results_dict
140
+
141
+
142
+ def extract_tfbs_with_context(
143
+ genome: str = "hg38",
144
+ flank5: int = 500,
145
+ flank3: int = 500,
146
+ control_run: bool = True, # if there's a flank, whether to also run without flank
147
+ out_dir: str = "../../data_files/processed/tfclust",
148
+ allowed_tfs: list = None, # e.g., ['CTCF', 'MAX']
149
+ chroms: list = None,
150
+ logger: logging.Logger = None
151
+ ):
152
+ """
153
+ Loop through raw downloads and extract TF binding sites (bs) with flanks
154
+ Builds a DataFrame with all the available data for each TF. Columns = ["bin", "chrom", "chromStart", "chromEnd", "name", "score", "scoreCount", "sourceIds", "sourceScores", "seq", "seq_flanked", "chromStart_flanked", "chromEnd_flanked"]
155
+ """
156
+ # Prepare logger
157
+ if logger is None:
158
+ logger = logging.getLogger(__name__)
159
+ # Prepare to save output
160
+ os.makedirs(out_dir, exist_ok=True)
161
+
162
+ # Get chromosomes
163
+ if chroms is None:
164
+ logger.info(
165
+ "No chromosomes provided, fetching all chromosomes for the given genome..."
166
+ )
167
+ chroms = get_all_chroms(genome, logger = logger)
168
+ count = 0
169
+ # Initialize the final DF
170
+ results_cols = [
171
+ "bin",
172
+ "chrom",
173
+ "chromStart",
174
+ "chromEnd",
175
+ "name",
176
+ "score",
177
+ "scoreCount",
178
+ "sourceIds",
179
+ "sourceScores",
180
+ "seq",
181
+ "seq_flanked",
182
+ "chromStart_flanked",
183
+ "chromEnd_flanked",
184
+ "flank5",
185
+ "flank3",
186
+ ]
187
+ results_init = pd.DataFrame(columns=results_cols)
188
+
189
+ # Make a list of the types of runs we need
190
+ queries = [{"flank5": flank5, "flank3": flank3}]
191
+ if not ((flank5 == 0) and (flank3 == 0) and control_run):
192
+ queries.append({"type": "control", "flank5": 0, "flank3": 0})
193
+ queries[0]["type"] = "flank"
194
+ elif (flank5 == 0) and (flank3 == 0):
195
+ queries[0]["type"] = "control"
196
+
197
+ # For each chromosome, download the encRegTfbsClustered track, extract the features, and fetch the sequences
198
+ # Loop through chroms
199
+ for chrom in chroms:
200
+ chrom_write_count = 0
201
+ chrom_output_fname = f"{out_dir}/encRegTfbsClustered_{genome}_{chrom}.csv"
202
+ results_init.to_csv(
203
+ chrom_output_fname, index=False
204
+ )
205
+ logger.info(f"Fetching {chrom}...")
206
+ # Fetch the data json (has start and end positions in the chrom, but not the sequence)
207
+ try:
208
+ data = fetch_tfbs_track(chrom, genome=genome, logger=logger)
209
+ logger.info(f" → Fetched {chrom} successfully")
210
+ features = data.get("encRegTfbsClustered", {})
211
+ logger.info(f" → Found {len(features)} features")
212
+ except Exception as e:
213
+ logger.info(f" Failed to fetch {chrom}: {e}")
214
+ continue
215
+
216
+ # Get the sequences of the DNA binding sites
217
+ for feature_no, feature in enumerate(features):
218
+ # Initialize new results row
219
+ new_row = {}
220
+
221
+ # Check if tf is valid
222
+ tf_name = feature.get("name", "UnknownTF")
223
+ if allowed_tfs and tf_name not in allowed_tfs:
224
+ logger.warning(f"TF name {tf_name} not in allowed_tfs. Skipping.")
225
+ continue
226
+ # Make sure the chromosomes match and we have the right sequence!
227
+ assert (
228
+ feature["chrom"] == chrom
229
+ ), f"Chromosome mismatch: {feature['chrom']} != {chrom}"
230
+
231
+ # Add all the cols already in the json, add
232
+ for c in results_cols:
233
+ if c in feature:
234
+ new_row[c] = feature[c]
235
+
236
+ ### Extract sequence
237
+ start = feature["chromStart"]
238
+ end = feature["chromEnd"]
239
+
240
+ for query in queries:
241
+ try:
242
+ results_dict = get_sequence(
243
+ chrom,
244
+ start,
245
+ end,
246
+ flank5=query["flank5"],
247
+ flank3=query["flank3"],
248
+ genome=genome,
249
+ logger = logger
250
+ )
251
+ # Add the returned info
252
+ if query["type"] == "control":
253
+ new_row["seq"] = results_dict["seq"]
254
+ elif query["type"] == "flank":
255
+ new_row["seq_flanked"] = results_dict["seq"]
256
+ new_row["chromStart_flanked"] = results_dict["chromStart"]
257
+ new_row["chromEnd_flanked"] = results_dict["chromEnd"]
258
+ new_row["flank5"] = flank5
259
+ new_row["flank3"] = flank3
260
+ logger.info(
261
+ f" Success on feat. {feature_no} {chrom}:{start}-{end}, type {query['type']}"
262
+ )
263
+ except Exception as e:
264
+ logger.info(
265
+ f" Skipped feat. {feature_no} {chrom}:{start}-{end} due to error: {e}"
266
+ )
267
+ continue
268
+
269
+ sleep(0.05) # Stay within UCSC's 20 req/sec rate limit
270
+
271
+ # Fill out any blank columns
272
+ try:
273
+ for c in results_cols:
274
+ if c not in new_row:
275
+ new_row[c] = None
276
+
277
+ new_row_df = pd.DataFrame(data=new_row, index=[0])
278
+ if new_row_df["seq"] is not None:
279
+ new_row_df.to_csv(
280
+ chrom_output_fname,
281
+ mode="a",
282
+ index=False,
283
+ header=False,
284
+ )
285
+ logger.info(
286
+ f"Wrote new row to {out_dir}/encRegTfbsClustered_{chrom}.csv"
287
+ )
288
+ chrom_write_count += 1
289
+ else:
290
+ logger.info(f"Did not write new row. {new_row}")
291
+ except Exception as e:
292
+ logger.error(F"Failed to write new row to {out_dir}/encRegTfbsClustered_{chrom}.csv: error {e}")
293
+
294
+ logger.info(f"Done. Wrote {chrom_write_count} sequences to {out_dir}/encRegTfbsClustered_{chrom}.csv")
295
+ count += chrom_write_count
296
+
297
+ logger.info(f"Done with all chroms. Wrote {count} sequences to {out_dir}.")
298
+
299
+ def setup_chrom_logger(chrom: str, genome: str, out_dir: str) -> logging.Logger:
300
+ """Set up a dedicated logger for a given chromosome."""
301
+ logger = logging.getLogger(f"{genome}_{chrom}")
302
+ logger.setLevel(logging.DEBUG)
303
+ logger.propagate = False
304
+
305
+ # Avoid duplicate handlers if reused
306
+ if not logger.handlers:
307
+ os.makedirs(out_dir, exist_ok=True)
308
+ log_path = os.path.join(out_dir, f"log_{genome}_{chrom}.log")
309
+ handler = logging.FileHandler(log_path, mode='w', encoding='utf-8')
310
+ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
311
+ handler.setFormatter(formatter)
312
+ logger.addHandler(handler)
313
+
314
+ return logger
315
+
316
+ # Thread function for one chromosome
317
+ def process_chrom(
318
+ chrom: str = "chr1",
319
+ genome: str = "hg38",
320
+ flank5: int = 500,
321
+ flank3: int = 500,
322
+ control_run: bool = True,
323
+ out_dir: str = "../../data_files/processed/tfclust",
324
+ allowed_tfs: list = None,
325
+ ):
326
+ """
327
+ Called within parallel method to strat a thread
328
+ """
329
+ chrom_logger = setup_chrom_logger(chrom, genome, f"{out_dir}/logs")
330
+ chrom_logger.info(f"Starting thread for {chrom}")
331
+
332
+ logging.info(f"Starting thread for {chrom}")
333
+ try:
334
+ extract_tfbs_with_context(
335
+ genome=genome,
336
+ flank5=flank5,
337
+ flank3=flank3,
338
+ control_run=control_run,
339
+ out_dir=out_dir,
340
+ allowed_tfs=allowed_tfs,
341
+ chroms=[chrom], # important: wrap in list
342
+ logger=chrom_logger
343
+ )
344
+ chrom_logger.info(f"Finished {chrom}")
345
+ except Exception as e:
346
+ chrom_logger.error(f"Error processing {chrom}: {e}")
347
+
348
+
349
+ import multiprocessing
350
+ from concurrent.futures import ThreadPoolExecutor, as_completed
351
+
352
+ def parallel_extract_tfbs_for_genome(
353
+ genome: str,
354
+ flank5: int,
355
+ flank3: int,
356
+ control_run: bool,
357
+ out_dir: str,
358
+ allowed_tfs: list,
359
+ chroms: list,
360
+ max_workers: int,
361
+ ):
362
+ logger = logging.getLogger(f"{genome}")
363
+ logger.info(f"Using {max_workers} threads for {genome}...")
364
+
365
+ if chroms is None:
366
+ chroms = get_all_chroms(genome=genome)
367
+
368
+ futures = {}
369
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
370
+ for chrom in chroms:
371
+ future = executor.submit(
372
+ process_chrom,
373
+ chrom=chrom,
374
+ genome=genome,
375
+ flank5=flank5,
376
+ flank3=flank3,
377
+ control_run=control_run,
378
+ out_dir=f"{out_dir}/{genome}",
379
+ allowed_tfs=allowed_tfs,
380
+ )
381
+ futures[future] = f"{genome}:{chrom}"
382
+
383
+ for future in as_completed(futures):
384
+ label = futures[future]
385
+ try:
386
+ future.result()
387
+ except Exception as e:
388
+ logger.error(f"{label} raised an exception: {e}")
389
+
390
+
391
+ def parallel_extract_tfbs_with_context(
392
+ genomes=["hg38", "hg19"],
393
+ flank5=500,
394
+ flank3=500,
395
+ control_run=True,
396
+ out_dir="../../data_files/processed/tfclust",
397
+ allowed_tfs=None,
398
+ chroms=None,
399
+ ):
400
+ total_cpus = multiprocessing.cpu_count()
401
+ cpu_per_genome = total_cpus // len(genomes)
402
+
403
+ logging.info(f"Total CPUs: {total_cpus}")
404
+ logging.info(f"Launching {len(genomes)} genome pipelines with {cpu_per_genome} threads each")
405
+
406
+ processes = []
407
+ for genome in genomes:
408
+ p = multiprocessing.Process(
409
+ target=parallel_extract_tfbs_for_genome,
410
+ args=(
411
+ genome,
412
+ flank5,
413
+ flank3,
414
+ control_run,
415
+ out_dir,
416
+ allowed_tfs,
417
+ chroms,
418
+ cpu_per_genome
419
+ )
420
+ )
421
+ p.start()
422
+ processes.append(p)
423
+
424
+ for p in processes:
425
+ p.join()
426
+
427
+ def main():
428
+ genomes = ["hg38", "hg19"]
429
+
430
+ parallel_extract_tfbs_with_context(
431
+ genomes=genomes,
432
+ flank5=500,
433
+ flank3=500,
434
+ control_run=True, # if there's a flank, whether to also run without flank
435
+ out_dir=f"../../data_files/processed/tfclust",
436
+ allowed_tfs=None, # e.g., ['CTCF', 'MAX']
437
+ chroms=None,
438
+ )
439
+
440
+ if __name__ == "__main__":
441
+ logger = logging.getLogger(__name__)
442
+ logging.basicConfig(
443
+ filename="download.log",
444
+ encoding="utf-8",
445
+ level=logging.DEBUG,
446
+ filemode="w",
447
+ )
448
+ main()