import glob
import random
import re
import sys
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
|
|
|
|
def trim_sequence(seq: str, seq_flanked: str, total_len: int, *, rng=None):
    """
    Return a window of ``seq_flanked`` of length ``total_len`` that contains ``seq``.

    The first occurrence of ``seq`` in ``seq_flanked`` is located, and the
    window is placed so that the motif starts at a random offset chosen
    uniformly among the offsets the available flanks allow.

    Args:
        seq: Motif that must be fully contained in the returned window.
        seq_flanked: Longer sequence that contains ``seq``.
        total_len: Desired length of the returned window.
        rng: Optional ``random.Random``-like object providing ``randint``.
            When omitted, the module-level ``random`` generator is used; pass
            a seeded instance to make the trimming reproducible.

    Returns:
        A tuple ``(trimmed, upstream, downstream)``: the window itself and the
        number of flanking characters kept before and after the motif.
        ``upstream`` is also the 0-based start of the motif inside ``trimmed``.

    Raises:
        ValueError: If ``seq`` does not occur in ``seq_flanked``, if
            ``total_len`` is shorter than ``seq``, or if the flanks are too
            short to reach ``total_len``.
    """
    motif_start = seq_flanked.find(seq)
    if motif_start < 0:
        raise ValueError(f"Motif '{seq}' not found in flanked sequence.")

    motif_len = len(seq)
    extra = total_len - motif_len
    # Without this guard a negative `extra` slips past the flank check below
    # and surfaces as an unrelated "empty range" error from randint().
    if extra < 0:
        raise ValueError(
            f"total_len ({total_len}) is shorter than the motif ({motif_len})."
        )

    left_avail = motif_start
    right_avail = len(seq_flanked) - (motif_start + motif_len)
    if extra > left_avail + right_avail:
        raise ValueError("Not enough flank to reach desired length.")

    # The upstream share must be large enough that the downstream flank can
    # cover the remainder, and small enough to fit in the upstream flank.
    min_left = max(0, extra - right_avail)
    max_left = min(extra, left_avail)
    generator = random if rng is None else rng
    upstream = generator.randint(min_left, max_left)
    downstream = extra - upstream

    start = motif_start - upstream
    end = motif_start + motif_len + downstream
    return seq_flanked[start:end], upstream, downstream
|
|
|
|
def _save_histogram(values, title: str, xlabel: str, out_path: Path) -> None:
    """Draw a 50-bin histogram of ``values`` and save it to ``out_path``."""
    fig = plt.figure(figsize=(6, 4))
    try:
        plt.hist(values, bins=50, edgecolor="k")
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel("Count")
        plt.tight_layout()
        plt.savefig(out_path)
    finally:
        # Close even when drawing or saving fails so open figures do not
        # pile up while many input files are processed in one run.
        plt.close(fig)


def process_and_plot(input_csv: "str | Path", total_len: int, output_csv: Path, fig_dir: Path):
    """
    Trim every row of ``input_csv`` to ``total_len`` and plot motif positions.

    Each row's ``seq_flanked`` value is trimmed with ``trim_sequence`` around
    its ``seq`` value. A copy of the input table with three extra columns is
    written to ``output_csv`` (without the index):

    * ``seq_trimmed``: the trimmed window.
    * ``motif_abs_start``: number of upstream characters kept, i.e. the
      0-based start of the motif inside ``seq_trimmed``.
    * ``motif_rel_pos``: ``motif_abs_start`` divided by the total number of
      flanking characters in the window (``total_len - len(seq)``); ``0.0``
      when the motif fills the whole window and there is no flank.

    Two histograms are saved in ``fig_dir`` as ``<stem>_abs.png`` and
    ``<stem>_rel.png``, where ``<stem>`` is the input file name without its
    extension.

    Args:
        input_csv: Path to a CSV file with ``seq`` and ``seq_flanked`` columns.
        total_len: Desired length of every trimmed sequence.
        output_csv: Destination of the augmented CSV.
        fig_dir: Existing directory that receives the two PNG figures.

    Raises:
        ValueError: Propagated from ``trim_sequence`` when a row cannot be
            trimmed to ``total_len``.
    """
    # Accept plain strings as well as Path objects; `.stem` is needed below.
    input_csv = Path(input_csv)
    df = pd.read_csv(input_csv)

    trimmed_seqs, abs_pos, rel_pos = [], [], []
    for seq, flanked in zip(df["seq"], df["seq_flanked"]):
        trimmed, upstream, _ = trim_sequence(seq, flanked, total_len)
        flank_total = total_len - len(seq)
        trimmed_seqs.append(trimmed)
        abs_pos.append(upstream)
        # A motif exactly `total_len` long has no flank to be positioned in;
        # report 0.0 instead of dividing by zero.
        rel_pos.append(upstream / flank_total if flank_total else 0.0)

    df_out = df.copy()
    df_out["seq_trimmed"] = trimmed_seqs
    df_out["motif_abs_start"] = abs_pos
    df_out["motif_rel_pos"] = rel_pos
    df_out.to_csv(output_csv, index=False)

    basename = input_csv.stem
    _save_histogram(
        df_out["motif_abs_start"],
        f"{basename}: Absolute Motif Start",
        "Start Index (nt)",
        fig_dir / f"{basename}_abs.png",
    )
    _save_histogram(
        df_out["motif_rel_pos"],
        f"{basename}: Relative Motif Position",
        "Relative Position",
        fig_dir / f"{basename}_rel.png",
    )
|
|
|
|
if __name__ == "__main__":
    # Per-chromosome input tables; the glob is broad, CHR_FILTER narrows it
    # to the files named exactly chr1-chr22, chrX and chrY.
    PATTERN = "/home/a03-svincoff/DPACMAN/dpacman/data_files/processed/tfclust/hg38/encRegTfbsClustered_hg38_chr*.csv"
    CHR_FILTER = re.compile(
        r"encRegTfbsClustered_hg38_chr([1-9]|1[0-9]|2[0-2]|X|Y)\.csv$"
    )
    DESIRED_LEN = 1000
    OUTPUT_DIR = Path("trimmed_csvs")
    FIG_DIR = Path("figures")

    # Resolve the inputs before touching any output directory, so that a
    # pattern matching nothing cannot wipe figures from an earlier run.
    files = sorted(
        path
        for path in map(Path, glob.glob(PATTERN))
        if CHR_FILTER.match(path.name)
    )
    if not files:
        print(
            f"No matching chr1-22, X, Y files found (pattern={PATTERN}).",
            file=sys.stderr,
        )
        sys.exit(1)

    OUTPUT_DIR.mkdir(exist_ok=True)
    FIG_DIR.mkdir(exist_ok=True)

    # Remove files left in the figure directory by previous runs;
    # subdirectories are left alone.
    for stale in FIG_DIR.iterdir():
        if stale.is_file():
            stale.unlink()

    failed = []
    for infile in files:
        out_csv = OUTPUT_DIR / f"{infile.stem}_trimmed.csv"
        try:
            process_and_plot(infile, DESIRED_LEN, out_csv, FIG_DIR)
        except Exception as e:
            # Best effort: report the failing file and keep going with the rest.
            print(f"Error processing {infile.name}: {e}", file=sys.stderr)
            failed.append(infile.name)
        else:
            print(f"Processed {infile.name} -> {out_csv.name}; figures in {FIG_DIR}/")

    # Signal partial failure to the calling shell or pipeline.
    if failed:
        sys.exit(1)
|
|