Spaces:
Running
Running
| import streamlit as st | |
| import pandas as pd | |
| import io | |
| import re | |
| import struct | |
| import numpy as np | |
| import openpyxl | |
| import base64 | |
| import matplotlib.pyplot as plt | |
| import matplotlib.colors as mcolors | |
| from scipy.stats import gaussian_kde | |
| from PIL import Image | |
# =========================
# Streamlit App Setup
# =========================
# Page config must be the first Streamlit call in the script.
st.set_page_config(page_title="Bitconverter", layout="wide")
st.title("Bitconverter")
# =========================
# Encoding Schemes
# =========================
# Scheme names shown in the UI, and how many bits one encoded unit occupies.
ENCODING_OPTIONS = ["Voyager 6-bit", "Base64 (6-bit)", "ASCII (7-bit)", "UTF-8 (8-bit)"]
BITS_PER_UNIT = {
    "Voyager 6-bit": 6,
    "Base64 (6-bit)": 6,
    "ASCII (7-bit)": 7,
    "UTF-8 (8-bit)": 8,
}
# =========================
# Voyager ASCII 6-bit Table
# =========================
# The 56-symbol Voyager alphabet, listed in code order (code 0 = space).
_VOYAGER_CHARS = (
    " ABCDEFGHI"
    "JKLMNOPQRS"
    "TUVWXYZ012"
    "3456789.,("
    ")+-*/=$!:%"
    "\"#@'?&"
)
# code -> character, and the inverse character -> code lookup.
voyager_table = dict(enumerate(_VOYAGER_CHARS))
reverse_voyager_table = {ch: code for code, ch in voyager_table.items()}
B64_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
| # ========================= | |
| # 4-bit Grayscale Helpers | |
| # ========================= | |
| # 4-bit grayscale, uniform quantization in sRGB/BT.601 luma code space | |
| # (0=black, 15=white). Two pixels per byte, high-nibble first; | |
| # rows top-to-bottom, no row padding. | |
| # ========================= | |
def quantize_to_4bit(gray8: np.ndarray) -> np.ndarray:
    """Quantize 8-bit grayscale (0..255) to 4-bit (0..15) with nearest rounding."""
    # Scale 0..255 down onto the 0..15 level range, then round to nearest.
    scaled = gray8.astype(np.float32) * (15.0 / 255.0)
    quantized = np.round(scaled).astype(np.uint8)
    # Clamp the upper bound defensively (lower bound is free for uint8).
    return np.minimum(quantized, 15)
def gray4_to_gray8(gray4: np.ndarray) -> np.ndarray:
    """Expand 4-bit values (0..15) to 8-bit grayscale (0..255) for viewing."""
    # 255/15 == 17 exactly, so each level maps to a clean 8-bit value.
    expanded = gray4.astype(np.float32) * (255.0 / 15.0)
    return np.round(expanded).astype(np.uint8)
def pack_4bpp_rows(gray4: np.ndarray) -> bytes:
    """
    Pack a 2D array of 4-bit values (0..15) into bytes: two pixels per byte.

    High nibble = first pixel, low nibble = second pixel.
    If width is odd, the last low nibble of each row is padded with 0.

    Args:
        gray4: (H, W) array of pixel values; only the low 4 bits are used.

    Returns:
        Row-major packed payload of ceil(W/2) * H bytes.
    """
    h, w = gray4.shape
    vals = gray4.astype(np.uint8) & 0x0F
    if w % 2:
        # Append an all-zero column so every row has an even pixel count;
        # the extra low nibble is the required pad.
        vals = np.hstack([vals, np.zeros((h, 1), dtype=np.uint8)])
    # Vectorized nibble packing (replaces the per-pixel Python loop):
    # even columns become high nibbles, odd columns low nibbles.
    packed = (vals[:, 0::2] << 4) | vals[:, 1::2]
    # C-order tobytes() emits rows top-to-bottom with no row padding.
    return packed.astype(np.uint8).tobytes()
def unpack_4bpp_rows(packed: bytes, w: int, h: int) -> np.ndarray:
    """
    Unpack row-major 4bpp data into a 2D array (H, W) with values 0..15.

    Two pixels per byte, high nibble first; the pad nibble of an odd-width
    row is discarded.

    Args:
        packed: payload of exactly ceil(w/2) * h bytes.
        w: image width in pixels.
        h: image height in pixels.

    Returns:
        (h, w) uint8 array of 4-bit values.

    Raises:
        ValueError: if the payload length does not match the dimensions.
    """
    bytes_per_row = (w + 1) // 2
    if len(packed) != bytes_per_row * h:
        raise ValueError("Packed data length mismatch for given dimensions")
    rows = np.frombuffer(packed, dtype=np.uint8).reshape(h, bytes_per_row)
    # Vectorized nibble split (replaces the per-byte Python loop):
    # interleave high and low nibbles back into pixel order.
    nibbles = np.empty((h, bytes_per_row * 2), dtype=np.uint8)
    nibbles[:, 0::2] = rows >> 4
    nibbles[:, 1::2] = rows & 0x0F
    # Drop the trailing pad nibble when the width is odd; copy so the
    # result owns writable memory.
    return np.ascontiguousarray(nibbles[:, :w])
def save_g4_bytes(gray4: np.ndarray) -> bytes:
    """
    Serialize a 4-bit grayscale matrix to an in-memory .g4 file.

    Layout (little-endian): magic 'G4' (2B), version (1B = 1),
    width (uint32), height (uint32), reserved (uint32 = 0), then the
    packed 4bpp payload of ceil(width/2) * height bytes.
    """
    height, width = gray4.shape
    # '<2sBIII' emits the exact header byte sequence with no padding.
    header = struct.pack("<2sBIII", b"G4", 1, width, height, 0)
    return header + pack_4bpp_rows(gray4)
def load_g4_bytes(data: bytes):
    """
    Parse an in-memory .g4 file, returning (gray4, width, height).

    Raises:
        ValueError: on a bad magic, unsupported version, or short payload.
    """
    if data[:2] != b"G4":
        raise ValueError("Not a G4 file")
    version = data[2]
    if version != 1:
        raise ValueError(f"Unsupported G4 version: {version}")
    # Header tail after magic+version: width, height, reserved (all LE uint32).
    w, h, _reserved = struct.unpack_from("<III", data, 3)
    expected = ((w + 1) // 2) * h
    payload = data[15:15 + expected]
    if len(payload) != expected:
        raise ValueError("Payload length mismatch")
    return unpack_4bpp_rows(payload, w=w, h=h), w, h
def gray4_to_binary_flat(gray4: np.ndarray) -> list[int]:
    """
    Convert a 4-bit value matrix to a flat binary list (4 bits/pixel, MSB first).

    Pixels are taken in row-major order; each contributes its 4 bits from
    most- to least-significant.
    """
    # Vectorized bit expansion (replaces the per-pixel Python loop):
    # broadcast every value against its four shift amounts, MSB first.
    vals = (gray4.astype(np.uint8) & 0x0F).reshape(-1, 1)
    shifts = np.array([3, 2, 1, 0], dtype=np.uint8)
    bits = (vals >> shifts) & 1
    # tolist() yields plain Python ints, matching the declared return type.
    return bits.ravel().tolist()
def binary_flat_to_gray4(bits: list[int], width: int) -> np.ndarray:
    """Convert flat binary list (4 bits per pixel) back to a 4-bit value matrix."""
    # Whole pixels only: any trailing 1-3 leftover bits are ignored.
    n_pixels = len(bits) // 4
    values = [
        (bits[i] << 3) | (bits[i + 1] << 2) | (bits[i + 2] << 1) | bits[i + 3]
        for i in range(0, n_pixels * 4, 4)
    ]
    # Always emit at least one row; zero-pad an incomplete final row.
    height = max(1, int(np.ceil(n_pixels / width)))
    grid = np.zeros(width * height, dtype=np.uint8)
    grid[:len(values)] = values
    return grid.reshape((height, width))
| # ========================= | |
| # Encoding Functions | |
| # ========================= | |
def encode_to_binary(text: str, scheme: str) -> tuple[list[int], list[str], list[str]]:
    """
    Encode *text* under *scheme* into a flat bit list.

    Returns (flat_bits, display_units, source_chars):
      - display_units: the encoded representation per unit (Base64 symbol,
        hex byte, ASCII code, Voyager char)
      - source_chars: the original text character each unit maps to
    """

    def to_bits(value: int, width: int) -> list[int]:
        # MSB-first bit expansion of one code value.
        return [(value >> shift) & 1 for shift in range(width - 1, -1, -1)]

    if scheme == "Voyager 6-bit":
        # Characters outside the table silently fall back to code 0 (space).
        codes = [reverse_voyager_table.get(ch.upper(), 0) for ch in text]
        flat = [bit for code in codes for bit in to_bits(code, 6)]
        return flat, list(text.upper()), list(text)
    if scheme == "ASCII (7-bit)":
        # Non-ASCII codepoints are truncated to their low 7 bits.
        codes = [ord(ch) & 0x7F for ch in text]
        flat = [bit for code in codes for bit in to_bits(code, 7)]
        return flat, [f"0x{code:02X}" for code in codes], list(text)
    if scheme == "UTF-8 (8-bit)":
        raw = text.encode("utf-8")
        flat = [bit for byte in raw for bit in to_bits(byte, 8)]
        # Repeat each source character once per UTF-8 byte it produced so
        # labels and source stay aligned per byte.
        source = [ch for ch in text for _ in ch.encode("utf-8")]
        return flat, [f"0x{byte:02X}" for byte in raw], source
    if scheme == "Base64 (6-bit)":
        symbols = base64.b64encode(text.encode("utf-8")).decode("ascii").rstrip("=")
        flat = [bit for sym in symbols for bit in to_bits(B64_ALPHABET.index(sym), 6)]
        # Approximate source mapping: attribute each symbol to the input
        # character that produced the byte its first bit falls into.
        byte_to_char = [ch for ch in text for _ in ch.encode("utf-8")]
        source = [
            byte_to_char[(j * 6) // 8] if (j * 6) // 8 < len(byte_to_char) else "?"
            for j in range(len(symbols))
        ]
        return flat, list(symbols), source
    return [], [], []
| # ========================= | |
| # Decoding Functions | |
| # ========================= | |
def decode_from_binary(bits: list[int], scheme: str) -> str:
    """
    Decode a flat bit list back to text under *scheme*.

    An incomplete trailing chunk is zero-padded on the right. Unmappable
    values render as '?' (Voyager / ASCII) or via UTF-8 replacement.
    """

    def chunk_values(width: int) -> list[int]:
        # Group the bit stream into MSB-first integers of *width* bits.
        out = []
        for start in range(0, len(bits), width):
            piece = bits[start:start + width]
            piece = piece + [0] * (width - len(piece))
            out.append(sum(bit << (width - 1 - pos) for pos, bit in enumerate(piece)))
        return out

    if scheme == "Voyager 6-bit":
        return ''.join(voyager_table.get(v, '?') for v in chunk_values(6))
    if scheme == "ASCII (7-bit)":
        # Only printable ASCII (32..126) is rendered; the rest become '?'.
        return ''.join(chr(v) if 32 <= v < 127 else '?' for v in chunk_values(7))
    if scheme == "UTF-8 (8-bit)":
        return bytes(chunk_values(8)).decode("utf-8", errors="replace")
    if scheme == "Base64 (6-bit)":
        b64_str = ''.join(B64_ALPHABET[v] for v in chunk_values(6))
        # Restore '=' padding so the standard decoder accepts the string.
        b64_str += '=' * (-len(b64_str) % 4)
        try:
            return base64.b64decode(b64_str).decode("utf-8", errors="replace")
        except Exception:
            return "[Base64 decode error]"
    return ""
# =========================
# Tabs
# =========================
# Top-level navigation: one tab per workflow (tab3/tab4 are populated
# further down the file).
tab1, tab2, tab3, tab4 = st.tabs(["Encoding", "Decoding", "Data Analytics", "Writing"])
# --------------------------------------------------
# TAB 1: Text/Image → Binary
# --------------------------------------------------
# Encoding workflow: turn typed text or an uploaded image into bit streams,
# preview them, and offer several downloadable representations.
with tab1:
    st.markdown("""
Convert text or an image into binary labels.
Choose an input mode, encoding scheme, and control grouping.
""")
    # "Text" encodes typed input; "Image" encodes an uploaded picture.
    input_mode = st.selectbox("Input mode:", ["Text", "Image"], key="input_mode")
    if input_mode == "Text":
        st.subheader("Step 1 – Choose Encoding & Input Text")
        encoding_scheme = st.selectbox(
            "Encoding scheme:",
            ENCODING_OPTIONS,
            index=0,
            key="enc_scheme",
            help=(
                "**Voyager 6-bit** – Custom 56-character table (A-Z, 0-9, punctuation). 6 bits/char.\n\n"
                "**Base64 (6-bit)** – Standard Base64 encoding of UTF-8 bytes. 6 bits/symbol.\n\n"
                "**ASCII (7-bit)** – Standard 7-bit ASCII. 7 bits/char.\n\n"
                "**UTF-8 (8-bit)** – Full UTF-8 byte encoding. 8 bits/byte. Supports all Unicode."
            )
        )
        bits_per = BITS_PER_UNIT[encoding_scheme]
        if encoding_scheme == "Voyager 6-bit":
            # Show the full Voyager alphabet so users know which characters survive.
            supported = ''.join(voyager_table[i] for i in range(len(voyager_table)))
            st.caption(f"Supported characters ({len(voyager_table)}): `{supported}`")
        user_input = st.text_input("Enter your text:", value="DNA", key="input_text")
        col1, col2 = st.columns([2, 1])
        with col1:
            group_size = st.slider("Select number of target positions:", min_value=12, max_value=128, value=25)
        with col2:
            custom_cols = st.number_input("Or enter custom number:", min_value=1, max_value=512, value=group_size)
        # The free-form number input overrides the slider when they differ.
        if custom_cols != group_size:
            group_size = custom_cols
        if user_input:
            binary_labels, display_units, source_chars = encode_to_binary(user_input, encoding_scheme)
            binary_concat = ''.join(map(str, binary_labels))
            st.markdown("### Output 1 – Binary Labels per Character")
            st.caption(f"Encoding: **{encoding_scheme}** — {bits_per} bits per unit")
            # One chunk per encoded unit (char/symbol/byte, depending on scheme).
            grouped_bits = [binary_labels[i:i + bits_per] for i in range(0, len(binary_labels), bits_per)]
            # Scrollable per-unit listing rendered as raw HTML.
            scroll_html = (
                "<div style='max-height:300px; overflow-y:auto; font-family:monospace; "
                "padding:6px; border:1px solid #ccc;'>"
            )
            for i, bits in enumerate(grouped_bits):
                src = source_chars[i] if i < len(source_chars) else "?"
                enc = display_units[i] if i < len(display_units) else "?"
                # Voyager's display unit equals the source char, so skip the middle column.
                if encoding_scheme == "Voyager 6-bit":
                    scroll_html += f"<div>'{src}' → {bits}</div>"
                else:
                    scroll_html += f"<div>'{src}' → '{enc}' → {bits}</div>"
            scroll_html += "</div>"
            st.markdown(scroll_html, unsafe_allow_html=True)
            # Plain-text version of the same listing for download.
            per_char_lines = []
            for i, bits in enumerate(grouped_bits):
                src = source_chars[i] if i < len(source_chars) else "?"
                enc = display_units[i] if i < len(display_units) else "?"
                bit_str = ''.join(map(str, bits))
                if encoding_scheme == "Voyager 6-bit":
                    per_char_lines.append(f"'{src}' → {bit_str}")
                else:
                    per_char_lines.append(f"'{src}' → '{enc}' → {bit_str}")
            st.download_button(
                "⬇️ Download Binary per Character (.txt)",
                data='\n'.join(per_char_lines),
                file_name="binary_per_unit.txt",
                mime="text/plain",
                key="download_per_unit"
            )
            st.download_button(
                "⬇️ Download Concatenated Binary String",
                data=binary_concat,
                file_name="binary_full.txt",
                mime="text/plain",
                key="download_binary_txt"
            )
            st.markdown("### Output 2 – Binary matrix split into reactions grouped by target position")
            # Regroup the flat bit stream into fixed-width rows, zero-padding the last.
            groups = []
            for i in range(0, len(binary_labels), group_size):
                group = binary_labels[i:i + group_size]
                if len(group) < group_size:
                    group += [0] * (group_size - len(group))
                groups.append(group)
            columns = [f"Position {i+1}" for i in range(group_size)]
            df = pd.DataFrame(groups, columns=columns)
            df.insert(0, "Sample", range(1, len(df) + 1))
            # NOTE(review): width="stretch" needs a recent Streamlit release — confirm deployed version.
            st.dataframe(df, width="stretch")
            st.download_button(
                "⬇️ Download as CSV",
                df.to_csv(index=False),
                file_name=f"binary_labels_{group_size}_positions.csv",
                mime="text/csv",
                key="download_binary_csv"
            )
        else:
            st.info("👆 Enter text above to see binary labels.")
    # =====================================================
    # IMAGE INPUT MODE
    # =====================================================
    else:
        st.subheader("Step 1 – Upload Image & Set Resolution")
        image_type = st.selectbox(
            "Image type:",
            ["Black & White (1-bit)", "Grayscale (4-bit)"],
            key="enc_image_type",
            help=(
                "**Black & White (1-bit)** — Each pixel = 1 bit (0 or 1). Uses a brightness threshold.\n\n"
                "**Grayscale (4-bit)** — Each pixel = 4 bits (0–15 levels). "
                "Uniform quantization in sRGB/BT.601 luma space. 0 = black, 15 = white. "
                "Two pixels per byte, high-nibble first; rows top-to-bottom, no row padding."
            )
        )
        uploaded_img = st.file_uploader(
            "Upload an image (PNG, JPG, BMP, etc.):",
            type=["png", "jpg", "jpeg", "bmp", "gif", "tiff", "webp"],
            key="img_uploader"
        )
        if uploaded_img is not None:
            img = Image.open(uploaded_img).convert("L")  # grayscale
            orig_w, orig_h = img.size
            aspect = orig_h / orig_w
            st.image(img, caption=f"Original (grayscale) — {orig_w}×{orig_h} px", use_container_width=True)
            st.markdown("#### ⚙️ Resolution")
            target_width = st.slider(
                "Output width (pixels):",
                min_value=8, max_value=min(orig_w, 256), value=min(64, orig_w), step=1,
                help="Height is auto-calculated from aspect ratio."
            )
            target_height = max(1, int(round(target_width * aspect)))
            img_resized = img.resize((target_width, target_height), Image.LANCZOS)
            img_array = np.array(img_resized)
            # ===========================================================
            # BLACK & WHITE (1-bit)
            # ===========================================================
            if image_type == "Black & White (1-bit)":
                total_bits = target_width * target_height
                st.caption(f"Output size: **{target_width} × {target_height}** = **{total_bits:,}** bits (1 bit/pixel)")
                threshold = st.slider(
                    "Black/white threshold:",
                    min_value=0, max_value=255, value=128,
                    help="Pixels darker than this → 1 (black). Brighter → 0 (white)."
                )
                # 1 = black (dark pixel), 0 = white — note the inverted convention.
                binary_matrix = (img_array < threshold).astype(int)
                st.markdown("### Preview — Black & White Output")
                col_prev1, col_prev2 = st.columns(2)
                with col_prev1:
                    st.image(img_resized, caption=f"Resized grayscale ({target_width}×{target_height})", use_container_width=True)
                with col_prev2:
                    # Invert back for display: bit 1 renders as black (0 luma).
                    bw_display = Image.fromarray(((1 - binary_matrix) * 255).astype(np.uint8))
                    st.image(bw_display, caption=f"Binary B&W ({target_width}×{target_height})", use_container_width=True)
                binary_labels = binary_matrix.flatten().tolist()
                binary_concat = ''.join(map(str, binary_labels))
                n_ones = sum(binary_labels)
                st.markdown("### Output 1 – Image Info")
                st.markdown(
                    f"- **Dimensions:** {target_width} × {target_height} \n"
                    f"- **Bits per pixel:** 1 \n"
                    f"- **Total bits:** {total_bits:,} \n"
                    f"- **Black pixels (1):** {n_ones:,} \n"
                    f"- **White pixels (0):** {total_bits - n_ones:,}"
                )
                st.download_button(
                    "⬇️ Download Concatenated Binary String",
                    data=binary_concat,
                    file_name="image_binary_full.txt",
                    mime="text/plain",
                    key="download_img_binary_txt"
                )
                st.markdown("### Output 2 – Binary Matrix by dimension (Samples × Positions)")
                columns = [f"Position {i+1}" for i in range(target_width)]
                df_img = pd.DataFrame(binary_matrix, columns=columns)
                df_img.insert(0, "Sample", range(1, len(df_img) + 1))
                st.dataframe(df_img, width="stretch")
                st.download_button(
                    "⬇️ Download as CSV",
                    df_img.to_csv(index=False),
                    file_name=f"image_binary_{target_width}x{target_height}.csv",
                    mime="text/csv",
                    key="download_img_csv"
                )
                st.markdown("### Output 3 – Custom Grouped Matrix by Number of Target Positions")
                col1, col2 = st.columns([2, 1])
                with col1:
                    # NOTE(review): value=target_width can exceed max_value=128 (width may go
                    # up to 256) — Streamlit will raise; consider min(target_width, 128).
                    img_group_size = st.slider(
                        "Select number of target positions:",
                        min_value=12, max_value=128, value=target_width, key="img_group_slider"
                    )
                with col2:
                    img_custom_cols = st.number_input(
                        "Or enter custom number:",
                        min_value=1, max_value=512, value=img_group_size, key="img_custom_cols"
                    )
                # The free-form number input overrides the slider when they differ.
                if img_custom_cols != img_group_size:
                    img_group_size = img_custom_cols
                # Regroup the flat bit stream, zero-padding the final row.
                groups = []
                for i in range(0, len(binary_labels), img_group_size):
                    group = binary_labels[i:i + img_group_size]
                    if len(group) < img_group_size:
                        group += [0] * (img_group_size - len(group))
                    groups.append(group)
                columns_g = [f"Position {i+1}" for i in range(img_group_size)]
                df_grouped = pd.DataFrame(groups, columns=columns_g)
                df_grouped.insert(0, "Sample", range(1, len(df_grouped) + 1))
                st.dataframe(df_grouped, width="stretch")
                st.download_button(
                    "⬇️ Download Grouped CSV",
                    df_grouped.to_csv(index=False),
                    file_name=f"image_binary_grouped_{img_group_size}_positions.csv",
                    mime="text/csv",
                    key="download_img_grouped_csv"
                )
            # ===========================================================
            # GRAYSCALE (4-bit)
            # ===========================================================
            else:
                n_pixels = target_width * target_height
                total_bits = n_pixels * 4
                st.caption(
                    f"Output size: **{target_width} × {target_height}** = **{n_pixels:,}** pixels × 4 bits = "
                    f"**{total_bits:,}** bits"
                )
                gray4_matrix = quantize_to_4bit(img_array)
                gray8_preview = gray4_to_gray8(gray4_matrix)
                st.markdown("### Preview — 4-bit Grayscale (16 levels)")
                col_prev1, col_prev2 = st.columns(2)
                with col_prev1:
                    st.image(img_resized, caption=f"Original resized ({target_width}×{target_height}, 256 levels)", use_container_width=True)
                with col_prev2:
                    st.image(
                        Image.fromarray(gray8_preview),
                        caption=f"4-bit quantized ({target_width}×{target_height}, 16 levels)",
                        use_container_width=True
                    )
                # Binary flat
                binary_labels = gray4_to_binary_flat(gray4_matrix)
                binary_concat = ''.join(map(str, binary_labels))
                st.markdown("### Output 1 – Image Info")
                unique_vals, counts = np.unique(gray4_matrix, return_counts=True)
                st.markdown(
                    f"- **Dimensions:** {target_width} × {target_height} \n"
                    f"- **Bits per pixel:** 4 (values 0–15) \n"
                    f"- **Total pixels:** {n_pixels:,} \n"
                    f"- **Total bits:** {total_bits:,} \n"
                    f"- **Unique levels used:** {len(unique_vals)} of 16"
                )
                # Downloads: binary string, packed .g4 file
                col_dl1, col_dl2 = st.columns(2)
                with col_dl1:
                    st.download_button(
                        "⬇️ Download Binary String (.txt, 4 bits/pixel)",
                        data=binary_concat,
                        file_name="image_gray4_binary_full.txt",
                        mime="text/plain",
                        key="download_g4_binary_txt"
                    )
                with col_dl2:
                    g4_bytes = save_g4_bytes(gray4_matrix)
                    st.download_button(
                        "⬇️ Download Packed .g4 File",
                        data=g4_bytes,
                        file_name=f"image_{target_width}x{target_height}.g4",
                        mime="application/octet-stream",
                        key="download_g4_file"
                    )
                # Value matrix (0-15 per pixel)
                st.markdown("### Output 2 – Value Matrix (0–15 per pixel)")
                st.caption("Each cell = one pixel's 4-bit grayscale level. 0 = black, 15 = white.")
                columns_v = [f"Position {i+1}" for i in range(target_width)]
                df_val = pd.DataFrame(gray4_matrix.astype(int), columns=columns_v)
                df_val.insert(0, "Sample", range(1, len(df_val) + 1))
                st.dataframe(df_val, width="stretch")
                st.download_button(
                    "⬇️ Download Value Matrix CSV (0–15)",
                    df_val.to_csv(index=False),
                    file_name=f"image_gray4_values_{target_width}x{target_height}.csv",
                    mime="text/csv",
                    key="download_g4_values_csv"
                )
                # Binary matrix (4 bits per pixel → width*4 binary columns per row)
                st.markdown("### Output 3 – Binary Matrix (4 bits per pixel)")
                st.caption("Each pixel expanded to 4 binary columns. Row width = image width × 4.")
                bin_width = target_width * 4
                bin_matrix = np.array(binary_labels).reshape((target_height, bin_width))
                columns_b = [f"Position {i+1}" for i in range(bin_width)]
                df_bin = pd.DataFrame(bin_matrix, columns=columns_b)
                df_bin.insert(0, "Sample", range(1, len(df_bin) + 1))
                st.dataframe(df_bin, width="stretch")
                st.download_button(
                    "⬇️ Download Binary Matrix CSV",
                    df_bin.to_csv(index=False),
                    file_name=f"image_gray4_binary_{target_width}x{target_height}.csv",
                    mime="text/csv",
                    key="download_g4_binary_csv"
                )
                # Custom grouped
                st.markdown("### Output 4 – Custom Grouped Matrix by Number of Target Positions")
                col1, col2 = st.columns([2, 1])
                with col1:
                    # NOTE(review): value=bin_width (up to width×4 = 1024) can exceed
                    # max_value=256 — Streamlit will raise; consider min(bin_width, 256).
                    g4_group_size = st.slider(
                        "Select number of target positions:",
                        min_value=12, max_value=256, value=bin_width, key="g4_group_slider"
                    )
                with col2:
                    g4_custom_cols = st.number_input(
                        "Or enter custom number:",
                        min_value=1, max_value=1024, value=g4_group_size, key="g4_custom_cols"
                    )
                # The free-form number input overrides the slider when they differ.
                if g4_custom_cols != g4_group_size:
                    g4_group_size = g4_custom_cols
                # Regroup the flat bit stream, zero-padding the final row.
                groups = []
                for i in range(0, len(binary_labels), g4_group_size):
                    group = binary_labels[i:i + g4_group_size]
                    if len(group) < g4_group_size:
                        group += [0] * (g4_group_size - len(group))
                    groups.append(group)
                columns_cg = [f"Position {i+1}" for i in range(g4_group_size)]
                df_cg = pd.DataFrame(groups, columns=columns_cg)
                df_cg.insert(0, "Sample", range(1, len(df_cg) + 1))
                st.dataframe(df_cg, width="stretch")
                st.download_button(
                    "⬇️ Download Grouped CSV",
                    df_cg.to_csv(index=False),
                    file_name=f"image_gray4_grouped_{g4_group_size}_positions.csv",
                    mime="text/csv",
                    key="download_g4_grouped_csv"
                )
        else:
            st.info("👆 Upload an image to encode it as binary.")
| # -------------------------------------------------- | |
| # TAB 2: Decoding (Text & Image) | |
| # -------------------------------------------------- | |
| with tab2: | |
| st.markdown(""" | |
| Decode binary data back into **text** or render it as an **image**. | |
| """) | |
| decode_mode = st.selectbox("Output mode:", ["Text", "Image"], key="decode_mode") | |
| if decode_mode == "Text": | |
| st.markdown(""" | |
| Upload either: | |
| - `.csv` file with 0/1 values (any number of columns/rows) | |
| - `.xlsx` Excel file | |
| - `.txt` file containing a concatenated binary string (e.g. `010101...`) | |
| """) | |
| decode_scheme = st.selectbox( | |
| "Decoding scheme (must match the encoding used):", | |
| ENCODING_OPTIONS, | |
| index=0, | |
| key="dec_scheme", | |
| help="Select the same encoding scheme that was used to produce the binary data." | |
| ) | |
| uploaded_decode = st.file_uploader( | |
| "Upload your file (.csv, .xlsx, or .txt):", | |
| type=["csv", "xlsx", "txt"], | |
| key="decode_uploader" | |
| ) | |
| if uploaded_decode is not None: | |
| try: | |
| if uploaded_decode.name.endswith(".csv"): | |
| df = pd.read_csv(uploaded_decode) | |
| bits = df.values.flatten().astype(int).tolist() | |
| elif uploaded_decode.name.endswith(".xlsx"): | |
| df = pd.read_excel(uploaded_decode) | |
| bits = df.values.flatten().astype(int).tolist() | |
| elif uploaded_decode.name.endswith(".txt"): | |
| content = uploaded_decode.read().decode().strip() | |
| bits = [int(b) for b in content if b in ['0', '1']] | |
| else: | |
| bits = [] | |
| if not bits: | |
| st.warning("No binary data detected.") | |
| else: | |
| recovered_text = decode_from_binary(bits, decode_scheme) | |
| st.success(f"✅ Conversion complete using **{decode_scheme}**!") | |
| st.markdown("**Recovered text:**") | |
| st.text_area("Output", recovered_text, height=150) | |
| st.download_button( | |
| "⬇️ Download Recovered Text (.txt)", | |
| data=recovered_text, | |
| file_name="recovered_text.txt", | |
| mime="text/plain", | |
| key="download_recovered" | |
| ) | |
| except Exception as e: | |
| st.error(f"Error reading or converting file: {e}") | |
| else: | |
| st.info("👆 Upload a file to start the reverse conversion.") | |
| # ===================================================== | |
| # IMAGE DECODE MODE | |
| # ===================================================== | |
| else: | |
| dec_image_type = st.selectbox( | |
| "Image type:", | |
| ["Black & White (1-bit)", "Grayscale (4-bit)"], | |
| key="dec_image_type", | |
| help=( | |
| "**Black & White** — Input is 0/1 binary data. Each value = 1 pixel.\n\n" | |
| "**Grayscale (4-bit)** — Input is a **value matrix (0–15)**, **binary data** " | |
| "(every 4 bits = one pixel), or a packed **.g4 file**." | |
| ) | |
| ) | |
| # =========================================================== | |
| # DECODE: B&W (1-bit) | |
| # =========================================================== | |
| if dec_image_type == "Black & White (1-bit)": | |
| st.markdown(""" | |
| Render binary data (0/1) as a **black & white image**. | |
| Upload a binary matrix CSV (rows × positions) or a concatenated binary `.txt` string. | |
| """) | |
| img_preview_file = st.file_uploader( | |
| "📤 Upload binary data file (.csv, .xlsx, or .txt):", | |
| type=["csv", "xlsx", "txt"], | |
| key="img_preview_uploader" | |
| ) | |
| if img_preview_file is not None: | |
| try: | |
| if img_preview_file.name.endswith(".csv"): | |
| idf = pd.read_csv(img_preview_file) | |
| if "Sample" in idf.columns or "sample" in idf.columns: | |
| idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"]) | |
| bits_matrix = idf.values.flatten().astype(int) | |
| detected_width = len(idf.columns) | |
| elif img_preview_file.name.endswith(".xlsx"): | |
| idf = pd.read_excel(img_preview_file) | |
| if "Sample" in idf.columns or "sample" in idf.columns: | |
| idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"]) | |
| bits_matrix = idf.values.flatten().astype(int) | |
| detected_width = len(idf.columns) | |
| elif img_preview_file.name.endswith(".txt"): | |
| content = img_preview_file.read().decode().strip() | |
| bits_matrix = np.array([int(b) for b in content if b in ['0', '1']]) | |
| detected_width = None | |
| else: | |
| bits_matrix = np.array([]) | |
| detected_width = None | |
| if len(bits_matrix) == 0: | |
| st.warning("No binary data detected.") | |
| else: | |
| total_bits = len(bits_matrix) | |
| st.success(f"✅ Loaded **{total_bits:,}** bits.") | |
| st.markdown("#### ⚙️ Image Dimensions") | |
| if detected_width and detected_width > 1: | |
| default_w = detected_width | |
| st.caption(f"Auto-detected width from columns: **{detected_width}**") | |
| else: | |
| default_w = max(1, int(np.sqrt(total_bits))) | |
| img_width = st.number_input( | |
| "Image width (pixels / positions per row):", | |
| min_value=1, max_value=total_bits, value=default_w, step=1, | |
| key="img_preview_width" | |
| ) | |
| img_height = int(np.ceil(total_bits / img_width)) | |
| st.caption(f"Image size: **{img_width} × {img_height}** = **{img_width * img_height:,}** pixels " | |
| f"({total_bits:,} bits, {img_width * img_height - total_bits} padded)") | |
| padded = np.zeros(img_width * img_height, dtype=int) | |
| padded[:total_bits] = bits_matrix[:total_bits] | |
| img_data = padded.reshape((img_height, img_width)) | |
| img_render = ((1 - img_data) * 255).astype(np.uint8) | |
| pil_img = Image.fromarray(img_render, mode="L") | |
| st.markdown("### 🖼️ Rendered Image") | |
| display_scale = max(1, 256 // img_width) | |
| display_w = img_width * display_scale | |
| display_h = img_height * display_scale | |
| pil_display = pil_img.resize((display_w, display_h), Image.NEAREST) | |
| st.image(pil_display, caption=f"Binary image — {img_width}×{img_height} (1=black, 0=white)") | |
| ones = int(bits_matrix.sum()) | |
| st.markdown( | |
| f"- **Black pixels (1):** {ones:,} ({100*ones/total_bits:.1f}%) \n" | |
| f"- **White pixels (0):** {total_bits - ones:,} ({100*(total_bits-ones)/total_bits:.1f}%)" | |
| ) | |
| buf = io.BytesIO() | |
| pil_img.save(buf, format="PNG") | |
| st.download_button( | |
| "⬇️ Download as PNG", | |
| data=buf.getvalue(), | |
| file_name=f"binary_image_{img_width}x{img_height}.png", | |
| mime="image/png", | |
| key="download_preview_png" | |
| ) | |
| buf_hr = io.BytesIO() | |
| pil_display.save(buf_hr, format="PNG") | |
| st.download_button( | |
| "⬇️ Download Scaled PNG (for viewing)", | |
| data=buf_hr.getvalue(), | |
| file_name=f"binary_image_{display_w}x{display_h}_scaled.png", | |
| mime="image/png", | |
| key="download_preview_png_scaled" | |
| ) | |
| except Exception as e: | |
| st.error(f"❌ Error processing file: {e}") | |
| import traceback | |
| st.code(traceback.format_exc()) | |
| else: | |
| st.info("👆 Upload a binary data file (CSV or TXT) to render as an image.") | |
# ===========================================================
# DECODE: GRAYSCALE (4-bit)
# ===========================================================
else:
    # Three input representations are supported; the helpers referenced below
    # (load_g4_bytes, binary_flat_to_gray4, gray4_to_gray8) are defined earlier
    # in this file next to the 4-bit grayscale format notes.
    g4_input_format = st.selectbox(
        "Input data format:",
        ["Value matrix (0–15)", "Binary (4 bits per pixel)", "Packed .g4 file"],
        key="g4_input_format",
        help=(
            "**Value matrix** — CSV/XLSX where each cell is a pixel value 0–15. "
            "Rows = pixel rows, columns = pixel columns.\n\n"
            "**Binary** — 0/1 data where every 4 consecutive bits encode one pixel (0–15).\n\n"
            "**Packed .g4 file** — Binary file with G4 header + packed 4bpp payload "
            "(two pixels per byte, high-nibble first)."
        )
    )
    st.markdown("Render 4-bit grayscale data as an image (16 levels, 0=black, 15=white).")
    # Accept .g4 files in addition to csv/xlsx/txt
    accept_types = ["csv", "xlsx", "txt"]
    if g4_input_format == "Packed .g4 file":
        accept_types = ["g4"]
    g4_file = st.file_uploader(
        f"📤 Upload data file ({', '.join('.' + t for t in accept_types)}):",
        type=accept_types,
        key="g4_decode_uploader"
    )
    if g4_file is not None:
        try:
            gray4_matrix = None
            img_width = None
            img_height = None
            # ---- Packed .g4 file ----
            if g4_input_format == "Packed .g4 file":
                raw_data = g4_file.read()
                # Header carries the dimensions; payload is two pixels per byte.
                gray4_matrix, img_width, img_height = load_g4_bytes(raw_data)
            # ---- Value matrix (0-15) ----
            elif g4_input_format == "Value matrix (0–15)":
                if g4_file.name.endswith(".csv"):
                    gdf = pd.read_csv(g4_file)
                elif g4_file.name.endswith(".xlsx"):
                    gdf = pd.read_excel(g4_file)
                else:
                    # Plain text: whitespace-separated integers, one pixel row per line.
                    content = g4_file.read().decode().strip()
                    rows = [list(map(int, line.split())) for line in content.splitlines() if line.strip()]
                    gdf = pd.DataFrame(rows)
                # Drop an optional "Sample" index column so only pixel data remains.
                if "Sample" in gdf.columns or "sample" in gdf.columns:
                    gdf = gdf.drop(columns=[c for c in gdf.columns if c.lower() == "sample"])
                gray4_matrix = gdf.values.astype(int)
                # Clamp out-of-range cells into the valid 4-bit range 0..15.
                gray4_matrix = np.clip(gray4_matrix, 0, 15).astype(np.uint8)
                img_height, img_width = gray4_matrix.shape
            # ---- Binary (4 bits per pixel) ----
            else:
                if g4_file.name.endswith(".csv"):
                    bdf = pd.read_csv(g4_file)
                    if "Sample" in bdf.columns or "sample" in bdf.columns:
                        bdf = bdf.drop(columns=[c for c in bdf.columns if c.lower() == "sample"])
                    flat_bits = bdf.values.flatten().astype(int).tolist()
                    detected_cols = len(bdf.columns)
                    # Each pixel spans 4 bit-columns; otherwise fall back to a near-square guess.
                    img_width = detected_cols // 4 if detected_cols >= 4 else max(1, int(np.sqrt(len(flat_bits) // 4)))
                elif g4_file.name.endswith(".xlsx"):
                    bdf = pd.read_excel(g4_file)
                    if "Sample" in bdf.columns or "sample" in bdf.columns:
                        bdf = bdf.drop(columns=[c for c in bdf.columns if c.lower() == "sample"])
                    flat_bits = bdf.values.flatten().astype(int).tolist()
                    detected_cols = len(bdf.columns)
                    img_width = detected_cols // 4 if detected_cols >= 4 else max(1, int(np.sqrt(len(flat_bits) // 4)))
                elif g4_file.name.endswith(".txt"):
                    # Keep only literal '0'/'1' characters; everything else is separator noise.
                    content = g4_file.read().decode().strip()
                    flat_bits = [int(b) for b in content if b in ['0', '1']]
                    img_width = max(1, int(np.sqrt(len(flat_bits) // 4)))
                else:
                    # Unknown extension: render an empty 1-pixel-wide image rather than fail.
                    flat_bits = []
                    img_width = 1
                # Helper groups each run of 4 bits into one 0–15 pixel value.
                gray4_matrix = binary_flat_to_gray4(flat_bits, img_width)
                img_height = gray4_matrix.shape[0]
            n_pixels = img_width * img_height
            st.success(f"✅ Loaded **{n_pixels:,}** pixels ({img_width} × {img_height}).")
            # Width override
            st.markdown("#### ⚙️ Image Dimensions")
            img_width_adj = st.number_input(
                "Image width (pixels per row):",
                min_value=1, max_value=n_pixels, value=img_width, step=1,
                key="g4_preview_width"
            )
            if img_width_adj != img_width:
                # Re-flow the same pixels at the new width, zero-padding the last row.
                flat_vals = gray4_matrix.flatten()
                new_h = max(1, int(np.ceil(len(flat_vals) / img_width_adj)))
                padded = np.zeros(img_width_adj * new_h, dtype=np.uint8)
                padded[:len(flat_vals)] = flat_vals
                gray4_matrix = padded.reshape((new_h, img_width_adj))
                img_width = img_width_adj
                img_height = new_h
            st.caption(f"Image size: **{img_width} × {img_height}**")
            # Render
            gray8_render = gray4_to_gray8(gray4_matrix)  # expand 0..15 to 0..255 for display
            pil_img = Image.fromarray(gray8_render, mode="L")
            st.markdown("### 🖼️ Rendered Image (4-bit Grayscale)")
            # Integer upscale so small images reach roughly 256 px wide on screen.
            display_scale = max(1, 256 // img_width)
            display_w = img_width * display_scale
            display_h = img_height * display_scale
            pil_display = pil_img.resize((display_w, display_h), Image.NEAREST)
            st.image(pil_display, caption=f"4-bit grayscale — {img_width}×{img_height} (0=black, 15=white)")
            # Stats
            # NOTE(review): `counts` from np.unique is never used below — verify intent.
            unique_vals, counts = np.unique(gray4_matrix, return_counts=True)
            st.markdown(
                f"- **Dimensions:** {img_width} × {img_height} \n"
                f"- **Unique levels:** {len(unique_vals)} of 16 \n"
                f"- **Min / Max value:** {gray4_matrix.min()} / {gray4_matrix.max()}"
            )
            # Downloads
            buf = io.BytesIO()
            pil_img.save(buf, format="PNG")
            st.download_button(
                "⬇️ Download as PNG",
                data=buf.getvalue(),
                file_name=f"gray4_image_{img_width}x{img_height}.png",
                mime="image/png",
                key="download_g4_png"
            )
            buf_hr = io.BytesIO()
            pil_display.save(buf_hr, format="PNG")
            st.download_button(
                "⬇️ Download Scaled PNG (for viewing)",
                data=buf_hr.getvalue(),
                file_name=f"gray4_image_{display_w}x{display_h}_scaled.png",
                mime="image/png",
                key="download_g4_png_scaled"
            )
        except Exception as e:
            st.error(f"❌ Error processing file: {e}")
            import traceback
            st.code(traceback.format_exc())
    else:
        st.info("👆 Upload a 4-bit grayscale data file to render as an image.")
# --------------------------------------------------
# TAB 3: Data Analytics
# --------------------------------------------------
with tab3:
    st.header("📊 Data Analytics")
    st.markdown("""
    Upload your sample data file (Excel or CSV) for a quick exploratory assessment of the editing rates distribution.
    The file should contain samples as rows and position columns with editing values.
    This tab provides visualizations **before** any binary labelling.
    """)
    analytics_uploaded = st.file_uploader(
        "📤 Upload data file",
        type=["xlsx", "csv"],
        key="analytics_uploader"
    )
    if analytics_uploaded is not None:
        try:
            # Load by extension: .xlsx via pandas/openpyxl, anything else as CSV.
            if analytics_uploaded.name.endswith(".xlsx"):
                adf = pd.read_excel(analytics_uploaded)
            else:
                adf = pd.read_csv(analytics_uploaded)
            st.success(f"✅ Loaded file with {len(adf)} rows and {len(adf.columns)} columns")
            adf.columns = [str(c).strip() for c in adf.columns]
            # Metadata column names (lower-cased) excluded from position detection.
            # NOTE(review): "descritpion" appears to guard against a misspelled
            # header in real input files — confirm before removing.
            non_pos_keywords = {"sample", "description", "descritpion", "total edited",
                                'volume per "1"', "volume per 1", "id", "name"}
            # A position column is any non-metadata column with at least one numeric cell.
            position_cols = [c for c in adf.columns
                             if c.lower() not in non_pos_keywords
                             and pd.to_numeric(adf[c], errors="coerce").notna().any()]
def pos_sort_key(col_name: str):
    """Sort key for position columns: the first embedded integer, or a huge
    sentinel (10**9) so digit-less names sort last."""
    match = re.search(r"(\d+)", col_name)
    if match is None:
        return 10**9
    return int(match.group(1))
# Order position columns by their embedded number (e.g. "Position 2" before "Position 10").
position_cols = sorted(position_cols, key=pos_sort_key)
if not position_cols:
    st.error("No numeric position columns detected.")
    st.stop()
st.info(f"Detected **{len(position_cols)}** position columns and **{len(adf)}** samples.")
# Coerce everything numeric; unparsable cells become 0.0.
pos_data = adf[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
# Per-sample totals: prefer the file's own column, otherwise sum the positions.
# NOTE(review): total_edited is not referenced in the plotting code below — verify intent.
if "Total edited" in adf.columns:
    total_edited = pd.to_numeric(adf["Total edited"], errors="coerce").fillna(0.0)
else:
    total_edited = pos_data.sum(axis=1)
st.markdown("### 1️⃣ Raw Data Distribution")
st.caption("Visualize editing values across all positions and samples — before any binary labelling.")
transform_option = st.selectbox(
    "Value transformation:",
    ["Raw (linear)", "log1p", "log1p → log1p", "log1p → pos. norm."],
    index=0,
    key="transform_select",
    help=(
        "**Raw** — No transformation.\n\n"
        "**log1p** — `log(1 + x)`. Compresses high values, spreads low range.\n\n"
        "**log1p → log1p** — Double log1p. Even stronger compression.\n\n"
        "**log1p → pos. norm.** — log1p then robust per-position normalization "
        "(median / IQR scaling per position column)."
    )
)
def robust_pos_normalize_log1p(data: pd.DataFrame) -> pd.DataFrame:
    """Apply log1p, then robust-normalize each column independently.

    Each column is centered on its median; when the interquartile range is
    positive the centered values are also divided by it (median/IQR scaling).
    Returns a new DataFrame; *data* is not modified.
    """
    out = np.log1p(data)  # new frame, safe to mutate
    for column in out.columns:
        series = out[column]
        center = series.median()
        spread = series.quantile(0.75) - series.quantile(0.25)
        if spread > 0:
            out[column] = (series - center) / spread
        else:
            # Constant (or near-constant) column: centering only.
            out[column] = series - center
    return out
# Apply the selected transform; keep a short tag for plot titles.
if transform_option == "log1p":
    transformed = np.log1p(pos_data)
    value_label = "Editing Value (log1p)"
    transform_tag = "log1p"
elif transform_option == "log1p → log1p":
    transformed = np.log1p(np.log1p(pos_data))
    value_label = "Editing Value (log1p → log1p)"
    transform_tag = "log1p_log1p"
elif transform_option == "log1p → pos. norm.":
    transformed = robust_pos_normalize_log1p(pos_data)
    value_label = "Editing Value (log1p → pos. norm.)"
    transform_tag = "log1p_posnorm"
else:
    transformed = pos_data
    value_label = "Editing Value"
    transform_tag = "raw"
# Long format: one row per (sample, position) measurement.
melted = transformed.melt(var_name="Position", value_name="Value")
# Numeric position index extracted from the column name (0 if no digits).
melted["Position_idx"] = melted["Position"].apply(
    lambda x: int(re.search(r"(\d+)", str(x)).group(1)) if re.search(r"(\d+)", str(x)) else 0
)
st.markdown("#### 📊 Histogram — All Values")
n_bins = st.number_input("Number of bins:", min_value=10, max_value=300, value=80, step=10, key="hist_bins")
fig2, ax2 = plt.subplots(figsize=(10, 4))
ax2.hist(melted["Value"].values, bins=n_bins, color="#4F46E5", edgecolor="white", linewidth=0.3)
ax2.set_xlabel(value_label)
ax2.set_ylabel("Count")
ax2.set_title(f"Raw Values Distribution ({transform_tag})")
# Choose a tick step proportional to the value range so labels stay readable.
val_min = melted["Value"].min()
val_max = melted["Value"].max()
val_range = val_max - val_min
if val_range <= 2:
    tick_step = 0.1
elif val_range <= 6:
    tick_step = 0.2
elif val_range <= 20:
    tick_step = 1
else:
    tick_step = 5
ax2.set_xticks(np.arange(np.floor(val_min / tick_step) * tick_step,
                         val_max + tick_step, tick_step))
ax2.tick_params(axis='x', labelsize=8, rotation=45)
ax2.grid(axis='y', alpha=0.3)
fig2.tight_layout()
st.pyplot(fig2)
st.markdown("#### 2️⃣ Density Scatter Plot (FACS-style)")
st.caption("Each dot = one measurement (sample × position). Color = local point density.")
x_vals = melted["Position_idx"].values.astype(float)
y_vals = melted["Value"].values.astype(float)
# Fixed-seed jitter spreads overlapping points within each position column.
x_jittered = x_vals + np.random.default_rng(42).uniform(-0.3, 0.3, size=len(x_vals))
with st.spinner("Computing point density..."):
    try:
        xy = np.vstack([x_jittered, y_vals])
        density = gaussian_kde(xy)(xy)
    except np.linalg.LinAlgError:
        # Singular covariance (e.g. constant data): fall back to uniform density.
        density = np.ones(len(x_vals))
# Draw densest points last so they sit on top of sparse ones.
sort_idx = density.argsort()
x_plot = x_jittered[sort_idx]
y_plot = y_vals[sort_idx]
d_plot = density[sort_idx]
fig3, ax3 = plt.subplots(figsize=(12, 6))
scatter = ax3.scatter(x_plot, y_plot, c=d_plot, cmap="jet", s=8, alpha=0.7, edgecolors="none")
cbar = fig3.colorbar(scatter, ax=ax3, label="Density")
ax3.set_xlabel("Position")
ax3.set_ylabel(value_label)
ax3.set_title(f"Density Scatter — Position vs. {value_label}")
ax3.set_xticks(sorted(melted["Position_idx"].unique()))
ax3.grid(alpha=0.2)
fig3.tight_layout()
st.pyplot(fig3)
st.markdown("#### 3️⃣ 2D Density Heatmap")
st.caption("Binned heatmap of editing values by position — similar to a FACS density plot.")
y_bins = st.slider("Vertical bins:", min_value=20, max_value=150, value=60, key="heatmap_ybins")
positions_unique = sorted(melted["Position_idx"].unique())
n_positions = len(positions_unique)
fig4, ax4 = plt.subplots(figsize=(12, 6))
# Log color scale only when there are positive values; otherwise linear.
h = ax4.hist2d(
    x_vals, y_vals,
    bins=[n_positions, y_bins],
    cmap="jet",
    norm=mcolors.LogNorm() if melted["Value"].max() > 0 else None,
)
fig4.colorbar(h[3], ax=ax4, label="Count (log scale)")
ax4.set_xlabel("Position")
ax4.set_ylabel(value_label)
ax4.set_title(f"2D Density Heatmap — Position vs. {value_label}")
ax4.set_xticks(positions_unique)
ax4.grid(alpha=0.15)
fig4.tight_layout()
st.pyplot(fig4)
# Surface any processing failure with the full traceback for debugging.
except Exception as e:
    st.error(f"❌ Error processing file: {e}")
    import traceback
    st.code(traceback.format_exc())
else:
    st.info("👆 Upload a data file (CSV or Excel) to start exploring.")
# --------------------------------------------------
# TAB 4: Pipetting Command Generator
# --------------------------------------------------
with tab4:
    from math import ceil
    st.header("🧪 Pipetting Command Generator for Eppendorf epMotion liquid handler")
    st.markdown("""
    Upload your sample file (Excel, CSV, or TXT) containing binary mutation data.
    The app will:
    - Auto-detect or create `Sample`, `Position#`, `Total edited`, and `Volume per "1"` columns
    - Let you set the **Maximum volume per input well (µL)** used to compute `Volume per "1"`
    - Calculate total demand per input and suggest a **uniform layout** (same # consecutive wells per input)
    - **Preview** the layout on a plate map (with tooltips)
    - After confirmation, generate pipetting commands and a source volume summary
    """)
    uploaded_writing = st.file_uploader(
        "📤 Upload data file",
        type=["xlsx", "csv", "txt"],
        key="writing_uploader"
    )
    # Hard capacity cap applied to every source well during allocation.
    max_per_well_ul = st.number_input(
        "Maximum volume per source well (µL)",
        min_value=10.0, max_value=2000.0, value=160.0, step=10.0
    )
    # 96-well plate geometry: rows A–H, columns 1–12.
    ROWS_96 = ["A", "B", "C", "D", "E", "F", "G", "H"]
    COLS_96 = list(range(1, 13))
def well_name(row_letter, col_number):
    """Join a row letter and a column number into a well label such as 'C7'."""
    return "{}{}".format(row_letter, col_number)
def enumerate_plate_wells():
    """Yield the 96 well names of one plate in row-major order (A1..A12, ..., H12)."""
    for row_letter in ROWS_96:
        for col_number in COLS_96:
            yield row_letter + str(col_number)
def parse_well_name(well: str):
    """Split a well label like 'b 7' into ('B', 7).

    Accepts lower/upper case rows A–H and optional whitespace before the
    column digits; any unparsable input falls back to the sentinel ('A', 0).
    """
    match = re.match(r"([A-Ha-h])\s*([0-9]+)", str(well).strip())
    if match:
        return (match.group(1).upper(), int(match.group(2)))
    return ("A", 0)
def sample_index_to_plate_and_well(sample_idx: int):
    """Map a 1-based sample index to (plate number, well name).

    Samples fill plates of 96 wells in row-major order: sample 1 -> plate 1 A1,
    sample 97 -> plate 2 A1, and so on.
    """
    zero_based = sample_idx - 1
    plate_offset, within_plate = divmod(zero_based, 96)
    row_idx, col_idx = divmod(within_plate, 12)
    return plate_offset + 1, well_name(ROWS_96[row_idx], COLS_96[col_idx])
def build_global_wells_list(n_plates: int):
    """Return [(plate, well), ...] for plates 1..n_plates, wells in row-major order."""
    return [
        (plate, well)
        for plate in range(1, n_plates + 1)
        for well in enumerate_plate_wells()
    ]
def pick_tool(volume_ul: float) -> str:
    """Select the epMotion tip tool: TS_10 for volumes up to 10 µL (inclusive), else TS_50."""
    if volume_ul <= 10.0:
        return "TS_10"
    return "TS_50"
# 16 distinct hex colors used to color-code inputs on the plate map;
# inputs beyond 16 wrap around via modulo indexing.
PALETTE = [
    "#4F46E5", "#22C55E", "#F59E0B", "#EF4444", "#06B6D4", "#A855F7", "#84CC16", "#F97316",
    "#0EA5E9", "#E11D48", "#10B981", "#7C3AED", "#15803D", "#EA580C", "#2563EB", "#DC2626"
]
def render_plate_map_html(plates_used, well_to_input, max_wells_per_source, inputs_count):
    """Build an HTML string rendering each plate as a colored 96-well grid.

    plates_used: number of plates to draw (1..plates_used).
    well_to_input: {(plate, well): (input_idx, within_block_idx)} assignments.
    max_wells_per_source: block size shown in each tooltip ("well j/N").
    inputs_count: number of inputs, used to draw the legend.
    Assigned wells are filled with the input's palette color and carry a hover
    tooltip; unassigned wells are left blank.
    """
    # Legend: one swatch per input; colors wrap modulo the palette size.
    legend_spans = []
    for i in range(1, inputs_count + 1):
        color = PALETTE[(i-1) % len(PALETTE)]
        legend_spans.append(
            f"<span style='display:inline-block;margin-right:12px'>"
            f"<span style='display:inline-block;width:12px;height:12px;background:{color};border:1px solid #333;margin-right:6px;vertical-align:middle'></span>"
            f"Input {i}</span>"
        )
    legend_html = "<div style='margin:8px 0 16px 0'>" + "".join(legend_spans) + "</div>"
    # CSS for the grid cells and pure-CSS hover tooltips.
    css = """
    <style>
    .plate { margin: 10px 0 24px 0; }
    .plate-title { font-weight: 600; margin: 4px 0 8px 0; }
    .grid { display: grid; grid-template-columns: 32px repeat(12, 38px); grid-auto-rows: 32px; gap: 4px; }
    .cell { width: 38px; height: 32px; border: 1px solid #DDD; display:flex; align-items:center; justify-content:center; font-size:12px; background:#FAFAFA; position:relative; }
    .head { font-weight:600; background:#F3F4F6; }
    .cell[data-color] { color:#111; }
    .cell .tip { visibility:hidden; opacity:0; transition:opacity 0.15s ease; position:absolute; bottom:100%; transform:translateY(-6px); left:50%; transform:translate(-50%, -6px); background:#111; color:#fff; padding:4px 6px; font-size:11px; border-radius:4px; white-space:nowrap; pointer-events:none; }
    .cell:hover .tip { visibility:visible; opacity:0.95; }
    </style>
    """
    body = [css, legend_html]
    for p in range(1, plates_used + 1):
        body.append(f"<div class='plate'><div class='plate-title'>Plate {p}</div>")
        body.append("<div class='grid'>")
        # Header row: empty corner cell, then column numbers 1..12.
        body.append("<div class='cell head'></div>")
        for c in COLS_96:
            body.append(f"<div class='cell head'>{c}</div>")
        for r in ROWS_96:
            # Row label, then the 12 wells of this row.
            body.append(f"<div class='cell head'>{r}</div>")
            for c in COLS_96:
                well = f"{r}{c}"
                key = (p, well)
                if key in well_to_input:
                    input_idx, within_idx = well_to_input[key]
                    color = PALETTE[(input_idx-1) % len(PALETTE)]
                    tip = f"Input {input_idx} • P{p}:{well} • Block well {within_idx}/{max_wells_per_source}"
                    cell_html = (
                        f"<div class='cell' data-color style='background:{color};border-color:#555' title='{tip}'>"
                        f"<span class='tip'>{tip}</span>"
                        "</div>"
                    )
                else:
                    cell_html = "<div class='cell'></div>"
                body.append(cell_html)
        body.append("</div></div>")
    return "".join(body)
if uploaded_writing is not None:
    try:
        # Load by extension; unknown extensions are tried as TSV, then CSV.
        # NOTE(review): after a failed read_csv the upload's file pointer may be
        # at EOF, so the CSV fallback could read an empty frame — verify.
        if uploaded_writing.name.endswith(".xlsx"):
            df = pd.read_excel(uploaded_writing)
        elif uploaded_writing.name.endswith(".csv"):
            df = pd.read_csv(uploaded_writing)
        else:
            try:
                df = pd.read_csv(uploaded_writing, sep="\t")
            except Exception:
                df = pd.read_csv(uploaded_writing)
        st.success(f"✅ Loaded file with {len(df)} rows and {len(df.columns)} columns")
        df.columns = [str(c).strip() for c in df.columns]
        # Ensure a Sample column exists; synthesize a 1..N index if missing.
        if not any(c.lower() == "sample" for c in df.columns):
            df.insert(0, "Sample", np.arange(1, len(df) + 1))
            st.info("`Sample` column missing — automatically generated 1..N.")
        # Prefer explicitly named "Position N" columns (case-insensitive)...
        position_cols = [c for c in df.columns if re.match(r"(?i)^position\s*\d+", c)]
        if not position_cols:
            # ...otherwise treat every non-metadata column as a position.
            non_pos_cols = {"sample", "total edited", 'volume per "1"', "volume per 1"}
            candidate_cols = [c for c in df.columns if c.lower() not in non_pos_cols]
            position_cols = candidate_cols
            st.info(f"Position columns inferred automatically: {len(position_cols)} detected.")
def pos_key(col_name: str):
    """Numeric sort key: first run of digits in the column name, else 10**9 (sorts last)."""
    found = re.search(r"(\d+)", col_name)
    return 10**9 if found is None else int(found.group(1))
# Order position columns by their embedded number and coerce to 0/1 integers.
position_cols = sorted(position_cols, key=pos_key)
df[position_cols] = df[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0).astype(int)
if "Total edited" not in df.columns:
    df["Total edited"] = df[position_cols].sum(axis=1).astype(int)
    st.info("`Total edited` column missing — calculated automatically as sum of 1s per row.")
st.markdown("#### ⚙️ Volume Calculation Settings")
default_total_vol = st.number_input(
    "Maximum volume per input well (µL)",
    min_value=1.0, max_value=10000.0, value=64.0, step=1.0,
    help="Used to compute Volume per '1' as (Maximum volume per input well / Total edited) when not provided."
)
# Use an existing "Volume per ..." column if present, otherwise derive it.
vol_candidates = [c for c in df.columns if "volume per" in c.lower()]
if not vol_candidates:
    # replace(0, nan) avoids division by zero; those rows become volume 0.
    df['Volume per "1"'] = default_total_vol / df["Total edited"].replace(0, np.nan)
    df['Volume per "1"'] = df['Volume per "1"'].fillna(0)
    st.info(f'`Volume per "1"` column missing — calculated automatically as {default_total_vol:.0f} µL (max per input well) / Total edited.')
    volume_col = 'Volume per "1"'
else:
    volume_col = vol_candidates[0]
# Every single transfer must fit inside one source well.
# NOTE(review): when the column comes from the file it has not been coerced to
# numeric yet, so .max() could compare strings here — verify input handling.
if df[volume_col].max() > max_per_well_ul:
    st.error(
        f"❌ At least one row has `Volume per \"1\"` greater than the per-well cap ({max_per_well_ul} µL). "
        "Increase the cap or reduce per-transfer volume."
    )
    st.stop()
vol_per_one_series = pd.to_numeric(df[volume_col], errors="coerce").fillna(0.0)
# Demand per input = sum of per-transfer volumes over samples marked 1 at that position.
total_volume_per_input = [float(vol_per_one_series[df[pos] == 1].sum()) for pos in position_cols]
wells_needed_per_input = [int(ceil(tv / max_per_well_ul)) if tv > 0 else 0 for tv in total_volume_per_input]
num_inputs = len(position_cols)
# Uniform layout: every input gets as many wells as the hungriest input needs.
max_wells_per_source = max(wells_needed_per_input) if wells_needed_per_input else 0
st.markdown("### 👀 Preview: Suggested Uniform Layout")
if max_wells_per_source == 0:
    st.info("No edits detected — nothing to allocate.")
    st.stop()
st.write(
    f"💡 Suggested layout: **{max_wells_per_source} consecutive wells per input** "
    f"(cap {max_per_well_ul:.0f} µL/well)."
)
total_wells_needed_uniform = num_inputs * max_wells_per_source
plates_needed = int(ceil(total_wells_needed_uniform / 96)) or 1
# Enumerate wells plate-by-plate in row-major order, then keep just enough.
global_wells = sorted(
    build_global_wells_list(plates_needed),
    key=lambda x: (
        x[0],
        ROWS_96.index(parse_well_name(x[1])[0]),
        parse_well_name(x[1])[1]
    )
)
global_wells = global_wells[:total_wells_needed_uniform]
# Assign each input a contiguous block of max_wells_per_source wells.
assigned_wells_map, well_to_input, preview_rows = {}, {}, []
for i in range(1, num_inputs + 1):
    start, end = (i - 1) * max_wells_per_source, i * max_wells_per_source
    block = global_wells[start:end]
    assigned_wells_map[i] = block
    for j, (p, w) in enumerate(block, start=1):
        well_to_input[(p, w)] = (i, j)
    block_str = ", ".join([f"P{p}:{w}" for (p, w) in block])
    preview_rows.append({
        "Input (Position #)": i,
        "Total demand (µL)": round(total_volume_per_input[i-1], 2),
        "Wells needed (actual)": wells_needed_per_input[i-1],
        "Allocated (uniform)": max_wells_per_source,
        "Assigned wells": block_str
    })
preview_df = pd.DataFrame(preview_rows)
st.dataframe(preview_df, width="stretch", height=300)
st.markdown("#### Plate Map (hover cells for details)")
plate_html = render_plate_map_html(plates_needed, well_to_input, max_wells_per_source, num_inputs)
st.markdown(plate_html, unsafe_allow_html=True)
    st.markdown("### ✅ Generate Pipetting Commands")
    if st.button("Generate using this layout"):
        # Running volume already drawn from each allocated well, per input.
        per_input_well_cum = {i: [0.0] * max_wells_per_source for i in range(1, num_inputs + 1)}
        commands, source_volume_totals = [], {}
        for _, row in df.iterrows():
            sample_id = int(row["Sample"])
            vol_per_one = float(row[volume_col])
            if vol_per_one <= 0:
                continue  # nothing to pipette for this sample
            dest_plate, dest_well = sample_index_to_plate_and_well(sample_id)
            tool = pick_tool(vol_per_one)
            # One transfer per position flagged 1 in this sample's row.
            for pos_idx, col in enumerate(position_cols, start=1):
                if int(row[col]) != 1:
                    continue
                # First-fit: draw from the earliest well in this input's block
                # that still has capacity for this transfer.
                wells_for_input = assigned_wells_map[pos_idx]
                cum_list = per_input_well_cum[pos_idx]
                chosen = None
                for j, ((src_plate, src_well), current_vol) in enumerate(zip(wells_for_input, cum_list)):
                    if current_vol + vol_per_one <= max_per_well_ul:
                        chosen = (j, src_plate, src_well)
                        break
                if chosen is None:
                    st.error(
                        f"Allocation exhausted for Input {pos_idx} while creating commands. "
                        "Increase the max volume per well or review per-transfer volume."
                    )
                    st.stop()
                j, src_plate, src_well = chosen
                cum_list[j] += vol_per_one
                per_input_well_cum[pos_idx] = cum_list
                source_volume_totals[(src_plate, src_well)] = source_volume_totals.get((src_plate, src_well), 0.0) + vol_per_one
                commands.append({
                    "Input #": pos_idx,
                    "Source plate": src_plate,
                    "Source well": src_well,
                    "Destination plate": dest_plate,
                    "Destination well": dest_well,
                    "Volume": round(vol_per_one, 2),
                    "Tool": tool
                })
        commands_df = pd.DataFrame(commands)
        # Sort by input, then source well in row-major order, then destination;
        # the temporary index columns are dropped after sorting.
        def row_idx_from_well(w): return ROWS_96.index(parse_well_name(w)[0])
        def col_num_from_well(w): return parse_well_name(w)[1]
        commands_df["Src_row_idx"] = commands_df["Source well"].apply(row_idx_from_well)
        commands_df["Src_col_num"] = commands_df["Source well"].apply(col_num_from_well)
        commands_df["Dst_row_idx"] = commands_df["Destination well"].apply(row_idx_from_well)
        commands_df["Dst_col_num"] = commands_df["Destination well"].apply(col_num_from_well)
        commands_df = commands_df.sort_values(
            by=["Input #", "Source plate", "Src_row_idx", "Src_col_num",
                "Destination plate", "Dst_row_idx", "Dst_col_num"],
            kind="stable"
        )
        commands_df = commands_df[[
            "Input #", "Source plate", "Source well",
            "Destination plate", "Destination well", "Volume", "Tool"
        ]]
        st.success(f"✅ Generated {len(commands_df)} commands across {num_inputs} inputs.")
        # Per-source-well usage summary vs. the allocated capacity.
        summary_rows = []
        for i in range(1, num_inputs + 1):
            for (p, w), used in zip(assigned_wells_map[i], per_input_well_cum[i]):
                total = source_volume_totals.get((p, w), 0.0)
                summary_rows.append({
                    "Source": i, "Source plate": p, "Source well": w,
                    "Total volume taken (µL)": round(total, 2),
                    "Allocated capacity (µL)": round(max_per_well_ul, 2)
                })
        summary_df = pd.DataFrame(summary_rows)
        summary_df["Src_row_idx"] = summary_df["Source well"].apply(row_idx_from_well)
        summary_df["Src_col_num"] = summary_df["Source well"].apply(col_num_from_well)
        summary_df = summary_df.sort_values(
            by=["Source", "Source plate", "Src_row_idx", "Src_col_num"],
            kind="stable"
        )[
            ["Source", "Source plate", "Source well", "Total volume taken (µL)", "Allocated capacity (µL)"]
        ]
        st.markdown("### 💧 Pipetting Commands")
        st.dataframe(commands_df, width="stretch", height=400)
        st.download_button("⬇️ Download Commands CSV", commands_df.to_csv(index=False), "pipetting_commands.csv", mime="text/csv")
        st.markdown("### 📊 Source Volume Summary")
        st.dataframe(summary_df, width="stretch", height=400)
        st.download_button("⬇️ Download Source Summary CSV", summary_df.to_csv(index=False), "source_volume_summary.csv", mime="text/csv")
except Exception as e:
    st.error(f"❌ Error processing file: {e}")
else:
    st.info("👆 Upload an Excel/CSV/TXT file to start.")