# BitConverter — src/app.py
import streamlit as st
import pandas as pd
import io
import re
import struct
import numpy as np
import openpyxl
import base64
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from scipy.stats import gaussian_kde
from PIL import Image
# =========================
# Streamlit App Setup
# =========================
st.set_page_config(page_title="Bitconverter", layout="wide")
st.title("Bitconverter")
# =========================
# Encoding Schemes
# =========================
# Bits consumed per encoded unit (character / symbol / byte) for each scheme.
BITS_PER_UNIT = {
    "Voyager 6-bit": 6,
    "Base64 (6-bit)": 6,
    "ASCII (7-bit)": 7,
    "UTF-8 (8-bit)": 8,
}
# Derive the selectbox options from BITS_PER_UNIT (dicts preserve insertion
# order) so the two definitions can never drift apart.
ENCODING_OPTIONS = list(BITS_PER_UNIT)
# =========================
# Voyager ASCII 6-bit Table
# =========================
# 56 printable characters; the 6-bit code of a character is its index here.
_VOYAGER_CHARS = ' ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.,()+-*/=$!:%"#@\'?&'
voyager_table = dict(enumerate(_VOYAGER_CHARS))
# Inverse mapping: character -> 6-bit code.
reverse_voyager_table = {ch: code for code, ch in voyager_table.items()}
B64_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
# =========================
# 4-bit Grayscale Helpers
# =========================
# 4-bit grayscale, uniform quantization in sRGB/BT.601 luma code space
# (0=black, 15=white). Two pixels per byte, high-nibble first;
# rows top-to-bottom, no row padding.
# =========================
def quantize_to_4bit(gray8: np.ndarray) -> np.ndarray:
    """Map 8-bit grayscale codes (0..255) to the nearest 4-bit level (0..15)."""
    levels = gray8.astype(np.float32) * (15.0 / 255.0)
    return np.clip(np.round(levels), 0, 15).astype(np.uint8)
def gray4_to_gray8(gray4: np.ndarray) -> np.ndarray:
    """Expand 4-bit levels (0..15) to 8-bit grayscale (0..255) for viewing.

    255/15 == 17 exactly, so the expansion is a plain integer multiply.
    """
    return (gray4.astype(np.uint16) * 17).astype(np.uint8)
def pack_4bpp_rows(gray4: np.ndarray) -> bytes:
    """
    Pack a 2D array of 4-bit values (0..15) into bytes, two pixels per byte.
    High nibble = first pixel, low nibble = second pixel.
    Odd widths get a zero low nibble at the end of each row.
    Rows are emitted top-to-bottom with no extra row padding.
    """
    h, w = gray4.shape
    even_w = w + (w & 1)  # round width up to an even pixel count
    nibbles = np.zeros((h, even_w), dtype=np.uint8)
    nibbles[:, :w] = gray4 & 0x0F
    # Pair adjacent columns: even columns become high nibbles, odd columns low.
    packed = (nibbles[:, 0::2] << 4) | nibbles[:, 1::2]
    return packed.tobytes()
def unpack_4bpp_rows(packed: bytes, w: int, h: int) -> np.ndarray:
    """
    Inverse of pack_4bpp_rows: expand row-major 4bpp bytes into an (h, w)
    uint8 array with values 0..15. High nibble comes first within each byte.

    Raises:
        ValueError: if the payload size disagrees with the given dimensions.
    """
    bytes_per_row = (w + 1) // 2
    if len(packed) != bytes_per_row * h:
        raise ValueError("Packed data length mismatch for given dimensions")
    raw = np.frombuffer(packed, dtype=np.uint8).reshape((h, bytes_per_row))
    # Interleave high/low nibbles back into pixel order, then trim any
    # zero-padding column introduced by an odd width.
    nibbles = np.empty((h, 2 * bytes_per_row), dtype=np.uint8)
    nibbles[:, 0::2] = raw >> 4
    nibbles[:, 1::2] = raw & 0x0F
    return np.ascontiguousarray(nibbles[:, :w])
def save_g4_bytes(gray4: np.ndarray) -> bytes:
    """
    Serialize a 4-bit value matrix into an in-memory .g4 file.

    Layout (little-endian): magic b'G4' (2B), version (1B=1), width (uint32),
    height (uint32), reserved (uint32=0), then the packed 4bpp payload of
    ceil(width/2)*height bytes.
    """
    h, w = gray4.shape
    # '<' disables struct alignment, so the header is exactly 2+1+4+4+4 = 15 bytes.
    header = b"G4" + struct.pack("<BIII", 1, w, h, 0)
    return header + pack_4bpp_rows(gray4)
def load_g4_bytes(data: bytes):
    """
    Parse an in-memory .g4 file produced by save_g4_bytes.

    Layout (little-endian): magic b'G4' (2B), version (1B, must be 1),
    width (uint32), height (uint32), reserved (uint32), then the packed
    4bpp payload of ceil(width/2)*height bytes.

    Returns:
        (gray4, width, height) — gray4 is an (H, W) uint8 array of 0..15.

    Raises:
        ValueError: on a truncated header, bad magic, unsupported version,
            or payload length mismatch.
    """
    HEADER_SIZE = 15  # 2 (magic) + 1 (version) + 4 + 4 + 4
    # Validate the length up front so truncated input yields a clear
    # ValueError instead of an IndexError / struct.error.
    if len(data) < HEADER_SIZE:
        raise ValueError("Truncated G4 header")
    if data[:2] != b"G4":
        raise ValueError("Not a G4 file")
    version = data[2]
    if version != 1:
        raise ValueError(f"Unsupported G4 version: {version}")
    w, h, _reserved = struct.unpack_from("<III", data, 3)
    bytes_per_row = (w + 1) // 2
    expected = bytes_per_row * h
    payload = data[HEADER_SIZE:HEADER_SIZE + expected]
    if len(payload) != expected:
        raise ValueError("Payload length mismatch")
    gray4 = unpack_4bpp_rows(payload, w=w, h=h)
    return gray4, w, h
def gray4_to_binary_flat(gray4: np.ndarray) -> list[int]:
    """Serialize a 4-bit value matrix into a flat bit list, MSB first per pixel."""
    out: list[int] = []
    for raw in gray4.flatten():
        v = int(raw) & 0x0F
        out.extend(((v >> 3) & 1, (v >> 2) & 1, (v >> 1) & 1, v & 1))
    return out
def binary_flat_to_gray4(bits: list[int], width: int) -> np.ndarray:
    """
    Rebuild a 4-bit value matrix from a flat bit list (4 bits per pixel,
    MSB first). Trailing bits that do not form a full pixel are dropped;
    the final row is zero-padded out to `width` pixels.
    """
    n_pixels = len(bits) // 4
    values = [
        (bits[i] << 3) | (bits[i + 1] << 2) | (bits[i + 2] << 1) | bits[i + 3]
        for i in range(0, 4 * n_pixels, 4)
    ]
    height = max(1, int(np.ceil(n_pixels / width)))
    grid = np.zeros(width * height, dtype=np.uint8)
    grid[:n_pixels] = values
    return grid.reshape((height, width))
# =========================
# Encoding Functions
# =========================
def encode_to_binary(text: str, scheme: str) -> tuple[list[int], list[str], list[str]]:
    """
    Returns (flat_bits, display_units, source_chars).
    - display_units: the encoded representation (Base64 symbol, hex byte, ASCII code, Voyager char)
    - source_chars: the original text character each chunk maps to
    """
    if scheme == "Voyager 6-bit":
        bits = []
        for char in text:
            # Table is uppercase-only; characters missing from it encode as 0 (space).
            val = reverse_voyager_table.get(char.upper(), 0)
            bits.extend([(val >> b) & 1 for b in range(5, -1, -1)])  # MSB first, 6 bits
        labels = list(text.upper())
        return bits, labels, list(text)
    elif scheme == "ASCII (7-bit)":
        bits = []
        for c in text:
            # Code points above 0x7F are masked down to 7 bits (lossy for non-ASCII).
            val = ord(c) & 0x7F
            bits.extend([(val >> b) & 1 for b in range(6, -1, -1)])  # MSB first, 7 bits
        labels = [f"0x{ord(c) & 0x7F:02X}" for c in text]
        return bits, labels, list(text)
    elif scheme == "UTF-8 (8-bit)":
        raw = text.encode("utf-8")
        bits = []
        for byte in raw:
            bits.extend([(byte >> b) & 1 for b in range(7, -1, -1)])  # MSB first, 8 bits
        labels = [f"0x{b:02X}" for b in raw]
        # Repeat each character once per UTF-8 byte so source_chars aligns with labels.
        source = []
        for ch in text:
            n_bytes = len(ch.encode("utf-8"))
            source.extend([ch] * n_bytes)
        return bits, labels, source
    elif scheme == "Base64 (6-bit)":
        raw_bytes = text.encode("utf-8")
        b64_str = base64.b64encode(raw_bytes).decode("ascii")
        bits = []
        # '=' padding carries no payload bits, so strip it before bit expansion.
        clean = b64_str.rstrip("=")
        for c in clean:
            val = B64_ALPHABET.index(c)
            bits.extend([(val >> b) & 1 for b in range(5, -1, -1)])  # MSB first, 6 bits
        labels = list(clean)
        # Map each UTF-8 byte back to the character that produced it...
        byte_to_char = []
        for ch in text:
            n_bytes = len(ch.encode("utf-8"))
            byte_to_char.extend([ch] * n_bytes)
        # ...then attribute each Base64 symbol to the byte its first bit falls in.
        source = []
        for j in range(len(clean)):
            byte_idx = (j * 6) // 8
            if byte_idx < len(byte_to_char):
                source.append(byte_to_char[byte_idx])
            else:
                source.append("?")
        return bits, labels, source
    # Unknown scheme: nothing to encode.
    return [], [], []
# =========================
# Decoding Functions
# =========================
def _bits_to_values(bits: list[int], unit: int) -> list[int]:
    """Group a flat MSB-first bit list into `unit`-wide integers, zero-padding the tail."""
    values = []
    for i in range(0, len(bits), unit):
        chunk = bits[i:i + unit]
        if len(chunk) < unit:
            chunk = chunk + [0] * (unit - len(chunk))
        values.append(sum(b << (unit - 1 - j) for j, b in enumerate(chunk)))
    return values
def decode_from_binary(bits: list[int], scheme: str) -> str:
    """
    Decode a flat 0/1 bit list back into text.

    `scheme` must match the scheme the bits were produced with. A trailing
    incomplete chunk is zero-padded. Undecodable units render as '?'
    (Voyager / ASCII) or U+FFFD (UTF-8); an unknown scheme returns "".
    """
    if scheme == "Voyager 6-bit":
        return ''.join(voyager_table.get(v, '?') for v in _bits_to_values(bits, 6))
    elif scheme == "ASCII (7-bit)":
        # Only printable ASCII (32..126) is rendered; everything else becomes '?'.
        return ''.join(chr(v) if 32 <= v < 127 else '?' for v in _bits_to_values(bits, 7))
    elif scheme == "UTF-8 (8-bit)":
        return bytes(_bits_to_values(bits, 8)).decode("utf-8", errors="replace")
    elif scheme == "Base64 (6-bit)":
        b64_str = ''.join(B64_ALPHABET[v] for v in _bits_to_values(bits, 6))
        # Restore the '=' padding stripped at encode time so b64decode accepts it.
        while len(b64_str) % 4 != 0:
            b64_str += '='
        try:
            return base64.b64decode(b64_str).decode("utf-8", errors="replace")
        except Exception:
            return "[Base64 decode error]"
    return ""
# =========================
# Tabs
# =========================
tab1, tab2, tab3, tab4 = st.tabs(["Encoding", "Decoding", "Data Analytics", "Writing"])
# --------------------------------------------------
# TAB 1: Text/Image → Binary
# --------------------------------------------------
with tab1:
    st.markdown("""
Convert text or an image into binary labels.
Choose an input mode, encoding scheme, and control grouping.
""")
    input_mode = st.selectbox("Input mode:", ["Text", "Image"], key="input_mode")
    # -----------------------------
    # TEXT INPUT MODE
    # -----------------------------
    if input_mode == "Text":
        st.subheader("Step 1 – Choose Encoding & Input Text")
        encoding_scheme = st.selectbox(
            "Encoding scheme:",
            ENCODING_OPTIONS,
            index=0,
            key="enc_scheme",
            help=(
                "**Voyager 6-bit** – Custom 56-character table (A-Z, 0-9, punctuation). 6 bits/char.\n\n"
                "**Base64 (6-bit)** – Standard Base64 encoding of UTF-8 bytes. 6 bits/symbol.\n\n"
                "**ASCII (7-bit)** – Standard 7-bit ASCII. 7 bits/char.\n\n"
                "**UTF-8 (8-bit)** – Full UTF-8 byte encoding. 8 bits/byte. Supports all Unicode."
            )
        )
        bits_per = BITS_PER_UNIT[encoding_scheme]
        if encoding_scheme == "Voyager 6-bit":
            supported = ''.join(voyager_table[i] for i in range(len(voyager_table)))
            st.caption(f"Supported characters ({len(voyager_table)}): `{supported}`")
        user_input = st.text_input("Enter your text:", value="DNA", key="input_text")
        col1, col2 = st.columns([2, 1])
        with col1:
            group_size = st.slider("Select number of target positions:", min_value=12, max_value=128, value=25)
        with col2:
            custom_cols = st.number_input("Or enter custom number:", min_value=1, max_value=512, value=group_size)
        # The free-form number input overrides the slider when they differ.
        if custom_cols != group_size:
            group_size = custom_cols
        if user_input:
            binary_labels, display_units, source_chars = encode_to_binary(user_input, encoding_scheme)
            binary_concat = ''.join(map(str, binary_labels))
            st.markdown("### Output 1 – Binary Labels per Character")
            st.caption(f"Encoding: **{encoding_scheme}** — {bits_per} bits per unit")
            # One chunk of `bits_per` bits per encoded unit.
            grouped_bits = [binary_labels[i:i + bits_per] for i in range(0, len(binary_labels), bits_per)]
            scroll_html = (
                "<div style='max-height:300px; overflow-y:auto; font-family:monospace; "
                "padding:6px; border:1px solid #ccc;'>"
            )
            for i, bits in enumerate(grouped_bits):
                src = source_chars[i] if i < len(source_chars) else "?"
                enc = display_units[i] if i < len(display_units) else "?"
                if encoding_scheme == "Voyager 6-bit":
                    scroll_html += f"<div>'{src}' → {bits}</div>"
                else:
                    scroll_html += f"<div>'{src}' → '{enc}' → {bits}</div>"
            scroll_html += "</div>"
            st.markdown(scroll_html, unsafe_allow_html=True)
            # Same mapping again, as plain text for the download.
            per_char_lines = []
            for i, bits in enumerate(grouped_bits):
                src = source_chars[i] if i < len(source_chars) else "?"
                enc = display_units[i] if i < len(display_units) else "?"
                bit_str = ''.join(map(str, bits))
                if encoding_scheme == "Voyager 6-bit":
                    per_char_lines.append(f"'{src}' → {bit_str}")
                else:
                    per_char_lines.append(f"'{src}' → '{enc}' → {bit_str}")
            st.download_button(
                "⬇️ Download Binary per Character (.txt)",
                data='\n'.join(per_char_lines),
                file_name="binary_per_unit.txt",
                mime="text/plain",
                key="download_per_unit"
            )
            st.download_button(
                "⬇️ Download Concatenated Binary String",
                data=binary_concat,
                file_name="binary_full.txt",
                mime="text/plain",
                key="download_binary_txt"
            )
            st.markdown("### Output 2 – Binary matrix split into reactions grouped by target position")
            # Split the flat bit stream into rows of `group_size`, zero-padding the last row.
            groups = []
            for i in range(0, len(binary_labels), group_size):
                group = binary_labels[i:i + group_size]
                if len(group) < group_size:
                    group += [0] * (group_size - len(group))
                groups.append(group)
            columns = [f"Position {i+1}" for i in range(group_size)]
            df = pd.DataFrame(groups, columns=columns)
            df.insert(0, "Sample", range(1, len(df) + 1))
            st.dataframe(df, width="stretch")
            st.download_button(
                "⬇️ Download as CSV",
                df.to_csv(index=False),
                file_name=f"binary_labels_{group_size}_positions.csv",
                mime="text/csv",
                key="download_binary_csv"
            )
        else:
            st.info("👆 Enter text above to see binary labels.")
    # =====================================================
    # IMAGE INPUT MODE
    # =====================================================
    else:
        st.subheader("Step 1 – Upload Image & Set Resolution")
        image_type = st.selectbox(
            "Image type:",
            ["Black & White (1-bit)", "Grayscale (4-bit)"],
            key="enc_image_type",
            help=(
                "**Black & White (1-bit)** — Each pixel = 1 bit (0 or 1). Uses a brightness threshold.\n\n"
                "**Grayscale (4-bit)** — Each pixel = 4 bits (0–15 levels). "
                "Uniform quantization in sRGB/BT.601 luma space. 0 = black, 15 = white. "
                "Two pixels per byte, high-nibble first; rows top-to-bottom, no row padding."
            )
        )
        uploaded_img = st.file_uploader(
            "Upload an image (PNG, JPG, BMP, etc.):",
            type=["png", "jpg", "jpeg", "bmp", "gif", "tiff", "webp"],
            key="img_uploader"
        )
        if uploaded_img is not None:
            img = Image.open(uploaded_img).convert("L")  # grayscale
            orig_w, orig_h = img.size
            aspect = orig_h / orig_w
            st.image(img, caption=f"Original (grayscale) — {orig_w}×{orig_h} px", use_container_width=True)
            st.markdown("#### ⚙️ Resolution")
            # FIX: clamp both max_value and value so images narrower than 8 px
            # cannot produce max_value < min_value or value < min_value, either
            # of which makes st.slider raise.
            target_width = st.slider(
                "Output width (pixels):",
                min_value=8, max_value=max(8, min(orig_w, 256)),
                value=max(8, min(64, orig_w)), step=1,
                help="Height is auto-calculated from aspect ratio."
            )
            target_height = max(1, int(round(target_width * aspect)))
            img_resized = img.resize((target_width, target_height), Image.LANCZOS)
            img_array = np.array(img_resized)
            # ===========================================================
            # BLACK & WHITE (1-bit)
            # ===========================================================
            if image_type == "Black & White (1-bit)":
                total_bits = target_width * target_height
                st.caption(f"Output size: **{target_width} × {target_height}** = **{total_bits:,}** bits (1 bit/pixel)")
                threshold = st.slider(
                    "Black/white threshold:",
                    min_value=0, max_value=255, value=128,
                    help="Pixels darker than this → 1 (black). Brighter → 0 (white)."
                )
                # 1 = dark pixel, 0 = bright pixel.
                binary_matrix = (img_array < threshold).astype(int)
                st.markdown("### Preview — Black & White Output")
                col_prev1, col_prev2 = st.columns(2)
                with col_prev1:
                    st.image(img_resized, caption=f"Resized grayscale ({target_width}×{target_height})", use_container_width=True)
                with col_prev2:
                    # Invert for display: label 1 (black) renders as 0 brightness.
                    bw_display = Image.fromarray(((1 - binary_matrix) * 255).astype(np.uint8))
                    st.image(bw_display, caption=f"Binary B&W ({target_width}×{target_height})", use_container_width=True)
                binary_labels = binary_matrix.flatten().tolist()
                binary_concat = ''.join(map(str, binary_labels))
                n_ones = sum(binary_labels)
                st.markdown("### Output 1 – Image Info")
                st.markdown(
                    f"- **Dimensions:** {target_width} × {target_height} \n"
                    f"- **Bits per pixel:** 1 \n"
                    f"- **Total bits:** {total_bits:,} \n"
                    f"- **Black pixels (1):** {n_ones:,} \n"
                    f"- **White pixels (0):** {total_bits - n_ones:,}"
                )
                st.download_button(
                    "⬇️ Download Concatenated Binary String",
                    data=binary_concat,
                    file_name="image_binary_full.txt",
                    mime="text/plain",
                    key="download_img_binary_txt"
                )
                st.markdown("### Output 2 – Binary Matrix by dimension (Samples × Positions)")
                columns = [f"Position {i+1}" for i in range(target_width)]
                df_img = pd.DataFrame(binary_matrix, columns=columns)
                df_img.insert(0, "Sample", range(1, len(df_img) + 1))
                st.dataframe(df_img, width="stretch")
                st.download_button(
                    "⬇️ Download as CSV",
                    df_img.to_csv(index=False),
                    file_name=f"image_binary_{target_width}x{target_height}.csv",
                    mime="text/csv",
                    key="download_img_csv"
                )
                st.markdown("### Output 3 – Custom Grouped Matrix by Number of Target Positions")
                col1, col2 = st.columns([2, 1])
                with col1:
                    img_group_size = st.slider(
                        "Select number of target positions:",
                        min_value=12, max_value=128,
                        # FIX: target_width ranges 8..256, which can fall outside this
                        # slider's 12..128 bounds and crash Streamlit — clamp it.
                        value=int(min(max(target_width, 12), 128)),
                        key="img_group_slider"
                    )
                with col2:
                    img_custom_cols = st.number_input(
                        "Or enter custom number:",
                        min_value=1, max_value=512, value=img_group_size, key="img_custom_cols"
                    )
                if img_custom_cols != img_group_size:
                    img_group_size = img_custom_cols
                groups = []
                for i in range(0, len(binary_labels), img_group_size):
                    group = binary_labels[i:i + img_group_size]
                    if len(group) < img_group_size:
                        group += [0] * (img_group_size - len(group))
                    groups.append(group)
                columns_g = [f"Position {i+1}" for i in range(img_group_size)]
                df_grouped = pd.DataFrame(groups, columns=columns_g)
                df_grouped.insert(0, "Sample", range(1, len(df_grouped) + 1))
                st.dataframe(df_grouped, width="stretch")
                st.download_button(
                    "⬇️ Download Grouped CSV",
                    df_grouped.to_csv(index=False),
                    file_name=f"image_binary_grouped_{img_group_size}_positions.csv",
                    mime="text/csv",
                    key="download_img_grouped_csv"
                )
            # ===========================================================
            # GRAYSCALE (4-bit)
            # ===========================================================
            else:
                n_pixels = target_width * target_height
                total_bits = n_pixels * 4
                st.caption(
                    f"Output size: **{target_width} × {target_height}** = **{n_pixels:,}** pixels × 4 bits = "
                    f"**{total_bits:,}** bits"
                )
                gray4_matrix = quantize_to_4bit(img_array)
                gray8_preview = gray4_to_gray8(gray4_matrix)
                st.markdown("### Preview — 4-bit Grayscale (16 levels)")
                col_prev1, col_prev2 = st.columns(2)
                with col_prev1:
                    st.image(img_resized, caption=f"Original resized ({target_width}×{target_height}, 256 levels)", use_container_width=True)
                with col_prev2:
                    st.image(
                        Image.fromarray(gray8_preview),
                        caption=f"4-bit quantized ({target_width}×{target_height}, 16 levels)",
                        use_container_width=True
                    )
                # Binary flat
                binary_labels = gray4_to_binary_flat(gray4_matrix)
                binary_concat = ''.join(map(str, binary_labels))
                st.markdown("### Output 1 – Image Info")
                unique_vals, counts = np.unique(gray4_matrix, return_counts=True)
                st.markdown(
                    f"- **Dimensions:** {target_width} × {target_height} \n"
                    f"- **Bits per pixel:** 4 (values 0–15) \n"
                    f"- **Total pixels:** {n_pixels:,} \n"
                    f"- **Total bits:** {total_bits:,} \n"
                    f"- **Unique levels used:** {len(unique_vals)} of 16"
                )
                # Downloads: binary string, packed .g4 file
                col_dl1, col_dl2 = st.columns(2)
                with col_dl1:
                    st.download_button(
                        "⬇️ Download Binary String (.txt, 4 bits/pixel)",
                        data=binary_concat,
                        file_name="image_gray4_binary_full.txt",
                        mime="text/plain",
                        key="download_g4_binary_txt"
                    )
                with col_dl2:
                    g4_bytes = save_g4_bytes(gray4_matrix)
                    st.download_button(
                        "⬇️ Download Packed .g4 File",
                        data=g4_bytes,
                        file_name=f"image_{target_width}x{target_height}.g4",
                        mime="application/octet-stream",
                        key="download_g4_file"
                    )
                # Value matrix (0-15 per pixel)
                st.markdown("### Output 2 – Value Matrix (0–15 per pixel)")
                st.caption("Each cell = one pixel's 4-bit grayscale level. 0 = black, 15 = white.")
                columns_v = [f"Position {i+1}" for i in range(target_width)]
                df_val = pd.DataFrame(gray4_matrix.astype(int), columns=columns_v)
                df_val.insert(0, "Sample", range(1, len(df_val) + 1))
                st.dataframe(df_val, width="stretch")
                st.download_button(
                    "⬇️ Download Value Matrix CSV (0–15)",
                    df_val.to_csv(index=False),
                    file_name=f"image_gray4_values_{target_width}x{target_height}.csv",
                    mime="text/csv",
                    key="download_g4_values_csv"
                )
                # Binary matrix (4 bits per pixel → width*4 binary columns per row)
                st.markdown("### Output 3 – Binary Matrix (4 bits per pixel)")
                st.caption("Each pixel expanded to 4 binary columns. Row width = image width × 4.")
                bin_width = target_width * 4
                bin_matrix = np.array(binary_labels).reshape((target_height, bin_width))
                columns_b = [f"Position {i+1}" for i in range(bin_width)]
                df_bin = pd.DataFrame(bin_matrix, columns=columns_b)
                df_bin.insert(0, "Sample", range(1, len(df_bin) + 1))
                st.dataframe(df_bin, width="stretch")
                st.download_button(
                    "⬇️ Download Binary Matrix CSV",
                    df_bin.to_csv(index=False),
                    file_name=f"image_gray4_binary_{target_width}x{target_height}.csv",
                    mime="text/csv",
                    key="download_g4_binary_csv"
                )
                # Custom grouped
                st.markdown("### Output 4 – Custom Grouped Matrix by Number of Target Positions")
                col1, col2 = st.columns([2, 1])
                with col1:
                    g4_group_size = st.slider(
                        "Select number of target positions:",
                        min_value=12, max_value=256,
                        # FIX: bin_width = target_width*4 can reach 1024, far beyond
                        # this slider's 12..256 bounds, which crashes Streamlit — clamp it.
                        value=int(min(max(bin_width, 12), 256)),
                        key="g4_group_slider"
                    )
                with col2:
                    g4_custom_cols = st.number_input(
                        "Or enter custom number:",
                        min_value=1, max_value=1024, value=g4_group_size, key="g4_custom_cols"
                    )
                if g4_custom_cols != g4_group_size:
                    g4_group_size = g4_custom_cols
                groups = []
                for i in range(0, len(binary_labels), g4_group_size):
                    group = binary_labels[i:i + g4_group_size]
                    if len(group) < g4_group_size:
                        group += [0] * (g4_group_size - len(group))
                    groups.append(group)
                columns_cg = [f"Position {i+1}" for i in range(g4_group_size)]
                df_cg = pd.DataFrame(groups, columns=columns_cg)
                df_cg.insert(0, "Sample", range(1, len(df_cg) + 1))
                st.dataframe(df_cg, width="stretch")
                st.download_button(
                    "⬇️ Download Grouped CSV",
                    df_cg.to_csv(index=False),
                    file_name=f"image_gray4_grouped_{g4_group_size}_positions.csv",
                    mime="text/csv",
                    key="download_g4_grouped_csv"
                )
        else:
            st.info("👆 Upload an image to encode it as binary.")
# --------------------------------------------------
# TAB 2: Decoding (Text & Image)
# --------------------------------------------------
with tab2:
    st.markdown("""
Decode binary data back into **text** or render it as an **image**.
""")
    decode_mode = st.selectbox("Output mode:", ["Text", "Image"], key="decode_mode")
    # -----------------------------
    # TEXT DECODE MODE
    # -----------------------------
    if decode_mode == "Text":
        st.markdown("""
Upload either:
- `.csv` file with 0/1 values (any number of columns/rows)
- `.xlsx` Excel file
- `.txt` file containing a concatenated binary string (e.g. `010101...`)
""")
        decode_scheme = st.selectbox(
            "Decoding scheme (must match the encoding used):",
            ENCODING_OPTIONS,
            index=0,
            key="dec_scheme",
            help="Select the same encoding scheme that was used to produce the binary data."
        )
        uploaded_decode = st.file_uploader(
            "Upload your file (.csv, .xlsx, or .txt):",
            type=["csv", "xlsx", "txt"],
            key="decode_uploader"
        )
        if uploaded_decode is not None:
            try:
                # Flatten any tabular input row-major into one bit stream;
                # .txt input keeps only '0'/'1' characters.
                if uploaded_decode.name.endswith(".csv"):
                    df = pd.read_csv(uploaded_decode)
                    bits = df.values.flatten().astype(int).tolist()
                elif uploaded_decode.name.endswith(".xlsx"):
                    df = pd.read_excel(uploaded_decode)
                    bits = df.values.flatten().astype(int).tolist()
                elif uploaded_decode.name.endswith(".txt"):
                    content = uploaded_decode.read().decode().strip()
                    bits = [int(b) for b in content if b in ['0', '1']]
                else:
                    bits = []
                if not bits:
                    st.warning("No binary data detected.")
                else:
                    recovered_text = decode_from_binary(bits, decode_scheme)
                    st.success(f"✅ Conversion complete using **{decode_scheme}**!")
                    st.markdown("**Recovered text:**")
                    st.text_area("Output", recovered_text, height=150)
                    st.download_button(
                        "⬇️ Download Recovered Text (.txt)",
                        data=recovered_text,
                        file_name="recovered_text.txt",
                        mime="text/plain",
                        key="download_recovered"
                    )
            except Exception as e:
                st.error(f"Error reading or converting file: {e}")
        else:
            st.info("👆 Upload a file to start the reverse conversion.")
    # =====================================================
    # IMAGE DECODE MODE
    # =====================================================
    else:
        dec_image_type = st.selectbox(
            "Image type:",
            ["Black & White (1-bit)", "Grayscale (4-bit)"],
            key="dec_image_type",
            help=(
                "**Black & White** — Input is 0/1 binary data. Each value = 1 pixel.\n\n"
                "**Grayscale (4-bit)** — Input is a **value matrix (0–15)**, **binary data** "
                "(every 4 bits = one pixel), or a packed **.g4 file**."
            )
        )
        # ===========================================================
        # DECODE: B&W (1-bit)
        # ===========================================================
        if dec_image_type == "Black & White (1-bit)":
            st.markdown("""
Render binary data (0/1) as a **black & white image**.
Upload a binary matrix CSV (rows × positions) or a concatenated binary `.txt` string.
""")
            img_preview_file = st.file_uploader(
                "📤 Upload binary data file (.csv, .xlsx, or .txt):",
                type=["csv", "xlsx", "txt"],
                key="img_preview_uploader"
            )
            if img_preview_file is not None:
                try:
                    if img_preview_file.name.endswith(".csv"):
                        idf = pd.read_csv(img_preview_file)
                        # Drop the "Sample" index column (added by the encoder) if present.
                        if "Sample" in idf.columns or "sample" in idf.columns:
                            idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"])
                        bits_matrix = idf.values.flatten().astype(int)
                        # Tabular input: column count doubles as the image width.
                        detected_width = len(idf.columns)
                    elif img_preview_file.name.endswith(".xlsx"):
                        idf = pd.read_excel(img_preview_file)
                        if "Sample" in idf.columns or "sample" in idf.columns:
                            idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"])
                        bits_matrix = idf.values.flatten().astype(int)
                        detected_width = len(idf.columns)
                    elif img_preview_file.name.endswith(".txt"):
                        content = img_preview_file.read().decode().strip()
                        bits_matrix = np.array([int(b) for b in content if b in ['0', '1']])
                        # Flat text input carries no width; fall back to a square guess below.
                        detected_width = None
                    else:
                        bits_matrix = np.array([])
                        detected_width = None
                    if len(bits_matrix) == 0:
                        st.warning("No binary data detected.")
                    else:
                        total_bits = len(bits_matrix)
                        st.success(f"✅ Loaded **{total_bits:,}** bits.")
                        st.markdown("#### ⚙️ Image Dimensions")
                        if detected_width and detected_width > 1:
                            default_w = detected_width
                            st.caption(f"Auto-detected width from columns: **{detected_width}**")
                        else:
                            # No usable width: default to a roughly square image.
                            default_w = max(1, int(np.sqrt(total_bits)))
                        img_width = st.number_input(
                            "Image width (pixels / positions per row):",
                            min_value=1, max_value=total_bits, value=default_w, step=1,
                            key="img_preview_width"
                        )
                        img_height = int(np.ceil(total_bits / img_width))
                        st.caption(f"Image size: **{img_width} × {img_height}** = **{img_width * img_height:,}** pixels "
                                   f"({total_bits:,} bits, {img_width * img_height - total_bits} padded)")
                        # Zero-pad the tail so the bit stream fills the last row.
                        padded = np.zeros(img_width * img_height, dtype=int)
                        padded[:total_bits] = bits_matrix[:total_bits]
                        img_data = padded.reshape((img_height, img_width))
                        # Invert for display: bit 1 renders black, bit 0 white.
                        img_render = ((1 - img_data) * 255).astype(np.uint8)
                        pil_img = Image.fromarray(img_render, mode="L")
                        st.markdown("### 🖼️ Rendered Image")
                        # Integer nearest-neighbor upscale so small images stay crisp on screen.
                        display_scale = max(1, 256 // img_width)
                        display_w = img_width * display_scale
                        display_h = img_height * display_scale
                        pil_display = pil_img.resize((display_w, display_h), Image.NEAREST)
                        st.image(pil_display, caption=f"Binary image — {img_width}×{img_height} (1=black, 0=white)")
                        ones = int(bits_matrix.sum())
                        st.markdown(
                            f"- **Black pixels (1):** {ones:,} ({100*ones/total_bits:.1f}%) \n"
                            f"- **White pixels (0):** {total_bits - ones:,} ({100*(total_bits-ones)/total_bits:.1f}%)"
                        )
                        buf = io.BytesIO()
                        pil_img.save(buf, format="PNG")
                        st.download_button(
                            "⬇️ Download as PNG",
                            data=buf.getvalue(),
                            file_name=f"binary_image_{img_width}x{img_height}.png",
                            mime="image/png",
                            key="download_preview_png"
                        )
                        buf_hr = io.BytesIO()
                        pil_display.save(buf_hr, format="PNG")
                        st.download_button(
                            "⬇️ Download Scaled PNG (for viewing)",
                            data=buf_hr.getvalue(),
                            file_name=f"binary_image_{display_w}x{display_h}_scaled.png",
                            mime="image/png",
                            key="download_preview_png_scaled"
                        )
                except Exception as e:
                    st.error(f"❌ Error processing file: {e}")
                    import traceback
                    st.code(traceback.format_exc())
            else:
                st.info("👆 Upload a binary data file (CSV or TXT) to render as an image.")
        # ===========================================================
        # DECODE: GRAYSCALE (4-bit)
        # ===========================================================
        else:
            g4_input_format = st.selectbox(
                "Input data format:",
                ["Value matrix (0–15)", "Binary (4 bits per pixel)", "Packed .g4 file"],
                key="g4_input_format",
                help=(
                    "**Value matrix** — CSV/XLSX where each cell is a pixel value 0–15. "
                    "Rows = pixel rows, columns = pixel columns.\n\n"
                    "**Binary** — 0/1 data where every 4 consecutive bits encode one pixel (0–15).\n\n"
                    "**Packed .g4 file** — Binary file with G4 header + packed 4bpp payload "
                    "(two pixels per byte, high-nibble first)."
                )
            )
            st.markdown("Render 4-bit grayscale data as an image (16 levels, 0=black, 15=white).")
            # Accept .g4 files in addition to csv/xlsx/txt
            accept_types = ["csv", "xlsx", "txt"]
            if g4_input_format == "Packed .g4 file":
                accept_types = ["g4"]
            g4_file = st.file_uploader(
                f"📤 Upload data file ({', '.join('.' + t for t in accept_types)}):",
                type=accept_types,
                key="g4_decode_uploader"
            )
            if g4_file is not None:
                try:
                    gray4_matrix = None
                    img_width = None
                    img_height = None
                    # ---- Packed .g4 file ----
                    if g4_input_format == "Packed .g4 file":
                        raw_data = g4_file.read()
                        # Dimensions come from the G4 header itself.
                        gray4_matrix, img_width, img_height = load_g4_bytes(raw_data)
                    # ---- Value matrix (0-15) ----
                    elif g4_input_format == "Value matrix (0–15)":
                        if g4_file.name.endswith(".csv"):
                            gdf = pd.read_csv(g4_file)
                        elif g4_file.name.endswith(".xlsx"):
                            gdf = pd.read_excel(g4_file)
                        else:
                            # Plain text fallback: whitespace-separated integers per line.
                            content = g4_file.read().decode().strip()
                            rows = [list(map(int, line.split())) for line in content.splitlines() if line.strip()]
                            gdf = pd.DataFrame(rows)
                        if "Sample" in gdf.columns or "sample" in gdf.columns:
                            gdf = gdf.drop(columns=[c for c in gdf.columns if c.lower() == "sample"])
                        gray4_matrix = gdf.values.astype(int)
                        # Force values into the representable 0..15 range.
                        gray4_matrix = np.clip(gray4_matrix, 0, 15).astype(np.uint8)
                        img_height, img_width = gray4_matrix.shape
                    # ---- Binary (4 bits per pixel) ----
                    else:
                        if g4_file.name.endswith(".csv"):
                            bdf = pd.read_csv(g4_file)
                            if "Sample" in bdf.columns or "sample" in bdf.columns:
                                bdf = bdf.drop(columns=[c for c in bdf.columns if c.lower() == "sample"])
                            flat_bits = bdf.values.flatten().astype(int).tolist()
                            detected_cols = len(bdf.columns)
                            # 4 bit columns per pixel; otherwise guess a square layout.
                            img_width = detected_cols // 4 if detected_cols >= 4 else max(1, int(np.sqrt(len(flat_bits) // 4)))
                        elif g4_file.name.endswith(".xlsx"):
                            bdf = pd.read_excel(g4_file)
                            if "Sample" in bdf.columns or "sample" in bdf.columns:
                                bdf = bdf.drop(columns=[c for c in bdf.columns if c.lower() == "sample"])
                            flat_bits = bdf.values.flatten().astype(int).tolist()
                            detected_cols = len(bdf.columns)
                            img_width = detected_cols // 4 if detected_cols >= 4 else max(1, int(np.sqrt(len(flat_bits) // 4)))
                        elif g4_file.name.endswith(".txt"):
                            content = g4_file.read().decode().strip()
                            flat_bits = [int(b) for b in content if b in ['0', '1']]
                            img_width = max(1, int(np.sqrt(len(flat_bits) // 4)))
                        else:
                            flat_bits = []
                            img_width = 1
                        gray4_matrix = binary_flat_to_gray4(flat_bits, img_width)
                        img_height = gray4_matrix.shape[0]
                    n_pixels = img_width * img_height
                    st.success(f"✅ Loaded **{n_pixels:,}** pixels ({img_width} × {img_height}).")
                    # Width override
                    st.markdown("#### ⚙️ Image Dimensions")
                    img_width_adj = st.number_input(
                        "Image width (pixels per row):",
                        min_value=1, max_value=n_pixels, value=img_width, step=1,
                        key="g4_preview_width"
                    )
                    if img_width_adj != img_width:
                        # Re-wrap the pixel stream at the new width, zero-padding the last row.
                        flat_vals = gray4_matrix.flatten()
                        new_h = max(1, int(np.ceil(len(flat_vals) / img_width_adj)))
                        padded = np.zeros(img_width_adj * new_h, dtype=np.uint8)
                        padded[:len(flat_vals)] = flat_vals
                        gray4_matrix = padded.reshape((new_h, img_width_adj))
                        img_width = img_width_adj
                        img_height = new_h
                    st.caption(f"Image size: **{img_width} × {img_height}**")
                    # Render
                    gray8_render = gray4_to_gray8(gray4_matrix)
                    pil_img = Image.fromarray(gray8_render, mode="L")
                    st.markdown("### 🖼️ Rendered Image (4-bit Grayscale)")
                    # Integer nearest-neighbor upscale for on-screen viewing.
                    display_scale = max(1, 256 // img_width)
                    display_w = img_width * display_scale
                    display_h = img_height * display_scale
                    pil_display = pil_img.resize((display_w, display_h), Image.NEAREST)
                    st.image(pil_display, caption=f"4-bit grayscale — {img_width}×{img_height} (0=black, 15=white)")
                    # Stats
                    unique_vals, counts = np.unique(gray4_matrix, return_counts=True)
                    st.markdown(
                        f"- **Dimensions:** {img_width} × {img_height} \n"
                        f"- **Unique levels:** {len(unique_vals)} of 16 \n"
                        f"- **Min / Max value:** {gray4_matrix.min()} / {gray4_matrix.max()}"
                    )
                    # Downloads
                    buf = io.BytesIO()
                    pil_img.save(buf, format="PNG")
                    st.download_button(
                        "⬇️ Download as PNG",
                        data=buf.getvalue(),
                        file_name=f"gray4_image_{img_width}x{img_height}.png",
                        mime="image/png",
                        key="download_g4_png"
                    )
                    buf_hr = io.BytesIO()
                    pil_display.save(buf_hr, format="PNG")
                    st.download_button(
                        "⬇️ Download Scaled PNG (for viewing)",
                        data=buf_hr.getvalue(),
                        file_name=f"gray4_image_{display_w}x{display_h}_scaled.png",
                        mime="image/png",
                        key="download_g4_png_scaled"
                    )
                except Exception as e:
                    st.error(f"❌ Error processing file: {e}")
                    import traceback
                    st.code(traceback.format_exc())
            else:
                st.info("👆 Upload a 4-bit grayscale data file to render as an image.")
# --------------------------------------------------
# TAB 3: Data Analytics
# --------------------------------------------------
with tab3:
st.header("📊 Data Analytics")
st.markdown("""
Upload your sample data file (Excel or CSV) for a quick exploratory assessment of the editing rates distribution.
The file should contain samples as rows and position columns with editing values.
This tab provides visualizations **before** any binary labelling.
""")
analytics_uploaded = st.file_uploader(
"📤 Upload data file",
type=["xlsx", "csv"],
key="analytics_uploader"
)
if analytics_uploaded is not None:
try:
# Pick the pandas reader by file extension (.xlsx vs. everything else).
if analytics_uploaded.name.endswith(".xlsx"):
adf = pd.read_excel(analytics_uploaded)
else:
adf = pd.read_csv(analytics_uploaded)
st.success(f"✅ Loaded file with {len(adf)} rows and {len(adf.columns)} columns")
# Normalize headers: stringify and strip surrounding whitespace.
adf.columns = [str(c).strip() for c in adf.columns]
# Metadata column names to exclude from position detection
# ("descritpion" is deliberate — it also catches that common typo).
non_pos_keywords = {"sample", "description", "descritpion", "total edited",
'volume per "1"', "volume per 1", "id", "name"}
# A position column is any non-metadata column with at least one numeric value.
position_cols = [c for c in adf.columns
if c.lower() not in non_pos_keywords
and pd.to_numeric(adf[c], errors="coerce").notna().any()]
def pos_sort_key(col_name: str):
    """Sort key: the first integer embedded in the column name, else a huge sentinel."""
    digits = re.search(r"(\d+)", col_name)
    return 10**9 if digits is None else int(digits.group(1))
# Order position columns by their embedded number (non-numeric names sort last).
position_cols = sorted(position_cols, key=pos_sort_key)
if not position_cols:
st.error("No numeric position columns detected.")
st.stop()
st.info(f"Detected **{len(position_cols)}** position columns and **{len(adf)}** samples.")
# Coerce everything to numeric; non-parsable cells become 0.0.
pos_data = adf[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
# Prefer an explicit "Total edited" column; otherwise sum across positions.
if "Total edited" in adf.columns:
total_edited = pd.to_numeric(adf["Total edited"], errors="coerce").fillna(0.0)
else:
total_edited = pos_data.sum(axis=1)
st.markdown("### 1️⃣ Raw Data Distribution")
st.caption("Visualize editing values across all positions and samples — before any binary labelling.")
# Let the user pick how values are transformed before plotting.
transform_option = st.selectbox(
"Value transformation:",
["Raw (linear)", "log1p", "log1p → log1p", "log1p → pos. norm."],
index=0,
key="transform_select",
help=(
"**Raw** — No transformation.\n\n"
"**log1p** — `log(1 + x)`. Compresses high values, spreads low range.\n\n"
"**log1p → log1p** — Double log1p. Even stronger compression.\n\n"
"**log1p → pos. norm.** — log1p then robust per-position normalization "
"(median / IQR scaling per position column)."
)
)
def robust_pos_normalize_log1p(data: pd.DataFrame) -> pd.DataFrame:
    """log1p-transform the data, then robustly normalize each column.

    Every column is centered on its median; columns whose interquartile
    range is positive are additionally divided by that IQR. Columns with
    zero (or undefined) spread are only centered, never scaled.
    """
    logged = np.log1p(data)
    medians = logged.median()
    iqr = logged.quantile(0.75) - logged.quantile(0.25)
    # Replace non-positive/undefined IQRs with 1.0 so division is a no-op there.
    scale = iqr.where(iqr > 0, 1.0)
    return (logged - medians) / scale
# Apply the selected transformation; remember a label/tag for titles and filenames.
if transform_option == "log1p":
transformed = np.log1p(pos_data)
value_label = "Editing Value (log1p)"
transform_tag = "log1p"
elif transform_option == "log1p → log1p":
transformed = np.log1p(np.log1p(pos_data))
value_label = "Editing Value (log1p → log1p)"
transform_tag = "log1p_log1p"
elif transform_option == "log1p → pos. norm.":
transformed = robust_pos_normalize_log1p(pos_data)
value_label = "Editing Value (log1p → pos. norm.)"
transform_tag = "log1p_posnorm"
else:
transformed = pos_data
value_label = "Editing Value"
transform_tag = "raw"
# Long format: one row per (sample, position) measurement.
melted = transformed.melt(var_name="Position", value_name="Value")
# Numeric position index parsed from the column name (0 when none found).
# NOTE(review): re.search runs twice per value here — harmless but could be hoisted.
melted["Position_idx"] = melted["Position"].apply(
lambda x: int(re.search(r"(\d+)", str(x)).group(1)) if re.search(r"(\d+)", str(x)) else 0
)
st.markdown("#### 📊 Histogram — All Values")
n_bins = st.number_input("Number of bins:", min_value=10, max_value=300, value=80, step=10, key="hist_bins")
fig2, ax2 = plt.subplots(figsize=(10, 4))
ax2.hist(melted["Value"].values, bins=n_bins, color="#4F46E5", edgecolor="white", linewidth=0.3)
ax2.set_xlabel(value_label)
ax2.set_ylabel("Count")
ax2.set_title(f"Raw Values Distribution ({transform_tag})")
# Choose an x-tick step roughly proportional to the data range.
val_min = melted["Value"].min()
val_max = melted["Value"].max()
val_range = val_max - val_min
if val_range <= 2:
tick_step = 0.1
elif val_range <= 6:
tick_step = 0.2
elif val_range <= 20:
tick_step = 1
else:
tick_step = 5
ax2.set_xticks(np.arange(np.floor(val_min / tick_step) * tick_step,
val_max + tick_step, tick_step))
ax2.tick_params(axis='x', labelsize=8, rotation=45)
ax2.grid(axis='y', alpha=0.3)
fig2.tight_layout()
st.pyplot(fig2)
st.markdown("#### 2️⃣ Density Scatter Plot (FACS-style)")
st.caption("Each dot = one measurement (sample × position). Color = local point density.")
x_vals = melted["Position_idx"].values.astype(float)
y_vals = melted["Value"].values.astype(float)
# Fixed-seed jitter spreads points that share the same position index.
x_jittered = x_vals + np.random.default_rng(42).uniform(-0.3, 0.3, size=len(x_vals))
with st.spinner("Computing point density..."):
try:
xy = np.vstack([x_jittered, y_vals])
density = gaussian_kde(xy)(xy)
except np.linalg.LinAlgError:
# KDE fails on degenerate (singular) data — fall back to uniform density.
density = np.ones(len(x_vals))
# Draw the densest points last so they sit on top.
sort_idx = density.argsort()
x_plot = x_jittered[sort_idx]
y_plot = y_vals[sort_idx]
d_plot = density[sort_idx]
fig3, ax3 = plt.subplots(figsize=(12, 6))
scatter = ax3.scatter(x_plot, y_plot, c=d_plot, cmap="jet", s=8, alpha=0.7, edgecolors="none")
cbar = fig3.colorbar(scatter, ax=ax3, label="Density")
ax3.set_xlabel("Position")
ax3.set_ylabel(value_label)
ax3.set_title(f"Density Scatter — Position vs. {value_label}")
ax3.set_xticks(sorted(melted["Position_idx"].unique()))
ax3.grid(alpha=0.2)
fig3.tight_layout()
st.pyplot(fig3)
st.markdown("#### 3️⃣ 2D Density Heatmap")
st.caption("Binned heatmap of editing values by position — similar to a FACS density plot.")
y_bins = st.slider("Vertical bins:", min_value=20, max_value=150, value=60, key="heatmap_ybins")
positions_unique = sorted(melted["Position_idx"].unique())
n_positions = len(positions_unique)
fig4, ax4 = plt.subplots(figsize=(12, 6))
# One x-bin per position; log color scale unless all values are non-positive.
h = ax4.hist2d(
x_vals, y_vals,
bins=[n_positions, y_bins],
cmap="jet",
norm=mcolors.LogNorm() if melted["Value"].max() > 0 else None,
)
fig4.colorbar(h[3], ax=ax4, label="Count (log scale)")
ax4.set_xlabel("Position")
ax4.set_ylabel(value_label)
ax4.set_title(f"2D Density Heatmap — Position vs. {value_label}")
ax4.set_xticks(positions_unique)
ax4.grid(alpha=0.15)
fig4.tight_layout()
st.pyplot(fig4)
except Exception as e:
st.error(f"❌ Error processing file: {e}")
import traceback
st.code(traceback.format_exc())
else:
st.info("👆 Upload a data file (CSV or Excel) to start exploring.")
# --------------------------------------------------
# TAB 4: Pipetting Command Generator
# --------------------------------------------------
with tab4:
from math import ceil
st.header("🧪 Pipetting Command Generator for Eppendorf epMotion liquid handler")
st.markdown("""
Upload your sample file (Excel, CSV, or TXT) containing binary mutation data.
The app will:
- Auto-detect or create `Sample`, `Position#`, `Total edited`, and `Volume per "1"` columns
- Let you set the **Maximum volume per input well (µL)** used to compute `Volume per "1"`
- Calculate total demand per input and suggest a **uniform layout** (same # consecutive wells per input)
- **Preview** the layout on a plate map (with tooltips)
- After confirmation, generate pipetting commands and a source volume summary
""")
uploaded_writing = st.file_uploader(
"📤 Upload data file",
type=["xlsx", "csv", "txt"],
key="writing_uploader"
)
# Hard cap used both to size well blocks and to fill wells during generation.
max_per_well_ul = st.number_input(
"Maximum volume per source well (µL)",
min_value=10.0, max_value=2000.0, value=160.0, step=10.0
)
# 96-well plate geometry: rows A–H × columns 1–12.
ROWS_96 = ["A", "B", "C", "D", "E", "F", "G", "H"]
COLS_96 = list(range(1, 13))
def well_name(row_letter, col_number):
    """Compose a 96-well label such as 'A1' from a row letter and column number."""
    return "{}{}".format(row_letter, col_number)
def enumerate_plate_wells():
    """Yield all 96 well names of one plate in row-major order (A1 .. A12 .. H12)."""
    for row_letter in ROWS_96:
        yield from ("%s%d" % (row_letter, col) for col in COLS_96)
def parse_well_name(well: str):
    """Split a well label like 'B7' into ('B', 7).

    Surrounding whitespace and lowercase row letters are tolerated;
    anything unparseable falls back to the sentinel ('A', 0).
    """
    match = re.match(r"([A-Ha-h])\s*([0-9]+)", str(well).strip())
    if match is None:
        return ("A", 0)
    return (match.group(1).upper(), int(match.group(2)))
def sample_index_to_plate_and_well(sample_idx: int):
    """Map a 1-based sample index to its (plate number, well name).

    Samples fill plates row-major, 96 per plate: sample 1 -> (1, 'A1'),
    sample 97 -> (2, 'A1').
    """
    zero_based = sample_idx - 1
    plate_offset, within_plate = divmod(zero_based, 96)
    row_idx, col_idx = divmod(within_plate, 12)
    return plate_offset + 1, well_name(ROWS_96[row_idx], COLS_96[col_idx])
def build_global_wells_list(n_plates: int):
    """Return every (plate, well) pair for plates 1..n_plates, plate by plate."""
    return [
        (plate, well)
        for plate in range(1, n_plates + 1)
        for well in enumerate_plate_wells()
    ]
def pick_tool(volume_ul: float) -> str:
    """Select the pipetting tool name for a transfer volume in µL.

    NOTE(review): volumes above 50 µL still map to TS_50 — confirm intended.
    """
    if volume_ul <= 10.0:
        return "TS_10"
    return "TS_50"
# 16 distinct colors for coloring input blocks on the plate map;
# reused cyclically when there are more than 16 inputs.
PALETTE = [
"#4F46E5", "#22C55E", "#F59E0B", "#EF4444", "#06B6D4", "#A855F7", "#84CC16", "#F97316",
"#0EA5E9", "#E11D48", "#10B981", "#7C3AED", "#15803D", "#EA580C", "#2563EB", "#DC2626"
]
def render_plate_map_html(plates_used, well_to_input, max_wells_per_source, inputs_count):
"""Build an HTML plate map (one grid per plate) plus a per-input color legend.

well_to_input maps (plate, well) -> (input_idx, well_index_within_block);
assigned wells are colored by input and carry a hover tooltip.
Returns the assembled HTML as a single string.
"""
# Legend: one colored square per input, colors cycling through PALETTE.
legend_spans = []
for i in range(1, inputs_count + 1):
color = PALETTE[(i-1) % len(PALETTE)]
legend_spans.append(
f"<span style='display:inline-block;margin-right:12px'>"
f"<span style='display:inline-block;width:12px;height:12px;background:{color};border:1px solid #333;margin-right:6px;vertical-align:middle'></span>"
f"Input {i}</span>"
)
legend_html = "<div style='margin:8px 0 16px 0'>" + "".join(legend_spans) + "</div>"
# Shared CSS for the grid and the pure-CSS hover tooltips.
css = """
<style>
.plate { margin: 10px 0 24px 0; }
.plate-title { font-weight: 600; margin: 4px 0 8px 0; }
.grid { display: grid; grid-template-columns: 32px repeat(12, 38px); grid-auto-rows: 32px; gap: 4px; }
.cell { width: 38px; height: 32px; border: 1px solid #DDD; display:flex; align-items:center; justify-content:center; font-size:12px; background:#FAFAFA; position:relative; }
.head { font-weight:600; background:#F3F4F6; }
.cell[data-color] { color:#111; }
.cell .tip { visibility:hidden; opacity:0; transition:opacity 0.15s ease; position:absolute; bottom:100%; transform:translateY(-6px); left:50%; transform:translate(-50%, -6px); background:#111; color:#fff; padding:4px 6px; font-size:11px; border-radius:4px; white-space:nowrap; pointer-events:none; }
.cell:hover .tip { visibility:visible; opacity:0.95; }
</style>
"""
body = [css, legend_html]
# One grid per plate: a header row of column numbers, then 8 rows of 12 cells
# each preceded by the row-letter header cell.
for p in range(1, plates_used + 1):
body.append(f"<div class='plate'><div class='plate-title'>Plate {p}</div>")
body.append("<div class='grid'>")
body.append("<div class='cell head'></div>")
for c in COLS_96:
body.append(f"<div class='cell head'>{c}</div>")
for r in ROWS_96:
body.append(f"<div class='cell head'>{r}</div>")
for c in COLS_96:
well = f"{r}{c}"
key = (p, well)
if key in well_to_input:
# Assigned well: colored cell with a tooltip naming the input and block slot.
input_idx, within_idx = well_to_input[key]
color = PALETTE[(input_idx-1) % len(PALETTE)]
tip = f"Input {input_idx} • P{p}:{well} • Block well {within_idx}/{max_wells_per_source}"
cell_html = (
f"<div class='cell' data-color style='background:{color};border-color:#555' title='{tip}'>"
f"<span class='tip'>{tip}</span>"
"</div>"
)
else:
cell_html = "<div class='cell'></div>"
body.append(cell_html)
body.append("</div></div>")
return "".join(body)
if uploaded_writing is not None:
try:
# Dispatch on extension; TXT is first tried as TSV, then as CSV.
# NOTE(review): if the TSV parse raises, the upload buffer may need
# .seek(0) before the CSV retry — verify the fallback actually works.
if uploaded_writing.name.endswith(".xlsx"):
df = pd.read_excel(uploaded_writing)
elif uploaded_writing.name.endswith(".csv"):
df = pd.read_csv(uploaded_writing)
else:
try:
df = pd.read_csv(uploaded_writing, sep="\t")
except Exception:
df = pd.read_csv(uploaded_writing)
st.success(f"✅ Loaded file with {len(df)} rows and {len(df.columns)} columns")
df.columns = [str(c).strip() for c in df.columns]
# Guarantee a Sample column (1..N) used for destination-well addressing.
if not any(c.lower() == "sample" for c in df.columns):
df.insert(0, "Sample", np.arange(1, len(df) + 1))
st.info("`Sample` column missing — automatically generated 1..N.")
# Position columns: prefer explicit "Position N" headers; otherwise fall
# back to every column that is not a known metadata column.
position_cols = [c for c in df.columns if re.match(r"(?i)^position\s*\d+", c)]
if not position_cols:
non_pos_cols = {"sample", "total edited", 'volume per "1"', "volume per 1"}
candidate_cols = [c for c in df.columns if c.lower() not in non_pos_cols]
position_cols = candidate_cols
st.info(f"Position columns inferred automatically: {len(position_cols)} detected.")
def pos_key(col_name: str):
    """Numeric sort key: first integer found in the column name (huge sentinel if none)."""
    found = re.search(r"(\d+)", col_name)
    if found is None:
        return 10**9
    return int(found.group(1))
position_cols = sorted(position_cols, key=pos_key)
# Binary matrix: 1 means the sample receives that input.
df[position_cols] = df[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0).astype(int)
if "Total edited" not in df.columns:
df["Total edited"] = df[position_cols].sum(axis=1).astype(int)
st.info("`Total edited` column missing — calculated automatically as sum of 1s per row.")
st.markdown("#### ⚙️ Volume Calculation Settings")
default_total_vol = st.number_input(
"Maximum volume per input well (µL)",
min_value=1.0, max_value=10000.0, value=64.0, step=1.0,
help="Used to compute Volume per '1' as (Maximum volume per input well / Total edited) when not provided."
)
# Per-"1" transfer volume: taken from the file when present, else derived.
vol_candidates = [c for c in df.columns if "volume per" in c.lower()]
if not vol_candidates:
# Rows with Total edited == 0 would divide by zero — route through NaN, then 0 µL.
df['Volume per "1"'] = default_total_vol / df["Total edited"].replace(0, np.nan)
df['Volume per "1"'] = df['Volume per "1"'].fillna(0)
st.info(f'`Volume per "1"` column missing — calculated automatically as {default_total_vol:.0f} µL (max per input well) / Total edited.')
volume_col = 'Volume per "1"'
else:
volume_col = vol_candidates[0]
# A single transfer can never exceed one source well's capacity.
if df[volume_col].max() > max_per_well_ul:
st.error(
f"❌ At least one row has `Volume per \"1\"` greater than the per-well cap ({max_per_well_ul} µL). "
"Increase the cap or reduce per-transfer volume."
)
st.stop()
vol_per_one_series = pd.to_numeric(df[volume_col], errors="coerce").fillna(0.0)
# Total µL demanded from each input = sum of per-"1" volumes over rows marked 1.
total_volume_per_input = [float(vol_per_one_series[df[pos] == 1].sum()) for pos in position_cols]
wells_needed_per_input = [int(ceil(tv / max_per_well_ul)) if tv > 0 else 0 for tv in total_volume_per_input]
num_inputs = len(position_cols)
# Uniform layout: every input gets the same block size (the largest need).
max_wells_per_source = max(wells_needed_per_input) if wells_needed_per_input else 0
st.markdown("### 👀 Preview: Suggested Uniform Layout")
if max_wells_per_source == 0:
st.info("No edits detected — nothing to allocate.")
st.stop()
st.write(
f"💡 Suggested layout: **{max_wells_per_source} consecutive wells per input** "
f"(cap {max_per_well_ul:.0f} µL/well)."
)
total_wells_needed_uniform = num_inputs * max_wells_per_source
plates_needed = int(ceil(total_wells_needed_uniform / 96)) or 1
# Wells in (plate, row, column) order, truncated to exactly what is needed.
global_wells = sorted(
build_global_wells_list(plates_needed),
key=lambda x: (
x[0],
ROWS_96.index(parse_well_name(x[1])[0]),
parse_well_name(x[1])[1]
)
)
global_wells = global_wells[:total_wells_needed_uniform]
# assigned_wells_map: input -> its consecutive block of wells;
# well_to_input: (plate, well) -> (input, index within block) for the plate map.
assigned_wells_map, well_to_input, preview_rows = {}, {}, []
for i in range(1, num_inputs + 1):
start, end = (i - 1) * max_wells_per_source, i * max_wells_per_source
block = global_wells[start:end]
assigned_wells_map[i] = block
for j, (p, w) in enumerate(block, start=1):
well_to_input[(p, w)] = (i, j)
block_str = ", ".join([f"P{p}:{w}" for (p, w) in block])
preview_rows.append({
"Input (Position #)": i,
"Total demand (µL)": round(total_volume_per_input[i-1], 2),
"Wells needed (actual)": wells_needed_per_input[i-1],
"Allocated (uniform)": max_wells_per_source,
"Assigned wells": block_str
})
preview_df = pd.DataFrame(preview_rows)
st.dataframe(preview_df, width="stretch", height=300)
st.markdown("#### Plate Map (hover cells for details)")
plate_html = render_plate_map_html(plates_needed, well_to_input, max_wells_per_source, num_inputs)
st.markdown(plate_html, unsafe_allow_html=True)
st.markdown("### ✅ Generate Pipetting Commands")
if st.button("Generate using this layout"):
# Running fill level (µL) for each allocated well of each input.
per_input_well_cum = {i: [0.0] * max_wells_per_source for i in range(1, num_inputs + 1)}
commands, source_volume_totals = [], {}
for _, row in df.iterrows():
sample_id = int(row["Sample"])
vol_per_one = float(row[volume_col])
if vol_per_one <= 0:
continue
dest_plate, dest_well = sample_index_to_plate_and_well(sample_id)
tool = pick_tool(vol_per_one)
for pos_idx, col in enumerate(position_cols, start=1):
if int(row[col]) != 1:
continue
# First-fit: draw from the first well in the block with remaining capacity.
wells_for_input = assigned_wells_map[pos_idx]
cum_list = per_input_well_cum[pos_idx]
chosen = None
for j, ((src_plate, src_well), current_vol) in enumerate(zip(wells_for_input, cum_list)):
if current_vol + vol_per_one <= max_per_well_ul:
chosen = (j, src_plate, src_well)
break
if chosen is None:
st.error(
f"Allocation exhausted for Input {pos_idx} while creating commands. "
"Increase the max volume per well or review per-transfer volume."
)
st.stop()
j, src_plate, src_well = chosen
cum_list[j] += vol_per_one
per_input_well_cum[pos_idx] = cum_list
source_volume_totals[(src_plate, src_well)] = source_volume_totals.get((src_plate, src_well), 0.0) + vol_per_one
commands.append({
"Input #": pos_idx,
"Source plate": src_plate,
"Source well": src_well,
"Destination plate": dest_plate,
"Destination well": dest_well,
"Volume": round(vol_per_one, 2),
"Tool": tool
})
commands_df = pd.DataFrame(commands)
# Sort commands by input, then source well (row-major), then destination.
def row_idx_from_well(w): return ROWS_96.index(parse_well_name(w)[0])
def col_num_from_well(w): return parse_well_name(w)[1]
commands_df["Src_row_idx"] = commands_df["Source well"].apply(row_idx_from_well)
commands_df["Src_col_num"] = commands_df["Source well"].apply(col_num_from_well)
commands_df["Dst_row_idx"] = commands_df["Destination well"].apply(row_idx_from_well)
commands_df["Dst_col_num"] = commands_df["Destination well"].apply(col_num_from_well)
commands_df = commands_df.sort_values(
by=["Input #", "Source plate", "Src_row_idx", "Src_col_num",
"Destination plate", "Dst_row_idx", "Dst_col_num"],
kind="stable"
)
commands_df = commands_df[[
"Input #", "Source plate", "Source well",
"Destination plate", "Destination well", "Volume", "Tool"
]]
st.success(f"✅ Generated {len(commands_df)} commands across {num_inputs} inputs.")
# Per-source-well usage summary against the allocated capacity.
summary_rows = []
for i in range(1, num_inputs + 1):
for (p, w), used in zip(assigned_wells_map[i], per_input_well_cum[i]):
total = source_volume_totals.get((p, w), 0.0)
summary_rows.append({
"Source": i, "Source plate": p, "Source well": w,
"Total volume taken (µL)": round(total, 2),
"Allocated capacity (µL)": round(max_per_well_ul, 2)
})
summary_df = pd.DataFrame(summary_rows)
summary_df["Src_row_idx"] = summary_df["Source well"].apply(row_idx_from_well)
summary_df["Src_col_num"] = summary_df["Source well"].apply(col_num_from_well)
summary_df = summary_df.sort_values(
by=["Source", "Source plate", "Src_row_idx", "Src_col_num"],
kind="stable"
)[
["Source", "Source plate", "Source well", "Total volume taken (µL)", "Allocated capacity (µL)"]
]
st.markdown("### 💧 Pipetting Commands")
st.dataframe(commands_df, width="stretch", height=400)
st.download_button("⬇️ Download Commands CSV", commands_df.to_csv(index=False), "pipetting_commands.csv", mime="text/csv")
st.markdown("### 📊 Source Volume Summary")
st.dataframe(summary_df, width="stretch", height=400)
st.download_button("⬇️ Download Source Summary CSV", summary_df.to_csv(index=False), "source_volume_summary.csv", mime="text/csv")
except Exception as e:
st.error(f"❌ Error processing file: {e}")
else:
st.info("👆 Upload an Excel/CSV/TXT file to start.")