"""
MoodSyncAI: Multi-Modal Sentiment & Emotion Analyser
====================================================
Components:
- Visual emotion: ViT (Vision Transformer) - trpakov/vit-face-expression
- Text emotion: DistilRoBERTa transformer - j-hartmann/emotion-english-distilroberta-base
- Fusion: Valence-aligned multimodal fusion + mismatch detection
- Generative: FLAN-T5 (with safe template fallback) for plain-language summary
- Webcam: Short video upload/recording, per-frame emotion timeline
All models are free/open-source from Hugging Face. Runs on CPU; a CUDA GPU is used automatically when available.
"""
import os
import io
import time
import warnings
from typing import List, Tuple, Dict
warnings.filterwarnings("ignore")
os.environ.setdefault("TRANSFORMERS_VERBOSITY", "error")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
import numpy as np
import pandas as pd
from PIL import Image
import cv2
import plotly.graph_objects as go
import plotly.express as px
import gradio as gr
import torch
from transformers import (
pipeline,
AutoTokenizer,
AutoModelForSeq2SeqLM,
AutoModelForImageClassification,
AutoModelForSequenceClassification,
AutoImageProcessor,
)
# -------------------------------------------------------------
# Model identifiers (all free / public on Hugging Face Hub)
# -------------------------------------------------------------
VISION_MODEL = "trpakov/vit-face-expression" # ViT for facial emotion
TEXT_MODEL = "j-hartmann/emotion-english-distilroberta-base" # 7 emotions
GEN_MODEL = "google/flan-t5-base" # generative summariser
ASR_MODEL = "openai/whisper-tiny" # speech-to-text (Whisper)
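# transformers pipeline convention: device=0 -> first CUDA GPU, device=-1 -> CPU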
DEVICE = 0 if torch.cuda.is_available() else -1
print(f"[MoodSyncAI] Torch device: {'cuda' if DEVICE == 0 else 'cpu'}")
# -------------------------------------------------------------
# Lazy-loaded model singletons
# -------------------------------------------------------------
_vision_pipe = None
_text_pipe = None
_gen_tokenizer = None
_gen_model = None
_face_cascade = None
_asr_pipe = None
_vit_attn_model = None
_vit_attn_processor = None
_text_attn_model = None
_text_attn_tokenizer = None
def get_vision_pipe():
global _vision_pipe
if _vision_pipe is None:
print("[MoodSyncAI] Loading vision model:", VISION_MODEL)
_vision_pipe = pipeline(
"image-classification",
model=VISION_MODEL,
device=DEVICE,
top_k=None,
)
return _vision_pipe
def get_text_pipe():
global _text_pipe
if _text_pipe is None:
print("[MoodSyncAI] Loading text model:", TEXT_MODEL)
_text_pipe = pipeline(
"text-classification",
model=TEXT_MODEL,
device=DEVICE,
top_k=None,
truncation=True,
)
return _text_pipe
def get_generator():
global _gen_tokenizer, _gen_model
if _gen_model is None:
try:
print("[MoodSyncAI] Loading generator:", GEN_MODEL)
_gen_tokenizer = AutoTokenizer.from_pretrained(GEN_MODEL)
_gen_model = AutoModelForSeq2SeqLM.from_pretrained(GEN_MODEL)
if DEVICE == 0:
_gen_model = _gen_model.to("cuda")
except Exception as e:
print("[MoodSyncAI] Generator load failed, will use template fallback:", e)
_gen_tokenizer, _gen_model = None, None
return _gen_tokenizer, _gen_model
def get_face_cascade():
global _face_cascade
if _face_cascade is None:
path = os.path.join(cv2.data.haarcascades, "haarcascade_frontalface_default.xml")
_face_cascade = cv2.CascadeClassifier(path)
return _face_cascade
# -------------------------------------------------------------
# Valence map: used to align textual and visual signals
# -------------------------------------------------------------
VALENCE = {
# text emotions (from distilroberta)
"joy": 1.0,
"love": 1.0,
"surprise": 0.3,
"neutral": 0.0,
"sadness": -1.0,
"fear": -0.8,
"anger": -0.9,
"disgust": -0.8,
# vision labels (ViT face expression labels)
# ("fear" and "neutral" are shared with the text-model entries above)
"happy": 1.0,
"happiness": 1.0,
"sad": -1.0,
"angry": -0.9,
"fearful": -0.8,
"disgusted": -0.8,
"surprised": 0.3,
"contempt": -0.6,
}
def valence_of(label: str) -> float:
return VALENCE.get(label.lower().strip(), 0.0)
# -------------------------------------------------------------
# Face detection (crops to face for better accuracy; falls back to full image)
# -------------------------------------------------------------
def detect_and_crop_face(pil_img: Image.Image) -> Image.Image:
try:
cascade = get_face_cascade()
rgb = np.array(pil_img.convert("RGB"))
gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
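# Haar-cascade parameters: scaleFactor is the image-pyramid step, minNeighbors
# filters out weak detections, and minSize skips faces smaller than 60x60 px.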
faces = cascade.detectMultiScale(gray, scaleFactor=1.2, minNeighbors=5, minSize=(60, 60))
if len(faces) == 0:
return pil_img
# Pick the largest face
x, y, w, h = max(faces, key=lambda b: b[2] * b[3])
pad = int(0.15 * max(w, h))
x0 = max(0, x - pad); y0 = max(0, y - pad)
x1 = min(rgb.shape[1], x + w + pad); y1 = min(rgb.shape[0], y + h + pad)
return Image.fromarray(rgb[y0:y1, x0:x1])
except Exception:
return pil_img
# -------------------------------------------------------------
# Core analysis helpers
# -------------------------------------------------------------
def predict_visual(pil_img: Image.Image) -> List[Dict]:
pipe = get_vision_pipe()
face = detect_and_crop_face(pil_img)
preds = pipe(face)
# normalise into list of {label,score}
return [{"label": p["label"], "score": float(p["score"])} for p in preds]
def predict_text(text: str) -> List[Dict]:
if not text or not text.strip():
return [{"label": "neutral", "score": 1.0}]
pipe = get_text_pipe()
preds = pipe(text)[0] # top_k=None -> list of all
return [{"label": p["label"], "score": float(p["score"])} for p in preds]
def top1(preds: List[Dict]) -> Tuple[str, float]:
p = max(preds, key=lambda d: d["score"])
return p["label"], p["score"]
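# Expected valence under the predicted distribution,
# e.g. {happy: 0.7, sad: 0.3} -> 0.7 * (+1.0) + 0.3 * (-1.0) = +0.4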
def weighted_valence(preds: List[Dict]) -> float:
return sum(p["score"] * valence_of(p["label"]) for p in preds)
def fuse(visual_preds: List[Dict], text_preds: List[Dict]) -> Dict:
v_label, v_conf = top1(visual_preds)
t_label, t_conf = top1(text_preds)
v_val = weighted_valence(visual_preds)
t_val = weighted_valence(text_preds)
delta = v_val - t_val
# mismatch: opposite sign with meaningful magnitude
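# e.g. a happy face (v_val +0.8) with sad words (t_val -0.6) gives a product of
# -0.48 < -0.05, so the pair is flagged as a mismatch.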
mismatch = (v_val * t_val < -0.05) or (abs(delta) > 0.9)
if mismatch:
status = "MISMATCH DETECTED"
badge = "🟠"
elif abs(delta) < 0.35:
status = "ALIGNED"
badge = "🟢"
else:
status = "PARTIALLY ALIGNED"
badge = "🟡"
# overall valence (weighted average favoring visual when mismatch)
if mismatch:
overall_val = 0.6 * v_val + 0.4 * t_val
else:
overall_val = 0.5 * (v_val + t_val)
return {
"visual_label": v_label,
"visual_conf": v_conf,
"text_label": t_label,
"text_conf": t_conf,
"visual_valence": v_val,
"text_valence": t_val,
"delta": delta,
"status": status,
"badge": badge,
"overall_valence": overall_val,
}
# -------------------------------------------------------------
# Generative summary
# -------------------------------------------------------------
def template_summary(fusion: Dict) -> str:
v = fusion["visual_label"]; vc = fusion["visual_conf"]
t = fusion["text_label"]; tc = fusion["text_conf"]
if fusion["status"].startswith("MISMATCH"):
return (
f"Despite expressing **{t}** sentiment verbally ({tc*100:.0f}% confidence), "
f"the speaker's facial cues indicate **{v}** ({vc*100:.0f}% confidence). "
f"This incongruence between words and expression is worth noting in the "
f"context of the conversation - the spoken message may not fully reflect "
f"how the person actually feels."
)
if fusion["status"] == "ALIGNED":
return (
f"The speaker's words ({t}, {tc*100:.0f}%) and facial expression "
f"({v}, {vc*100:.0f}%) are consistent. The overall emotional state "
f"appears genuine and uncomplicated."
)
return (
f"The speaker shows mild divergence between facial expression ({v}, "
f"{vc*100:.0f}%) and spoken sentiment ({t}, {tc*100:.0f}%). The signals "
f"are not contradictory but suggest some nuance in the emotional state."
)
def generative_summary(fusion: Dict, text_input: str) -> str:
tok, model = get_generator()
fallback = template_summary(fusion)
if model is None or tok is None:
return fallback
try:
mismatch = fusion["status"].startswith("MISMATCH")
instr = (
"rewrite as one empathetic paragraph (2-3 sentences) that explicitly "
"highlights the mismatch between facial expression and spoken words"
if mismatch else
"rewrite as one empathetic paragraph (2-3 sentences) noting the emotional state"
)
prompt = (
f"You are an empathetic psychologist. Given the analysis below, {instr}. "
f"Begin with the word 'The'.\n\n"
f"Analysis:\n"
f"- Spoken sentence: \"{text_input or '(none provided)'}\"\n"
f"- Facial emotion detected: {fusion['visual_label']} "
f"({fusion['visual_conf']*100:.0f}% confidence)\n"
f"- Sentiment of the words: {fusion['text_label']} "
f"({fusion['text_conf']*100:.0f}% confidence)\n"
f"- Alignment: {fusion['status']}\n\n"
f"Paragraph:"
)
inputs = tok(prompt, return_tensors="pt", truncation=True, max_length=512)
if DEVICE == 0:
inputs = {k: v.to("cuda") for k, v in inputs.items()}
out = model.generate(
**inputs,
max_new_tokens=140,
min_new_tokens=30,
num_beams=4,
do_sample=False,
no_repeat_ngram_size=3,
early_stopping=True,
)
text = tok.decode(out[0], skip_special_tokens=True).strip()
# Reject obvious echoes / too-short / off-topic outputs
bad = (len(text) < 50
or text.lower().startswith(("tell ", "write ", "give "))
or "story" in text.lower()[:40]
or (fusion["visual_label"].lower() not in text.lower()
and fusion["text_label"].lower() not in text.lower()))
if bad:
return fallback
return text
except Exception as e:
print("[MoodSyncAI] Generation error:", e)
return fallback
# -------------------------------------------------------------
# Plotly charts
# -------------------------------------------------------------
def bar_chart(preds: List[Dict], title: str, color: str) -> go.Figure:
df = pd.DataFrame(preds).sort_values("score", ascending=True)
df["pct"] = (df["score"] * 100).round(1)
fig = go.Figure(go.Bar(
x=df["pct"], y=df["label"], orientation="h",
marker=dict(color=color),
text=df["pct"].astype(str) + "%",
textposition="outside",
))
fig.update_layout(
title=title,
xaxis_title="Confidence (%)",
yaxis_title=None,
xaxis=dict(range=[0, 110]),
height=320, margin=dict(l=10, r=10, t=40, b=10),
template="plotly_white",
)
return fig
def empty_fig(msg="No data") -> go.Figure:
fig = go.Figure()
fig.add_annotation(text=msg, xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False, font=dict(size=14))
fig.update_layout(height=320, template="plotly_white",
margin=dict(l=10, r=10, t=20, b=10))
return fig
# -------------------------------------------------------------
# Tab 1: Image + Text analysis
# -------------------------------------------------------------
def analyse_image_text(image: Image.Image, text: str):
if image is None:
return (empty_fig("Please upload an image"),
empty_fig("Awaiting input"),
"### ⚠️ Please upload an image of a face.", "")
visual_preds = predict_visual(image)
text_preds = predict_text(text or "")
fusion = fuse(visual_preds, text_preds)
summary = generative_summary(fusion, text)
vfig = bar_chart(visual_preds, "👁️ Visual Emotion (ViT)", "#4C78A8")
tfig = bar_chart(text_preds, "💬 Text Sentiment (Transformer)", "#54A24B")
fusion_md = f"""
### {fusion['badge']} Fusion Result: **{fusion['status']}**
| Modality | Top Prediction | Confidence | Valence |
|---|---|---|---|
| 👁️ Visual | **{fusion['visual_label']}** | {fusion['visual_conf']*100:.1f}% | {fusion['visual_valence']:+.2f} |
| 💬 Text | **{fusion['text_label']}** | {fusion['text_conf']*100:.1f}% | {fusion['text_valence']:+.2f} |
| 📊 Overall valence | – | – | **{fusion['overall_valence']:+.2f}** |
"""
summary_md = f"### 🧠 Generative Summary\n\n> {summary}"
return vfig, tfig, fusion_md, summary_md
# -------------------------------------------------------------
# Tab 2: Webcam / short video β emotion timeline
# -------------------------------------------------------------
def sample_frames(video_path: str, max_frames: int = 12) -> List[Tuple[float, Image.Image]]:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return []
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
# If total frames is unknown, read sequentially to count.
if total <= 0:
total = 0
while True:
ok, _ = cap.read()
if not ok:
break
total += 1
cap.release()
cap = cv2.VideoCapture(video_path)
if total <= 0:
return []
duration = total / fps if fps > 0 else 1.0
n = min(max_frames, max(3, int(duration * 2))) # ~2 fps target
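# Evenly spaced frame indices across the clip,
# e.g. total=150, n=10 -> indices 0, 16, 33, ..., 149.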
target_idxs = set(np.linspace(0, total - 1, n).astype(int).tolist())
out: List[Tuple[float, Image.Image]] = []
idx = 0
while True:
ok, frame = cap.read()
if not ok:
break
if idx in target_idxs:
ts = idx / fps if fps > 0 else float(idx)
pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
out.append((float(ts), pil))
if len(out) >= n:
break
idx += 1
cap.release()
return out
def analyse_video_text(video_path, text: str):
if not video_path:
return (empty_fig("Record or upload a short video"),
empty_fig("Awaiting input"),
empty_fig("Awaiting input"),
"### ⚠️ Please provide a webcam video.", "")
frames = sample_frames(video_path, max_frames=12)
if not frames:
return (empty_fig("Could not read video"),
empty_fig(""), empty_fig(""),
"### ⚠️ Could not decode the video file.", "")
timeline = [] # list of dict: ts, label->score
aggregated: Dict[str, float] = {}
for ts, pil in frames:
preds = predict_visual(pil)
row = {"timestamp": ts}
for p in preds:
row[p["label"]] = p["score"]
aggregated[p["label"]] = aggregated.get(p["label"], 0.0) + p["score"]
timeline.append(row)
# Average the aggregated visual prediction across frames
n = len(frames)
avg_visual = [{"label": k, "score": v / n} for k, v in aggregated.items()]
text_preds = predict_text(text or "")
fusion = fuse(avg_visual, text_preds)
summary = generative_summary(fusion, text)
# Timeline figure (line per emotion)
df = pd.DataFrame(timeline).fillna(0.0)
label_cols = [c for c in df.columns if c != "timestamp"]
tl_fig = go.Figure()
palette = px.colors.qualitative.Set2
for i, lbl in enumerate(label_cols):
tl_fig.add_trace(go.Scatter(
x=df["timestamp"], y=df[lbl] * 100,
mode="lines+markers", name=lbl,
line=dict(color=palette[i % len(palette)], width=2),
))
tl_fig.update_layout(
title="📈 Emotion Timeline (per frame)",
xaxis_title="Time (s)", yaxis_title="Confidence (%)",
height=360, template="plotly_white",
margin=dict(l=10, r=10, t=40, b=10),
yaxis=dict(range=[0, 100]),
)
vfig = bar_chart(avg_visual, "👁️ Average Visual Emotion", "#4C78A8")
tfig = bar_chart(text_preds, "💬 Text Sentiment", "#54A24B")
fusion_md = f"""
### {fusion['badge']} Fusion Result: **{fusion['status']}**
| Modality | Top Prediction | Confidence | Valence |
|---|---|---|---|
| 👁️ Visual (avg) | **{fusion['visual_label']}** | {fusion['visual_conf']*100:.1f}% | {fusion['visual_valence']:+.2f} |
| 💬 Text | **{fusion['text_label']}** | {fusion['text_conf']*100:.1f}% | {fusion['text_valence']:+.2f} |
| 📊 Overall valence | – | – | **{fusion['overall_valence']:+.2f}** |
*Analysed {n} frames from the video.*
"""
summary_md = f"### 🧠 Generative Summary\n\n> {summary}"
return tl_fig, vfig, tfig, fusion_md, summary_md
# =============================================================
# NEW FEATURE BLOCK (additive – does not touch Tab 1 / Tab 2)
# =============================================================
# 1) Whisper ASR (audio → text channel)
# 2) Video with audio (transcribe + frame timeline + fusion)
# 3) Attention visualisation (ViT rollout heatmap + text token attention)
# =============================================================
import tempfile
import subprocess
import html as _html
def get_asr_pipe():
global _asr_pipe
if _asr_pipe is None:
print("[MoodSyncAI] Loading ASR model:", ASR_MODEL)
_asr_pipe = pipeline(
"automatic-speech-recognition",
model=ASR_MODEL,
device=DEVICE,
chunk_length_s=30,
return_timestamps=False,
)
return _asr_pipe
def transcribe_audio(audio_path: str) -> str:
if not audio_path:
return ""
try:
# Load audio ourselves (soundfile/librosa) so we don't depend on
# whisper's internal ffmpeg-via-PATH lookup.
import soundfile as sf
try:
audio, sr = sf.read(audio_path, dtype="float32", always_2d=False)
except Exception:
import librosa
audio, sr = librosa.load(audio_path, sr=16000, mono=True)
if audio.ndim > 1:
audio = audio.mean(axis=1)
if sr != 16000:
import librosa
audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
sr = 16000
if audio.size == 0:
return ""
pipe = get_asr_pipe()
out = pipe(
{"array": audio, "sampling_rate": sr},
generate_kwargs={"language": "en", "task": "transcribe"},
)
text = out.get("text", "") if isinstance(out, dict) else str(out)
return (text or "").strip()
except Exception as e:
print("[MoodSyncAI] Transcription error:", e)
return ""
def _ffmpeg_exe() -> str:
try:
import imageio_ffmpeg
return imageio_ffmpeg.get_ffmpeg_exe()
except Exception:
return "ffmpeg"
def extract_audio_from_video(video_path: str) -> str:
"""Extract mono 16 kHz wav from video. Returns wav path or '' on failure."""
if not video_path:
return ""
try:
out_path = tempfile.NamedTemporaryFile(
suffix=".wav", delete=False
).name
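# ffmpeg flags: -vn drops the video stream, -ac 1 downmixes to mono,
# -ar 16000 resamples to 16 kHz (the rate Whisper expects).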
cmd = [
_ffmpeg_exe(), "-y", "-i", video_path,
"-vn", "-ac", "1", "-ar", "16000",
"-f", "wav", out_path,
]
proc = subprocess.run(cmd, capture_output=True, timeout=120)
if proc.returncode != 0 or not os.path.exists(out_path) or os.path.getsize(out_path) < 1024:
return ""
return out_path
except Exception as e:
print("[MoodSyncAI] Audio-extract error:", e)
return ""
# -------------------------------------------------------------
# Attention visualisation
# -------------------------------------------------------------
def _get_vit_attn():
global _vit_attn_model, _vit_attn_processor
if _vit_attn_model is None:
print("[MoodSyncAI] Loading ViT (eager attn) for attention rollout")
_vit_attn_processor = AutoImageProcessor.from_pretrained(VISION_MODEL)
_vit_attn_model = AutoModelForImageClassification.from_pretrained(
VISION_MODEL, attn_implementation="eager"
)
_vit_attn_model.eval()
if DEVICE == 0:
_vit_attn_model = _vit_attn_model.to("cuda")
return _vit_attn_model, _vit_attn_processor
def _get_text_attn():
global _text_attn_model, _text_attn_tokenizer
if _text_attn_model is None:
print("[MoodSyncAI] Loading text classifier (eager attn) for token attention")
_text_attn_tokenizer = AutoTokenizer.from_pretrained(TEXT_MODEL)
_text_attn_model = AutoModelForSequenceClassification.from_pretrained(
TEXT_MODEL, attn_implementation="eager"
)
_text_attn_model.eval()
if DEVICE == 0:
_text_attn_model = _text_attn_model.to("cuda")
return _text_attn_model, _text_attn_tokenizer
def vit_attention_heatmap(pil_img: Image.Image) -> Image.Image:
"""Attention-rollout heatmap overlaid on the (face-cropped) image."""
try:
face = detect_and_crop_face(pil_img).convert("RGB")
model, processor = _get_vit_attn()
inputs = processor(images=face, return_tensors="pt")
if DEVICE == 0:
inputs = {k: v.to("cuda") for k, v in inputs.items()}
with torch.no_grad():
out = model(**inputs, output_attentions=True)
attns = out.attentions # tuple(L) of (1, H, S, S)
if not attns:
return face
# Attention rollout: avg heads, add identity, normalise, multiply layers
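# Adding the identity approximates the residual (skip) connections before
# re-normalising each layer, per the attention-rollout method (Abnar & Zuidema, 2020).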
result = None
for a in attns:
a = a.mean(dim=1).squeeze(0) # (S, S)
a = a + torch.eye(a.size(0), device=a.device)
a = a / a.sum(dim=-1, keepdim=True)
result = a if result is None else a @ result
# CLS-token row, drop CLS index → patch importances
cls_attn = result[0, 1:].detach().cpu().numpy()
side = int(np.sqrt(cls_attn.shape[0]))
if side * side != cls_attn.shape[0]:
return face
grid = cls_attn.reshape(side, side)
grid = (grid - grid.min()) / (grid.max() - grid.min() + 1e-8)
# Resize heatmap to face image
w, h = face.size
heat = cv2.resize(grid, (w, h), interpolation=cv2.INTER_CUBIC)
heat_u8 = (heat * 255).astype(np.uint8)
color = cv2.applyColorMap(heat_u8, cv2.COLORMAP_JET)
color = cv2.cvtColor(color, cv2.COLOR_BGR2RGB)
base = np.array(face)
overlay = (0.55 * base + 0.45 * color).clip(0, 255).astype(np.uint8)
return Image.fromarray(overlay)
except Exception as e:
print("[MoodSyncAI] ViT attention error:", e)
return pil_img
def text_token_attention_html(text: str) -> str:
"""Render input text with per-token attention intensity (last layer, [CLS] row)."""
if not text or not text.strip():
return "<em>(no text)</em>"
try:
model, tok = _get_text_attn()
enc = tok(text, return_tensors="pt", truncation=True, max_length=256)
if DEVICE == 0:
enc = {k: v.to("cuda") for k, v in enc.items()}
with torch.no_grad():
out = model(**enc, output_attentions=True)
attns = out.attentions # tuple(L) of (1, H, S, S)
if not attns:
return _html.escape(text)
last = attns[-1].mean(dim=1).squeeze(0) # (S, S)
cls_row = last[0].detach().cpu().numpy() # importance of each token to CLS
ids = enc["input_ids"][0].detach().cpu().tolist()
tokens = tok.convert_ids_to_tokens(ids)
# Skip special tokens for normalisation range
specials = set(tok.all_special_tokens)
keep_mask = np.array([t not in specials for t in tokens])
if keep_mask.sum() == 0:
return _html.escape(text)
scores = cls_row.copy()
scores_disp = scores[keep_mask]
lo, hi = scores_disp.min(), scores_disp.max()
norm = (scores - lo) / (hi - lo + 1e-8)
norm = np.clip(norm, 0.0, 1.0)
# Build HTML: merge subword tokens (RoBERTa uses 'Ġ' prefix for word start)
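# The "▁" branch below is a fallback for SentencePiece-style tokenizers, which mark
# word starts with U+2581 instead of the byte-level BPE "Ġ" prefix.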
spans = []
for i, t in enumerate(tokens):
if t in specials:
continue
display = t
prefix_space = ""
if display.startswith("Ġ"):
display = display[1:]
prefix_space = " "
elif display.startswith("▁"):
display = display[1:]
prefix_space = " "
intensity = float(norm[i])
# red highlight, alpha from intensity
bg = f"rgba(220,38,38,{intensity:.2f})"
color = "#fff" if intensity > 0.55 else "#111"
safe = _html.escape(display)
spans.append(
f"{prefix_space}<span style=\"background:{bg};color:{color};"
f"padding:2px 4px;border-radius:4px;margin:1px;"
f"font-family:monospace\" title=\"{intensity:.2f}\">{safe}</span>"
)
body = "".join(spans).strip()
legend = (
"<div style='margin-top:8px;font-size:12px;color:#555'>"
"Darker red = higher attention weight from [CLS] to that token "
"(last transformer layer, averaged over heads)."
"</div>"
)
return f"<div style='line-height:2;font-size:15px'>{body}</div>{legend}"
except Exception as e:
print("[MoodSyncAI] Text attention error:", e)
return _html.escape(text)
# -------------------------------------------------------------
# Tab 1 wrapper: existing outputs + (optional) attention viz
# -------------------------------------------------------------
def analyse_image_text_with_attention(image: Image.Image, text: str, show_attn: bool):
vfig, tfig, fusion_md, summary_md = analyse_image_text(image, text)
if not show_attn or image is None:
return (vfig, tfig, fusion_md, summary_md,
None, "<em>Toggle 'Show attention visualisation' to view.</em>")
heat = vit_attention_heatmap(image)
token_html = text_token_attention_html(text or "")
return vfig, tfig, fusion_md, summary_md, heat, token_html
# -------------------------------------------------------------
# Tab 3: Audio + Image
# -------------------------------------------------------------
def analyse_audio_image(audio_path, image: Image.Image):
if image is None and not audio_path:
return ("",
empty_fig("Provide an image"),
empty_fig("Provide audio"),
"### ⚠️ Please provide both an image and audio.", "")
transcript = transcribe_audio(audio_path) if audio_path else ""
if not transcript:
transcript = "(no speech detected)"
if image is None:
return (transcript,
empty_fig("No image provided"),
empty_fig("(transcript only)"),
"### ⚠️ Please also provide a face image.", "")
visual_preds = predict_visual(image)
spoken = "" if transcript.startswith("(") else transcript
text_preds = predict_text(spoken)
fusion = fuse(visual_preds, text_preds)
summary = generative_summary(fusion, spoken)
vfig = bar_chart(visual_preds, "👁️ Visual Emotion (ViT)", "#4C78A8")
tfig = bar_chart(text_preds, "💬 Sentiment of Transcribed Speech", "#54A24B")
fusion_md = f"""
### {fusion['badge']} Fusion Result: **{fusion['status']}**
| Modality | Top Prediction | Confidence | Valence |
|---|---|---|---|
| 👁️ Visual (image) | **{fusion['visual_label']}** | {fusion['visual_conf']*100:.1f}% | {fusion['visual_valence']:+.2f} |
| 🎙️ Audio → Text | **{fusion['text_label']}** | {fusion['text_conf']*100:.1f}% | {fusion['text_valence']:+.2f} |
| 📊 Overall valence | – | – | **{fusion['overall_valence']:+.2f}** |
"""
summary_md = f"### 🧠 Generative Summary\n\n> {summary}"
return transcript, vfig, tfig, fusion_md, summary_md
# -------------------------------------------------------------
# Tab 4: Video WITH audio (frames timeline + audio transcript → text channel)
# -------------------------------------------------------------
def analyse_video_with_audio(video_path):
if not video_path:
return ("",
empty_fig("Record or upload a video"),
empty_fig(""), empty_fig(""),
"### ⚠️ Please provide a video.", "")
frames = sample_frames(video_path, max_frames=12)
if not frames:
return ("",
empty_fig("Could not read video"),
empty_fig(""), empty_fig(""),
"### ⚠️ Could not decode the video file.", "")
# 1) Audio β transcript
wav = extract_audio_from_video(video_path)
transcript = transcribe_audio(wav) if wav else ""
if wav and os.path.exists(wav):
try: os.remove(wav)
except Exception: pass
if not transcript:
transcript = "(no speech detected in the audio track)"
spoken = "" if transcript.startswith("(") else transcript
# 2) Per-frame visual + aggregate
timeline = []
aggregated: Dict[str, float] = {}
for ts, pil in frames:
preds = predict_visual(pil)
row = {"timestamp": ts}
for p in preds:
row[p["label"]] = p["score"]
aggregated[p["label"]] = aggregated.get(p["label"], 0.0) + p["score"]
timeline.append(row)
n = len(frames)
avg_visual = [{"label": k, "score": v / n} for k, v in aggregated.items()]
# 3) Text channel from transcript
text_preds = predict_text(spoken)
fusion = fuse(avg_visual, text_preds)
summary = generative_summary(fusion, spoken)
# Timeline figure
df = pd.DataFrame(timeline).fillna(0.0)
label_cols = [c for c in df.columns if c != "timestamp"]
tl_fig = go.Figure()
palette = px.colors.qualitative.Set2
for i, lbl in enumerate(label_cols):
tl_fig.add_trace(go.Scatter(
x=df["timestamp"], y=df[lbl] * 100,
mode="lines+markers", name=lbl,
line=dict(color=palette[i % len(palette)], width=2),
))
tl_fig.update_layout(
title="📈 Emotion Timeline (per frame) – audio transcript drives text channel",
xaxis_title="Time (s)", yaxis_title="Confidence (%)",
height=360, template="plotly_white",
margin=dict(l=10, r=10, t=40, b=10),
yaxis=dict(range=[0, 100]),
)
vfig = bar_chart(avg_visual, "👁️ Avg Visual Emotion (frames)", "#4C78A8")
tfig = bar_chart(text_preds, "💬 Sentiment of Spoken Audio", "#54A24B")
fusion_md = f"""
### {fusion['badge']} Fusion Result: **{fusion['status']}**
| Modality | Top Prediction | Confidence | Valence |
|---|---|---|---|
| 👁️ Visual (avg of {n} frames) | **{fusion['visual_label']}** | {fusion['visual_conf']*100:.1f}% | {fusion['visual_valence']:+.2f} |
| 🎙️ Audio transcript | **{fusion['text_label']}** | {fusion['text_conf']*100:.1f}% | {fusion['text_valence']:+.2f} |
| 📊 Overall valence | – | – | **{fusion['overall_valence']:+.2f}** |
*Spoken words (auto-transcribed):* "{spoken or '–'}"
"""
summary_md = f"### 🧠 Generative Summary\n\n> {summary}"
return transcript, tl_fig, vfig, tfig, fusion_md, summary_md
# -------------------------------------------------------------
# Gradio UI
# -------------------------------------------------------------
CSS = """
.gradio-container {max-width: 1200px !important;}
#title {text-align:center;}
footer {display: none !important;}
.show-api, .built-with, .settings {display: none !important;}
"""
with gr.Blocks(title="MoodSyncAI", theme=gr.themes.Soft(), css=CSS) as demo:
gr.Markdown("# 🎭 MoodSyncAI", elem_id="title")
gr.Markdown(
"**Multi-Modal Sentiment & Emotion Analyser** – combines a Vision "
"Transformer (face), a Transformer text classifier (words), a fusion "
"layer (mismatch detection), and a generative model (plain-language "
"summary). 100% open-source."
)
with gr.Tabs():
# ---------------- Tab 1 ----------------
with gr.Tab("🖼️ Image + Text"):
with gr.Row():
with gr.Column(scale=1):
img_in = gr.Image(type="pil", label="Face photo", height=320)
txt_in = gr.Textbox(
label="What the person said",
placeholder="e.g., No, I think the project is going really well.",
lines=2,
)
btn1 = gr.Button("🔍 Analyse", variant="primary")
attn_toggle1 = gr.Checkbox(
label="🔬 Show attention visualisation (ViT rollout + text tokens)",
value=False,
)
gr.Examples(
examples=[
[None, "No, I think the project is going really well."],
[None, "I'm absolutely thrilled about the results!"],
[None, "I'm fine, really, don't worry about me."],
],
inputs=[img_in, txt_in],
)
with gr.Column(scale=2):
fusion_md1 = gr.Markdown()
summary_md1 = gr.Markdown()
with gr.Row():
vbar1 = gr.Plot(label="Visual emotion")
tbar1 = gr.Plot(label="Text sentiment")
with gr.Accordion("🔬 Attention visualisation", open=False):
attn_img1 = gr.Image(
label="ViT attention rollout (face)",
height=320, interactive=False,
)
attn_html1 = gr.HTML(label="Text token attention")
btn1.click(analyse_image_text_with_attention,
inputs=[img_in, txt_in, attn_toggle1],
outputs=[vbar1, tbar1, fusion_md1, summary_md1,
attn_img1, attn_html1])
# ---------------- Tab 2 ----------------
with gr.Tab("📹 Webcam / Video + Text"):
gr.Markdown(
"Record a short clip from your webcam (3–10 s recommended) **or** "
"upload a short video. The system samples frames and builds an "
"emotion timeline."
)
with gr.Row():
with gr.Column(scale=1):
vid_in = gr.Video(
label="Webcam / video",
sources=["webcam", "upload"],
height=300,
)
txt_in2 = gr.Textbox(
label="What the person said",
placeholder="Type the spoken sentence here…",
lines=2,
)
btn2 = gr.Button("🔍 Analyse video", variant="primary")
with gr.Column(scale=2):
timeline_plot = gr.Plot(label="Emotion timeline")
fusion_md2 = gr.Markdown()
summary_md2 = gr.Markdown()
with gr.Row():
vbar2 = gr.Plot(label="Avg visual emotion")
tbar2 = gr.Plot(label="Text sentiment")
btn2.click(analyse_video_text,
inputs=[vid_in, txt_in2],
outputs=[timeline_plot, vbar2, tbar2, fusion_md2, summary_md2])
# ---------------- Tab 3 : Audio + Image ----------------
with gr.Tab("🎙️ Audio + Image"):
gr.Markdown(
"Speak (or upload audio) **and** provide a face image. Whisper "
"transcribes the audio; the words become the *text channel* fed "
"into the multimodal fusion."
)
with gr.Row():
with gr.Column(scale=1):
audio_in3 = gr.Audio(
label="🎙️ Audio (microphone or upload)",
sources=["microphone", "upload"],
type="filepath",
)
img_in3 = gr.Image(type="pil", label="Face photo", height=300)
btn3 = gr.Button("🔍 Transcribe & analyse", variant="primary")
with gr.Column(scale=2):
transcript3 = gr.Textbox(
label="Auto-transcript (Whisper)",
interactive=False, lines=2,
)
fusion_md3 = gr.Markdown()
summary_md3 = gr.Markdown()
with gr.Row():
vbar3 = gr.Plot(label="Visual emotion")
tbar3 = gr.Plot(label="Audio→text sentiment")
btn3.click(analyse_audio_image,
inputs=[audio_in3, img_in3],
outputs=[transcript3, vbar3, tbar3, fusion_md3, summary_md3])
# ---------------- Tab 4 : Video WITH audio ----------------
with gr.Tab("🎬 Video with Audio"):
gr.Markdown(
"Record or upload a short video **with sound**. The system extracts "
"the audio track, transcribes it (Whisper), samples frames for an "
"emotion timeline, then fuses the visual signal with the spoken-word "
"sentiment – no manual typing needed."
)
with gr.Row():
with gr.Column(scale=1):
vid_in4 = gr.Video(
label="Webcam / video (with audio)",
sources=["webcam", "upload"],
height=300,
)
btn4 = gr.Button("🔍 Transcribe & analyse video", variant="primary")
with gr.Column(scale=2):
transcript4 = gr.Textbox(
label="Auto-transcript (Whisper)",
interactive=False, lines=2,
)
timeline_plot4 = gr.Plot(label="Emotion timeline")
fusion_md4 = gr.Markdown()
summary_md4 = gr.Markdown()
with gr.Row():
vbar4 = gr.Plot(label="Avg visual emotion")
tbar4 = gr.Plot(label="Audio→text sentiment")
btn4.click(analyse_video_with_audio,
inputs=[vid_in4],
outputs=[transcript4, timeline_plot4, vbar4, tbar4,
fusion_md4, summary_md4])
# ---------------- About tab ----------------
with gr.Tab("ℹ️ About"):
gr.Markdown(f"""
### Architecture
| Stage | Model | Type |
|---|---|---|
| Visual emotion | `{VISION_MODEL}` | **Vision Transformer (ViT)** |
| Text sentiment | `{TEXT_MODEL}` | **Transformer (DistilRoBERTa)** |
| Speech-to-text | `{ASR_MODEL}` | **Encoder-Decoder Transformer (Whisper)** |
| Fusion | Valence-aligned multimodal fusion (custom) | rule-based + weighted valence |
| Generative summary | `{GEN_MODEL}` | **Encoder-Decoder Transformer (FLAN-T5)** |
| Attention viz | ViT attention rollout + last-layer text attention | interpretability |
### Fusion logic
1. Each modality produces a probability distribution over emotion labels.
2. Labels are mapped to a *valence* score in `[-1, +1]`.
3. We compute weighted valence per modality, then a delta.
4. Opposite valence signs (or a large delta) → **MISMATCH** (amber); a small delta → **ALIGNED** (green); otherwise **PARTIALLY ALIGNED** (yellow).
5. Generative model receives the structured signals and writes plain-language output.
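Worked example: a clearly happy face (visual valence +0.8) paired with sad words (text valence -0.6) gives a negative product, so the app reports **MISMATCH** even though each modality is individually confident.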
### Privacy
All processing runs locally on your machine; no data is sent to external services
after the first model download from the Hugging Face Hub.
""")
if __name__ == "__main__":
# Warm up small models so first request is snappy
try:
get_text_pipe()
except Exception as e:
print("[MoodSyncAI] Warmup text failed:", e)
_on_spaces = bool(os.environ.get("SPACE_ID"))
demo.queue().launch(
server_name="0.0.0.0" if _on_spaces else "127.0.0.1",
server_port=7860,
inbrowser=not _on_spaces,
show_error=True,
show_api=False,
ssr_mode=False,
)
|