import gradio as gr
import numpy as np
import librosa
import xgboost as xgb
import random
import subprocess
import tempfile
import os
import cv2
import difflib
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import torch
import torchvision.transforms as T
import torchvision.models as models
# --- Constants ---
SAMPLE_RATE = 16000
WINDOW_MS = 100
WINDOW_SAMPLES = int(SAMPLE_RATE * WINDOW_MS / 1000)
N_MFCC = 13
SILENCE_EMOJI = "_"
MIN_SEC = 3.0
MAX_SEC = 5.0
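# At 16 kHz, each 100 ms analysis window is 1600 samples; feature extraction below
# advances in 50 ms hops (50% overlap). MIN_SEC / MAX_SEC express the intended 3-5 s
# length of training samples, though they are not enforced anywhere in this file.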
# --- Lightweight pretrained visual backbone ---
device = torch.device("cpu")
# mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
mobilenet = models.mobilenet_v3_small(
    weights=models.MobileNet_V3_Small_Weights.DEFAULT
)
mobilenet = mobilenet.features  # remove classifier
mobilenet.eval()
mobilenet.to(device)

# ImageNet normalization
video_transform = T.Compose([
    T.ToPILImage(),
    T.Resize((96, 96)),  # small input for speed
    T.ToTensor(),
    T.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])
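# Each frame is resized to 96x96 and ImageNet-normalized before being passed through the
# frozen MobileNetV3-Small feature extractor; the feature map is globally average-pooled
# per frame in extract_video_features below.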
def generate_challenge():
    length = random.randint(3, 5)
    seq = []
    for i in range(length):
        seq.append(str(random.choice([0, 1])))
        if i < length - 1:
            seq.append(SILENCE_EMOJI)
    # Return both the mission string and reset visibility to True
    mission = " ".join(seq)
    return mission, gr.update(visible=True, value=mission)
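# Example output: a mission such as "1 _ 0 _ 1" (digits are the two sound classes,
# SILENCE_EMOJI marks the gap between them), shown in the textbox until the referee records.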
def hide_mission(audio_data):
    """Hides the mission textbox once the referee has recorded audio."""
    if audio_data is not None:
        return gr.update(visible=False)
    return gr.update(visible=True)
def post_process_video_sequence(
    preds,
    min_segment_frames=10,
    smoothing_window=10,
    background_class=2
):
    """
    Post-process frame-level predictions into a clean symbol sequence.
    Steps:
        1. Temporal smoothing (majority vote).
        2. Compress runs of identical predictions into segments.
        3. Drop very short non-background segments.
        4. Collapse the remainder into the final sequence.
    Args:
        preds: array of class predictions per frame
        min_segment_frames: minimum frames required to accept a symbol
        smoothing_window: neighborhood size for smoothing
        background_class: class index for background
    """
    if len(preds) == 0:
        return ""
    preds = [int(p) for p in preds]

    # -----------------------------------
    # 1. Majority vote smoothing
    # -----------------------------------
    half_w = smoothing_window // 2
    smoothed = []
    for i in range(len(preds)):
        start = max(0, i - half_w)
        end = min(len(preds), i + half_w + 1)
        neighborhood = preds[start:end]
        smoothed.append(max(set(neighborhood), key=neighborhood.count))

    # -----------------------------------
    # 2. Segment compression
    # -----------------------------------
    segments = []
    current = smoothed[0]
    length = 1
    for p in smoothed[1:]:
        if p == current:
            length += 1
        else:
            segments.append((current, length))
            current = p
            length = 1
    segments.append((current, length))

    # -----------------------------------
    # 3. Filter short segments
    # -----------------------------------
    filtered = []
    for cls, length in segments:
        if cls != background_class and length < min_segment_frames:
            continue
        filtered.append(cls)

    # -----------------------------------
    # 4. Collapse duplicates
    # -----------------------------------
    final_seq = []
    for cls in filtered:
        if cls == background_class:
            continue
        if not final_seq or cls != final_seq[-1]:
            final_seq.append(str(cls))
    return "_".join(final_seq)
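# Illustrative example: frame predictions of roughly 30 frames of class 0, 20 background
# frames, then 30 frames of class 1 decode to "0_1"; a non-background burst shorter than
# min_segment_frames (10 frames by default) is dropped as noise.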
def post_process_to_emoji(preds, window_ms, min_silence_ms=200):
    """Processes raw AI output, smooths it, enforces silence gaps, and merges duplicates."""
    if len(preds) == 0: return ""
    ms_per_step = window_ms / 2
    min_silence_steps = int(min_silence_ms / ms_per_step)

    # 1. Majority Vote Smoothing (Temporal Filtering)
    # Reduces "flicker" where a single window might jump to a wrong class
    smoothed = []
    for i in range(len(preds)):
        start = max(0, i - 1)
        end = min(len(preds), i + 2)
        neighborhood = list(preds[start:end])
        smoothed.append(max(set(neighborhood), key=neighborhood.count))

    # 2. Silence Enforcement & Transition Logic
    # We only allow a change of class if the silence buffer is respected
    intermediate_sequence = []
    last_val = -1
    silence_count = 0
    for p in smoothed:
        p = int(p)
        if p == 2:  # Silence Class
            silence_count += 1
            if last_val != 2:
                intermediate_sequence.append(2)
                last_val = 2
        else:  # Sound Class (0 or 1)
            if last_val != p:
                # If we were in silence, check if the gap was long enough
                if last_val == -1 or (last_val == 2 and silence_count >= min_silence_steps):
                    intermediate_sequence.append(p)
                    last_val = p
                    silence_count = 0
                # If we are jumping directly from 0 to 1 without silence,
                # we ignore it or force silence (depending on game strictness)

    # 3. Final Merge (The "100110" -> "1010" logic)
    # This removes any accidental back-to-back duplicates
    # print("Intermediate Sequence (post-silence enforcement):", intermediate_sequence)
    final_output = []
    for val in intermediate_sequence:
        # print(f"Processing value: {val}")
        if val != 2:
            if not final_output or val != final_output[-1]:
                final_output.append(str(val))
                # print(f"Added {val} to final output {final_output}")
    # Join the detected symbols with the silence marker, e.g. ["1", "0"] -> "1_0"
    return "_".join(final_output)
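# Illustrative example: with 100 ms windows and 50 ms steps (min_silence_steps = 4),
# window predictions like [1, 1, 1, 1, 2, 2, 2, 2, 2, 0, 0, 0] decode to "1_0";
# a 0 -> 1 jump without a long-enough silence gap is ignored.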
def extract_features_sequence(audio_path, validate_duration=True):
    if audio_path is None: return None, "No audio provided"
    y, sr = librosa.load(audio_path, sr=SAMPLE_RATE, mono=True)
    if len(y) < WINDOW_SAMPLES:
        return None, f"Audio too short ({len(y)/SAMPLE_RATE:.1f}s), needs to be at least {WINDOW_MS/1000:.1f}s."
    elif validate_duration and len(y) > SAMPLE_RATE * 5:  # Limit to 5 seconds for performance
        print(f"Audio too long ({len(y)/SAMPLE_RATE:.1f}s), truncating to 5s for feature extraction.")
        y = y[:SAMPLE_RATE * 5]
    hop = WINDOW_SAMPLES // 2  # 50% overlap for smoother sequence detection
    feats = []
    for start in range(0, len(y) - WINDOW_SAMPLES + 1, hop):
        w = y[start:start + WINDOW_SAMPLES]
        mfcc = librosa.feature.mfcc(y=w, sr=sr, n_mfcc=N_MFCC, n_fft=512)
        feats.append(mfcc.mean(axis=1))
    return np.array(feats), "OK"
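# The returned feature matrix is roughly (num_windows, N_MFCC): one 13-dim mean-MFCC vector
# per 100 ms window, advanced in 50 ms steps (about 20 windows per second of audio).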
def train_player_model(a0, a1, a_silence, player_name):
    X0, msg0 = extract_features_sequence(a0, validate_duration=True)
    X1, msg1 = extract_features_sequence(a1, validate_duration=True)
    X_sil, msg_sil = extract_features_sequence(a_silence, validate_duration=True)
    if X0 is None: return None, f"{player_name} Source 0: {msg0}"
    if X1 is None: return None, f"{player_name} Source 1: {msg1}"
    if X_sil is None: return None, f"{player_name} Silence: {msg_sil}"
    X = np.vstack([X0, X1, X_sil])
    y = np.concatenate([np.zeros(len(X0)), np.ones(len(X1)), np.full(len(X_sil), 2)])
    print(f"{player_name} - Training model with {len(X)} samples: {len(X0)} Source 0, {len(X1)} Source 1, {len(X_sil)} Silence")
    model = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", xgb.XGBClassifier(n_estimators=50, max_depth=3, objective='multi:softprob', num_class=3))
    ])
    model.fit(X, y)
    print(f"{player_name} model trained successfully with {len(X)} samples!")
    return model, "OK"
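# Label convention shared by the audio and video models: 0 = "Source 0", 1 = "Source 1",
# 2 = silence/background. The fitted Pipeline is later applied window-by-window, e.g.
# model.predict(X_ref) yields one class label per 100 ms window of the referee recording.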
def play_game(target_display, ref_audio, p1_0, p1_1, p1_s, p2_0, p2_1, p2_s):
    # Validation and Training logic...
    m1, err1 = train_player_model(p1_0, p1_1, p1_s, "Player 1")
    if m1 is None: return f"### ❌ {err1}"
    m2, err2 = train_player_model(p2_0, p2_1, p2_s, "Player 2")
    if m2 is None: return f"### ❌ {err2}"
    if not ref_audio: return "### ⚠️ Referee recording missing!"
    X_ref, ref_msg = extract_features_sequence(ref_audio, validate_duration=False)
    if X_ref is None: return f"### ⚠️ Referee recording: {ref_msg}"
    target_numeric = target_display.replace(" ", "").replace(SILENCE_EMOJI, "2")
    res1_emoji = post_process_to_emoji(m1.predict(X_ref), WINDOW_MS)
    res2_emoji = post_process_to_emoji(m2.predict(X_ref), WINDOW_MS)
    res1_num = res1_emoji.replace(SILENCE_EMOJI, "2")
    res2_num = res2_emoji.replace(SILENCE_EMOJI, "2")
    score1 = round(difflib.SequenceMatcher(None, target_numeric, res1_num).ratio() * 100, 1)
    score2 = round(difflib.SequenceMatcher(None, target_numeric, res2_num).ratio() * 100, 1)
    winner = "Player 1" if score1 > score2 else "Player 2"
    if score1 == score2: winner = "It's a Tie!"
    # Formatting results with Large Markdown
    return f"""
# 🏆 BATTLE RESULTS
## 🎯 Mission Target: {target_display}
---
## 🎤 Player 1 `{res1_emoji}` | **Accuracy:** `{score1}%`
## 🎤 Player 2 `{res2_emoji}` | **Accuracy:** `{score2}%`
---
# 🏆 WINNER: <span style="color: #ff4b4b; font-size: 40px;">{winner}</span>
"""
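# Scoring note (illustrative): difflib.SequenceMatcher.ratio() compares the flattened strings;
# e.g. a target of "12021" against a decoded "12001" shares 4 of 5 characters in matching
# blocks, giving a ratio of 0.8, reported as 80.0% accuracy.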
# =========================================================
# VIDEO SECTION
# =========================================================
def ensure_readable_video(input_path):
    """Re-encode video to MP4 to avoid WEBM/Opus issues."""
    if input_path is None:
        return None
    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    tmp_path = tmp.name
    tmp.close()
    cmd = [
        "ffmpeg",
        "-y",
        "-i", input_path,
        "-an",  # remove audio
        "-vcodec", "libx264",
        "-preset", "ultrafast",
        tmp_path
    ]
    try:
        # check=True so a failed re-encode falls back to the original file
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
        return tmp_path
    except Exception:
        return input_path
def extract_video_features(video_path, max_frames=300):
    """Extract frame-level features from video."""
    if video_path is None:
        return None, "No video provided"
    video_path = ensure_readable_video(video_path)
    cap = cv2.VideoCapture(video_path)
    feats = []
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret or frame_count >= max_frames:
            break
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        tensor = video_transform(frame_rgb).unsqueeze(0).to(device)
        with torch.no_grad():
            feat_map = mobilenet(tensor)
            feat = torch.nn.functional.adaptive_avg_pool2d(feat_map, 1)
            feat = feat.view(-1).cpu().numpy()
        feats.append(feat)
        # frame = cv2.resize(frame, (64, 64))
        # frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # # Basic color statistics
        # mean = frame_rgb.mean(axis=(0, 1))
        # std = frame_rgb.std(axis=(0, 1))
        # brightness = frame_rgb.mean()
        # feat = np.concatenate([mean, std, [brightness]])
        # feats.append(feat)
        frame_count += 1
    cap.release()
    if len(feats) == 0:
        return None, "No frames extracted"
    return np.array(feats), "OK"
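# The result is a (num_frames, feature_dim) matrix capped at max_frames rows; with the
# MobileNetV3-Small backbone above, feature_dim should be 576 (globally average-pooled
# channels), though the exact size depends on the torchvision release.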
def train_video_model(v0, v1, v_bg):
    X0, msg0 = extract_video_features(v0)
    X1, msg1 = extract_video_features(v1)
    Xbg, msgbg = extract_video_features(v_bg)
    if X0 is None: return None, f"Class 0 error: {msg0}"
    if X1 is None: return None, f"Class 1 error: {msg1}"
    if Xbg is None: return None, f"Background error: {msgbg}"
    print(f"Training video model with {len(X0)} frames for Class 0, {len(X1)} frames for Class 1, and {len(Xbg)} frames for Background.")
    X = np.vstack([X0, X1, Xbg])
    y = np.concatenate([
        np.zeros(len(X0)),
        np.ones(len(X1)),
        np.full(len(Xbg), 2)
    ])
    model = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", xgb.XGBClassifier(
            n_estimators=50,
            max_depth=3,
            objective='multi:softprob',
            num_class=3
        ))
    ])
    model.fit(X, y)
    print("Video model trained successfully!")
    return model, "OK"
def decode_video_sequence(model, video_path):
    X, msg = extract_video_features(video_path)
    if X is None:
        return f"Error: {msg}"
    preds = model.predict(X)
    print(f"Raw frame-level predictions: {preds}")
    return post_process_video_sequence(preds)
def run_video_decoder(v0, v1, v_bg, test_video):
    model, msg = train_video_model(v0, v1, v_bg)
    if model is None:
        return f"❌ {msg}"
    result = decode_video_sequence(model, test_video)
    return f"### 🎬 Decoded Sequence: `{result}`"
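# Illustrative programmatic use (hypothetical file paths):
#   run_video_decoder("class0.mp4", "class1.mp4", "background.mp4", "mystery.mp4")
# trains a fresh per-frame classifier on the three clips and returns a Markdown string
# such as '### 🎬 Decoded Sequence: `0_1_0`'.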
# =========================================================
# GRADIO UI WITH DUAL TABS
# =========================================================
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    with gr.Tabs():
        # =====================================
        # TAB 1 - AUDIO GAME (existing)
        # =====================================
        with gr.Tab("🎙️ Audio Sequence Battle"):
            hidden_target = gr.State("")
            with gr.Row():
                target_seq_ui = gr.Textbox(
                    label="📢 Referee's Mission",
                    interactive=False
                )
                refresh_btn = gr.Button("🔄 New Mission")
            demo.load(generate_challenge, outputs=[hidden_target, target_seq_ui])
            refresh_btn.click(generate_challenge, outputs=[hidden_target, target_seq_ui])
            with gr.Accordion("⚖️ Step 1: The Referee", open=True):
                ref_audio = gr.Audio(
                    sources=["microphone"],
                    type="filepath",
                    label="Record the Mission"
                )
                ref_audio.change(hide_mission, inputs=ref_audio, outputs=target_seq_ui)
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 🎤 Player 1")
                    p1_0 = gr.Audio(sources=["microphone"], type="filepath", label="Source 0")
                    p1_1 = gr.Audio(sources=["microphone"], type="filepath", label="Source 1")
                    p1_s = gr.Audio(sources=["microphone"], type="filepath", label="Silence")
                with gr.Column():
                    gr.Markdown("### 🎤 Player 2")
                    p2_0 = gr.Audio(sources=["microphone"], type="filepath", label="Source 0")
                    p2_1 = gr.Audio(sources=["microphone"], type="filepath", label="Source 1")
                    p2_s = gr.Audio(sources=["microphone"], type="filepath", label="Silence")
            btn_fight = gr.Button("💥 REVEAL WINNER", variant="primary")
            result_display = gr.Markdown("### Results will appear here")
            btn_fight.click(
                play_game,
                inputs=[hidden_target, ref_audio, p1_0, p1_1, p1_s, p2_0, p2_1, p2_s],
                outputs=result_display
            )
        # =====================================
        # TAB 2 - VIDEO DECODER
        # =====================================
        with gr.Tab("🎬 Video Frame Decoder"):
            gr.Markdown("## Train video symbols and decode frame-level sequence")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Training Samples")
                    v0 = gr.Video(label="Class 0 video", format="mp4")
                    v1 = gr.Video(label="Class 1 video", format="mp4")
                    v_bg = gr.Video(label="Background video", format="mp4")
                with gr.Column():
                    gr.Markdown("### Test Video")
                    test_video = gr.Video(label="Video to decode", format="mp4")
            decode_btn = gr.Button("🎬 Decode Video", variant="primary")
            video_result = gr.Markdown("### Decoded result will appear here")
            decode_btn.click(
                run_video_decoder,
                inputs=[v0, v1, v_bg, test_video],
                outputs=video_result
            )

demo.launch()