| from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| import time |
| import os |
| import re |
| import asyncio |
| import base64 |
| from datetime import datetime |
| from typing import List, Optional, Any |
| from pydantic import BaseModel |
| from dotenv import load_dotenv |
|
|
| |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage |
|
|
# Pull settings such as GEMINI_API_KEY from a local .env file into the
# process environment before anything below reads them.
load_dotenv()

app = FastAPI(title="Socratic Sentiment Chatbot API")

# CORS policy applied to every route: any origin, method and header is
# accepted, and credentialed requests are allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation - confirm whether the
# frontend actually needs credentialed cross-origin requests.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
|
|
| |
class ChatMessage(BaseModel):
    """One earlier turn of the conversation, as supplied by the client."""

    # Who produced the turn. run_flow_b maps "user" to a human message and
    # every other value to a model (AI) message.
    role: str
    # Text of the turn.
    content: str
|
|
class ChatRequest(BaseModel):
    """Request body accepted by the POST /api/chat endpoint."""

    # The new user message to answer.
    message: str
    # Optional per-request Gemini API key; when absent the endpoint falls
    # back to the GEMINI_API_KEY environment variable.
    gemini_api_key: Optional[str] = None
    # Optional prior turns of the conversation (oldest first is assumed by
    # the "last N" slicing in run_flow_b - TODO confirm with the frontend).
    history: Optional[List[ChatMessage]] = None
|
|
class ChatResponse(BaseModel):
    """Response body returned by the POST /api/chat endpoint."""

    # Sentiment label reported by the model ("neutral" when parsing fails).
    sentiment: str
    # The tutor's reply text.
    response: str
    # Seconds spent handling the request, rounded to three decimals.
    latency: float
    # System prompt, tone instructions and user query joined into one string.
    prompt_context: str
    # Estimated input tokens plus estimated output tokens.
    tokens: int
    # Estimated cost computed by calculate_cost from the token estimates.
    cost: float
|
|
| |
def estimate_tokens(text: str) -> int:
    """Approximate the token count of *text* as one token per four characters.

    The estimate never drops below 1, so an empty string still counts as
    a single token.
    """
    approx = len(text) // 4
    return approx if approx > 1 else 1
|
|
| |
def calculate_cost(input_tokens: int, output_tokens: int) -> float:
    """Return the estimated price of a request from its token counts.

    Input tokens are billed at 0.075 per million and output tokens at
    0.30 per million; the result is the sum of both charges.
    """

    def _charge(token_count: int, rate_per_million: float) -> float:
        # Scale to millions of tokens first, then apply the rate.
        return (token_count / 1_000_000.0) * rate_per_million

    return _charge(input_tokens, 0.075) + _charge(output_tokens, 0.30)
|
|
| |
def get_text_content(content: Any) -> str:
    """Flatten a model message payload into plain text.

    * A string is returned unchanged.
    * A list is scanned for plain strings and for dicts whose ``"type"``
      is ``"text"``; their text is concatenated in order and every other
      element is ignored.
    * Anything else is converted with ``str()``.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    def _fragments():
        for item in content:
            if isinstance(item, str):
                yield item
            elif isinstance(item, dict) and item.get("type") == "text":
                yield item.get("text", "")

    return "".join(_fragments())
|
|
| |
# Ordered (pattern, placeholder) pairs, compiled once at import time.
# The order matters: e-mail addresses are masked first, then phone numbers,
# then IPv4-looking dotted quads, then SSN-shaped numbers.
_PII_PATTERNS = (
    # The domain is "label(.label)+" so a sentence-ending period right after
    # the address is left in place instead of being swallowed.
    (re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+'), '[EMAIL]'),
    # A lookbehind replaces the leading \b so that a leading "+" or "("
    # belongs to the match rather than being left behind in the output.
    (re.compile(r'(?<!\w)(?:\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'), '[PHONE]'),
    (re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'), '[IP_ADDRESS]'),
    (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN]'),
)


def scrub_pii(text: str) -> str:
    """Mask common personally identifiable information in *text*.

    E-mail addresses, phone numbers, dotted-quad IP addresses and
    ``ddd-dd-dddd`` social-security-style numbers are replaced with the
    placeholders ``[EMAIL]``, ``[PHONE]``, ``[IP_ADDRESS]`` and ``[SSN]``.

    Args:
        text: The text to scrub. Falsy values (``""`` or ``None``) are
            returned unchanged.

    Returns:
        The text with every recognised item replaced by its placeholder.
    """
    if not text:
        return text
    for pattern, placeholder in _PII_PATTERNS:
        text = pattern.sub(placeholder, text)
    return text
|
|
| |
# Markdown log file, stored next to this module.
MD_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "sentiment_log.md")


def log_to_md(question: str, sentiment: str, latency: float, cost: float, tokens_in: int, tokens_out: int, reply: str) -> None:
    """Append one chat exchange to the Markdown log at ``MD_FILE``.

    Each entry is a heading holding the timestamp and the query, an HTML
    table of metrics (sentiment, latency, cost, token counts) and the
    tutor reply. A title and short description are written first when the
    log file is missing or empty.

    Logging is best-effort: any failure is printed and swallowed so that
    a logging problem never breaks the request being served.

    Args:
        question: The user query; whitespace runs (including newlines) are
            collapsed and HTML-special characters escaped so the query
            stays on the heading line and cannot inject markup.
        sentiment: Sentiment label; HTML-escaped before being placed in
            the table.
        latency: Request latency in seconds (rounded to 3 decimals).
        cost: Estimated cost (written with 7 decimals).
        tokens_in: Estimated input tokens.
        tokens_out: Estimated output tokens.
        reply: The tutor's reply, written verbatim as Markdown.
    """
    import html  # local import keeps this block self-contained

    try:
        # Both values originate outside this process (user / model output).
        heading_text = html.escape(" ".join(str(question).split()), quote=False)
        sentiment_text = html.escape(str(sentiment), quote=False)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        entry = "".join([
            f"## [{timestamp}] Query: \"{heading_text}\"\n\n",
            "<table>\n",
            " <thead>\n",
            " <tr><th align=\"left\">Metric</th><th align=\"left\">Value</th></tr>\n",
            " </thead>\n",
            " <tbody>\n",
            f" <tr><td><strong>Detected Sentiment</strong></td><td><code>{sentiment_text}</code></td></tr>\n",
            f" <tr><td><strong>Latency</strong></td><td>{round(latency, 3)}s</td></tr>\n",
            f" <tr><td><strong>Estimated Cost</strong></td><td><code>${cost:.7f}</code></td></tr>\n",
            f" <tr><td><strong>Tokens</strong></td><td>{tokens_in + tokens_out} ({tokens_in} in / {tokens_out} out)</td></tr>\n",
            " </tbody>\n",
            "</table>\n\n",
            f"### Socratic Tutor Reply\n{reply}\n\n",
            "---\n\n",
        ])

        # An existing but empty file still needs the title block.
        needs_header = not os.path.exists(MD_FILE) or os.path.getsize(MD_FILE) == 0
        with open(MD_FILE, mode="a", encoding="utf-8") as f:
            if needs_header:
                f.write("# Socratic Chatbot Sentiment & Response Log\n\n")
                f.write("This file tracks detected user sentiments, response latencies, costs, and Socratic replies.\n\n")
            # One write per entry instead of many small ones.
            f.write(entry)
    except Exception as e:  # deliberate best-effort: never fail the request
        print(f"Error writing to MD log: {e}")
|
|
| |
# Reply used whenever the model output cannot be turned into a usable answer.
_FALLBACK_TUTOR_REPLY = "Let's take a look at this concept step by step. What do you think is the first part?"


def _parse_tutor_json(raw_response: str):
    """Extract ``(sentiment, reply)`` from the model's raw text, with safe fallbacks."""
    import json

    cleaned_json = raw_response.strip()
    # Models sometimes wrap JSON in a Markdown code fence; unwrap it so a
    # perfectly good answer is not discarded.
    fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", cleaned_json, re.DOTALL | re.IGNORECASE)
    if fenced:
        cleaned_json = fenced.group(1)

    try:
        parsed = json.loads(cleaned_json)
        if not isinstance(parsed, dict):
            raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        print(f"Failed to parse LLM JSON response: {e}. Raw response: {raw_response}")
        return "neutral", _FALLBACK_TUTOR_REPLY

    sentiment = parsed.get("s")
    reply = parsed.get("r")
    # Both values feed string-typed response fields, so anything that is not
    # a non-empty string is replaced by the corresponding default.
    state_val = sentiment.strip() if isinstance(sentiment, str) and sentiment.strip() else "neutral"
    reply_val = reply if isinstance(reply, str) and reply.strip() else _FALLBACK_TUTOR_REPLY
    return state_val, reply_val


def run_flow_b(message: str, api_key: str, history: Optional[List[ChatMessage]] = None):
    """Ask the Gemini model for a Socratic reply plus a sentiment label.

    The model is instructed to answer with JSON of the form
    ``{"s": sentiment, "r": reply}``. Up to the last four history turns are
    sent along, each truncated to 60 characters to keep the prompt small.

    Args:
        message: The user message to answer.
        api_key: Gemini API key used for this call.
        history: Optional earlier turns; ``role == "user"`` becomes a human
            message, any other role becomes an AI message.

    Returns:
        A 5-tuple ``(sentiment, reply, prompt_context, est_in, est_out)``
        where ``prompt_context`` is the system prompt, tone instructions and
        user query joined together, ``est_in`` estimates the input tokens
        (prompt context plus the history actually sent) and ``est_out``
        estimates the tokens of the raw model output.

    Raises:
        Whatever the LLM client raises on invocation; only response parsing
        is handled here (falling back to a neutral canned reply).
    """
    llm = ChatGoogleGenerativeAI(
        model="gemini-3.1-flash-lite",
        google_api_key=api_key,
        temperature=0.5,
        max_tokens=450,
        generation_config={"response_mime_type": "application/json"}
    )

    custom_system = (
        "Socratic tutor: guide with clear, substantial hints. Continue unless close. "
        "If close: give solution to the initial question & ask: 'Do you want to learn something else?'"
    )

    tone_instruction = (
        "JSON: {\"s\":\"sentiment\",\"r\":\"reply\"}\n"
        "s values: confusion|frustration|confused_but_engaged|confused_and_frustrated|starting_to_get_bored|confident_and_engaged|neutral\n"
        "Rules:\n"
        "- Sympathize with s implicitly (tone/style); never name or mention the sentiment/emotion itself.\n"
        "- NEVER use 'if you' (use direct phrasing: 'think about', 'imagine').\n"
        "- Ask 1 question max.\n"
        "Responses:\n"
        "- frustration: acknowledge sentiment but not explicitly + simplify + question.\n"
        "- starting_to_get_bored: acknowledge sentiment but not explicitly + puzzle/analogy + question.\n"
        "- other: hint + question."
    )

    messages = [SystemMessage(content=f"{custom_system}\n\n{tone_instruction}")]

    # Only the most recent turns are replayed, each clipped to 60 characters.
    sent_history = []
    for msg in (history or [])[-4:]:
        content = msg.content
        if len(content) > 60:
            content = content[:60] + "..."
        sent_history.append(content)
        turn_cls = HumanMessage if msg.role == "user" else AIMessage
        messages.append(turn_cls(content=content))

    messages.append(HumanMessage(content=message))

    res = llm.invoke(messages)
    raw_response = get_text_content(res.content)
    state_val, reply_val = _parse_tutor_json(raw_response)

    prompt_context = f"{custom_system}\n{tone_instruction}\nUser Query: {message}"
    # The replayed history is part of the model input as well, so it is
    # included in the input-token estimate (it is not part of prompt_context).
    est_in = estimate_tokens(prompt_context + "".join(sent_history))
    est_out = estimate_tokens(raw_response)

    return state_val, reply_val, prompt_context, est_in, est_out
|
|
| |
@app.get("/api/status")
def get_status():
    """Report service readiness and whether a server-side Gemini key is set.

    The key itself is never returned - only a boolean telling whether the
    GEMINI_API_KEY environment variable holds a non-empty value.
    """
    key_present = bool(os.getenv("GEMINI_API_KEY"))
    return {"status": "ready", "gemini_api_key_configured": key_present}
|
|
@app.post("/api/chat", response_model=ChatResponse)
def chat_endpoint(request: ChatRequest):
    """Answer one chat message with a Socratic reply and usage metrics.

    The API key comes from the request body or, failing that, from the
    GEMINI_API_KEY environment variable. The user message and every history
    turn are PII-scrubbed before they are sent to the model, and only the
    scrubbed message is written to the Markdown log.

    Args:
        request: The chat request (message, optional key, optional history).

    Returns:
        A ChatResponse with sentiment, reply, latency, prompt context,
        estimated tokens and estimated cost.

    Raises:
        HTTPException: 400 when no API key is available; 500 when the model
            call or any later step fails.
    """
    api_key = request.gemini_api_key or os.environ.get("GEMINI_API_KEY")
    if not api_key:
        raise HTTPException(
            status_code=400,
            detail="Gemini API Key is missing. Please provide it in the Settings panel or environment."
        )

    # perf_counter is monotonic, so the latency cannot be skewed by
    # wall-clock adjustments during the request.
    start_time = time.perf_counter()

    scrubbed_message = scrub_pii(request.message)
    # History is forwarded to the model too, so it gets the same scrubbing.
    scrubbed_history = None
    if request.history:
        scrubbed_history = [
            ChatMessage(role=turn.role, content=scrub_pii(turn.content))
            for turn in request.history
        ]

    try:
        sentiment, reply, prompt_context, est_in, est_out = run_flow_b(
            message=scrubbed_message,
            api_key=api_key,
            history=scrubbed_history
        )
        latency = time.perf_counter() - start_time
        cost = calculate_cost(est_in, est_out)
        tokens = est_in + est_out

        # Log the scrubbed text: the raw message may contain PII that must
        # not be persisted to disk.
        log_to_md(
            question=scrubbed_message,
            sentiment=sentiment,
            latency=latency,
            cost=cost,
            tokens_in=est_in,
            tokens_out=est_out,
            reply=reply
        )

        return ChatResponse(
            sentiment=sentiment,
            response=reply,
            latency=round(latency, 3),
            prompt_context=prompt_context,
            tokens=tokens,
            cost=cost
        )
    except Exception as e:
        print(f"Chat endpoint error: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred: {str(e)}"
        ) from e
|
|
| |
@app.websocket("/api/live-ws")
async def websocket_live_endpoint(websocket: WebSocket):
    """Proxy a browser WebSocket to a Gemini Live session.

    Client -> server JSON frames:
        ``{"type": "audio", "data": <base64 PCM, sent as 16 kHz>}`` or
        ``{"type": "text", "data": <str>}``.
    Server -> client JSON frames:
        ``{"type": "audio", "data": <base64>}``,
        ``{"type": "text", "data": <str>}`` and ``{"type": "turn_complete"}``.

    The API key is read from the ``api_key`` query parameter or the
    GEMINI_API_KEY environment variable. The socket is closed with code
    4000 when no key is available and 4001 when the google-genai SDK is
    not installed. The proxy stops as soon as either side ends, and the
    WebSocket is always closed on the way out.
    """
    await websocket.accept()

    api_key = websocket.query_params.get("api_key") or os.environ.get("GEMINI_API_KEY")
    if not api_key:
        await websocket.close(code=4000, reason="GEMINI_API_KEY is missing.")
        return

    try:
        from google import genai
        from google.genai import types
    except ImportError:
        await websocket.close(code=4001, reason="google-genai SDK not installed.")
        return

    try:
        # Built inside the try block so that a failure here still reaches
        # the finally clause that closes the WebSocket.
        client = genai.Client(api_key=api_key)

        config = types.LiveConnectConfig(
            response_modalities=["AUDIO"],
            system_instruction=types.Content(
                parts=[types.Part.from_text(
                    text="Socratic tutor: guide with clear, substantial hints (no tiny nudges) to solve faster. "
                    "Confidence is not mastery—continue Socratic hints unless they are close. "
                    "Only when close to the solution, give the final answer & ask: 'Do you want to learn something else?' "
                    "NEVER use the phrase 'if you' anywhere in your response (e.g. do not say 'if you think', 'if you were', etc.). Instead, frame instructions or scenarios directly (e.g., say 'think about', 'imagine', 'when looking at', or 'sometimes'). "
                    "Only ask one question at a time to avoid overwhelming the user. "
                    "Keep replies extremely concise (maximum 3 brief sentences) and conversational."
                )]
            )
        )

        async with client.aio.live.connect(model="gemini-3.1-flash-live-preview", config=config) as session:

            async def receive_from_client():
                """Forward client audio/text frames to the Gemini session."""
                try:
                    while True:
                        message = await websocket.receive_json()
                        if not isinstance(message, dict):
                            continue  # ignore frames that are not JSON objects
                        msg_type = message.get("type")
                        payload = message.get("data")

                        if msg_type == "audio":
                            try:
                                audio_bytes = base64.b64decode(payload)
                            except (TypeError, ValueError) as e:
                                # One bad frame must not end the whole stream.
                                print(f"[WebSocket Proxy Client -> Gemini] Skipping malformed audio frame: {e}")
                                continue
                            await session.send_realtime_input(
                                audio=types.Blob(data=audio_bytes, mime_type="audio/pcm;rate=16000")
                            )
                        elif msg_type == "text":
                            if not isinstance(payload, str):
                                print("[WebSocket Proxy Client -> Gemini] Skipping text frame without string data")
                                continue
                            await session.send_realtime_input(text=payload)
                except WebSocketDisconnect:
                    pass
                except Exception as e:
                    print(f"[WebSocket Proxy Client -> Gemini] Error: {e}")

            async def send_to_client():
                """Relay Gemini output (audio, text, turn markers) to the client."""
                try:
                    # The google-genai SDK documents session.receive() as
                    # yielding a single model turn (the iterator ends at
                    # turn_complete), so it is re-entered for every new turn.
                    while True:
                        got_message = False
                        async for response in session.receive():
                            got_message = True
                            server_content = response.server_content
                            if server_content is None:
                                continue
                            model_turn = server_content.model_turn
                            if model_turn is not None:
                                for part in model_turn.parts or []:
                                    if part.inline_data is not None:
                                        audio_b64 = base64.b64encode(part.inline_data.data).decode('utf-8')
                                        await websocket.send_json({
                                            "type": "audio",
                                            "data": audio_b64
                                        })
                                    elif part.text is not None:
                                        await websocket.send_json({
                                            "type": "text",
                                            "data": part.text
                                        })

                            if server_content.turn_complete:
                                await websocket.send_json({"type": "turn_complete"})
                        if not got_message:
                            # An empty pass means the session has nothing
                            # more to deliver; stop instead of spinning.
                            break
                except Exception as e:
                    print(f"[WebSocket Proxy Gemini -> Client] Error: {e}")

            # Run both directions concurrently and stop as soon as either
            # one ends, so a closed browser tab or a closed Gemini session
            # does not leave the other direction waiting forever.
            tasks = [
                asyncio.create_task(receive_from_client()),
                asyncio.create_task(send_to_client()),
            ]
            try:
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                for task in tasks:
                    task.cancel()  # no-op for the task that already finished
                await asyncio.gather(*tasks, return_exceptions=True)

    except Exception as e:
        print(f"WebSocket Gemini Live connection failed: {e}")
    finally:
        try:
            await websocket.close()
        except Exception:
            # The socket may already be closed by the client; nothing to do.
            pass
|
|
| |
# Serve the built frontend (../frontend/dist relative to this file's
# directory) from the site root, but only when that build output exists.
_module_dir = os.path.dirname(os.path.abspath(__file__))
frontend_dist_path = os.path.join(os.path.dirname(_module_dir), "frontend", "dist")
if os.path.exists(frontend_dist_path):
    # html=True lets StaticFiles serve index.html for directory requests.
    _frontend_files = StaticFiles(directory=frontend_dist_path, html=True)
    app.mount("/", _frontend_files, name="frontend")
|
|