| import os |
| import sqlite3 |
| import requests |
| import openai |
| import gradio as gr |
| import asyncio |
| from gtts import gTTS |
| from typing_extensions import TypedDict |
| from langgraph.graph import StateGraph, START, END |
| import csv |
|
|
# Hand the OpenAI SDK its credential from the environment. The value is None
# when OPENAI_API_KEY is not set.
openai.api_key = os.environ.get("OPENAI_API_KEY")
|
|
def init_db_from_csv(
    csv_path: str = "transactions.csv", db_path: str = "shop.db"
) -> None:
    """Rebuild the ``transactions`` table from a CSV file.

    The table is created if missing, emptied, and refilled with one row per
    CSV record.

    Args:
        csv_path: CSV file with ``date``, ``product`` and ``amount`` columns.
        db_path: SQLite database file to (re)populate.

    Raises:
        OSError: If ``csv_path`` cannot be opened.
        KeyError: If a required column is missing from the CSV.
        ValueError: If an ``amount`` value is not a valid float.
        sqlite3.Error: If the database cannot be written.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS transactions "
            "(date TEXT, product TEXT, amount REAL)"
        )
        # Parse the whole file before touching existing rows, so a malformed
        # CSV leaves the current table contents intact. The encoding is
        # explicit so product names decode the same on every platform.
        with open(csv_path, newline="", encoding="utf-8") as f:
            rows = [
                (row["date"], row["product"], float(row["amount"]))
                for row in csv.DictReader(f)
            ]
        # The connection context manager commits on success and rolls back on
        # error, so the wipe-and-reload is applied atomically.
        with conn:
            conn.execute("DELETE FROM transactions")
            conn.executemany(
                "INSERT INTO transactions (date, product, amount) VALUES (?, ?, ?)",
                rows,
            )
    finally:
        # Always release the file handle, including when loading fails.
        conn.close()
|
|
# Import-time side effect: reload the transactions table from the default CSV
# every time this module is loaded.
init_db_from_csv(csv_path="transactions.csv")
|
|
def db_agent(query: str, db_path: str = "shop.db") -> str:
    """Report the product with the highest revenue for the current date.

    Args:
        query: The user's question. It is accepted for interface symmetry
            with the other agents but is not used to build the SQL.
        db_path: SQLite database file holding the ``transactions`` table.

    Returns:
        A sentence naming the top product and its summed amount, a
        "no transactions" message when nothing matches today's date, or a
        "Database error" message when SQLite reports an operational error
        (for example a missing table).
    """
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        # NOTE(review): SQLite documents date('now') as UTC. If the stored
        # dates are local shop dates, date('now', 'localtime') may be what is
        # wanted here. Confirm against the CSV producer.
        row = conn.execute(
            """
            SELECT product, SUM(amount) AS revenue
            FROM transactions
            WHERE date = date('now')
            GROUP BY product
            ORDER BY revenue DESC
            LIMIT 1
            """
        ).fetchone()
    except sqlite3.OperationalError as e:
        return f"Database error: {e}. Please check 'transactions' table in {db_path}."
    finally:
        # Close on every path so handles do not pile up across requests.
        if conn is not None:
            conn.close()

    if row:
        return f"Top product today: {row[0]} with ₹{row[1]:,.2f}"
    return "No transactions found for today."
|
|
def web_search_agent(query: str) -> str:
    """Answer a query from a web search snippet, falling back to the LLM.

    The first organic result's snippet from SerpApi is summarized by
    ``llm_agent``. If the search fails or yields no snippet, the raw query is
    sent to ``llm_agent`` instead.

    Args:
        query: The user's question, used as the search term.

    Returns:
        The text produced by ``llm_agent``.
    """
    import logging  # function-scope: this module does not import logging

    snippet = ""
    try:
        resp = requests.get(
            "https://serpapi.com/search",
            params={"q": query, "api_key": os.getenv("SERPAPI_KEY")},
            timeout=10,  # never let a stalled search hang the request forever
        )
        resp.raise_for_status()
        payload = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # Search is best-effort. Log only the exception type, because the
        # message can embed the request URL and with it the API key.
        logging.getLogger(__name__).warning(
            "Web search failed (%s); falling back to the LLM.",
            type(exc).__name__,
        )
    else:
        # Validate the shape explicitly instead of relying on a swallowed
        # IndexError/AttributeError for empty or unexpected payloads.
        results = payload.get("organic_results") if isinstance(payload, dict) else None
        if isinstance(results, list) and results and isinstance(results[0], dict):
            snippet = str(results[0].get("snippet") or "").strip()

    # The LLM calls sit outside the try block so their failures are not
    # silently masked and retried.
    if snippet:
        return llm_agent(f"Summarize: {snippet}")
    return llm_agent(query)
|
|
def llm_agent(query: str) -> str:
    """Send a single-turn prompt to the chat model and return its reply.

    Args:
        query: The user message to send.

    Returns:
        The first choice's message text with surrounding whitespace removed,
        or an empty string when the message carries no text content.
    """
    completion = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": query},
        ],
        temperature=0.2,
    )
    # The SDK types message content as optional. Guard against None so the
    # caller gets an empty reply rather than an AttributeError.
    content = completion.choices[0].message.content
    return (content or "").strip()
|
|
def stt_agent(audio_path: str) -> str:
    """Transcribe the audio file at ``audio_path`` with the ``whisper-1`` model.

    Args:
        audio_path: Path of the audio file to upload for transcription.

    Returns:
        The transcript text with surrounding whitespace removed.
    """
    with open(audio_path, mode="rb") as audio_stream:
        result = openai.audio.transcriptions.create(
            file=audio_stream,
            model="whisper-1",
        )
    recognized = result.text
    return recognized.strip()
|
|
def tts_agent(text: str, lang: str = 'en') -> str:
    """Synthesize ``text`` to speech with gTTS and return the MP3 path.

    Args:
        text: The text to speak.
        lang: Language code handed to gTTS.

    Returns:
        The path of the written file. It is always ``response_audio.mp3`` in
        the working directory, so each call overwrites the previous output.
    """
    destination = "response_audio.mp3"
    gTTS(text=text, lang=lang).save(destination)
    return destination
|
|
class State(TypedDict):
    """Shared state passed between the nodes of the routing graph."""

    # The user's question, as typed or as transcribed from audio.
    query: str
    # The answer written by whichever worker node handled the query.
    result: str
|
|
def route_fn(state: State) -> str:
    """Pick the worker that should answer ``state["query"]``.

    Routing is keyword based and case-insensitive:

    * ``"db"`` when the query mentions "revenue";
    * ``"web"`` when it contains an interrogative (who, whom, whose, what,
      when, where) as a whole word;
    * ``"llm"`` otherwise.

    Args:
        state: Graph state; only the ``query`` key is read.

    Returns:
        One of ``"db"``, ``"web"`` or ``"llm"``.
    """
    import re  # function-scope: this module does not import re

    q = state["query"].lower()
    # "max revenue" always contains "revenue", so one substring test suffices.
    if "revenue" in q:
        return "db"
    # Match whole words only, so "whole", "whatever" or "somewhat" do not
    # send an ordinary request to web search.
    if re.search(r"\b(?:who|whom|whose|what|when|where)\b", q):
        return "web"
    return "llm"
|
|
def router_node(state: State) -> dict:
    """Pass the query through unchanged; this node does no work of its own."""
    passthrough = dict(query=state["query"])
    return passthrough
|
|
def db_node(state: State) -> dict:
    """Answer the query with ``db_agent`` and store it under ``result``."""
    answer = db_agent(state["query"])
    return {"result": answer}
|
|
def web_node(state: State) -> dict:
    """Answer the query with ``web_search_agent`` and store it under ``result``."""
    answer = web_search_agent(state["query"])
    return {"result": answer}
|
|
def llm_node(state: State) -> dict:
    """Answer the query with ``llm_agent`` and store it under ``result``."""
    answer = llm_agent(state["query"])
    return {"result": answer}
|
|
# Assemble the routing graph: START -> router -> (db | web | llm) -> END.
#
# The router is the single entry point and the conditional branch hangs off
# it. This avoids wiring START both straight to the router and conditionally
# to the workers, which would leave the router with no outgoing edge.
builder = StateGraph(State)

builder.add_node("router", router_node)
builder.add_node("db", db_node)
builder.add_node("web", web_node)
builder.add_node("llm", llm_node)

builder.add_edge(START, "router")
# route_fn returns "db", "web" or "llm"; the mapping sends each label to the
# node of the same name.
builder.add_conditional_edges(
    "router",
    route_fn,
    {"db": "db", "web": "web", "llm": "llm"},
)

# Every worker finishes the run once it has produced its result.
builder.add_edge("db", END)
builder.add_edge("web", END)
builder.add_edge("llm", END)

graph = builder.compile()
|
|
def handle_query(audio_or_text: str):
    """Answer a spoken or typed question for the Gradio UI.

    Args:
        audio_or_text: Either a path to a ``.wav``/``.mp3`` recording
            (matched case-insensitively) or the question text itself.
            ``None`` or blank input is tolerated, since the UI can submit
            with nothing recorded.

    Returns:
        A ``(text, audio_path)`` pair matching the two wired output
        components. ``audio_path`` is the synthesized reply for audio input
        and ``None`` for text input or when there is nothing to speak.
    """
    # Nothing recorded or typed: answer with a prompt instead of crashing on
    # None.endswith(...).
    if not audio_or_text or not audio_or_text.strip():
        return "Please record or type a question.", None

    is_audio = audio_or_text.lower().endswith((".wav", ".mp3"))
    query = stt_agent(audio_or_text) if is_audio else audio_or_text
    if not query.strip():
        # Only reachable for audio: the transcription came back empty.
        return "Sorry, I could not hear a question in that recording.", None

    state = graph.invoke({"query": query})
    response = state["result"]

    # Always return two values, because the click handler is wired to two
    # outputs. Speech is produced only for spoken questions with a non-empty
    # answer.
    audio_path = tts_agent(response) if is_audio and response else None
    return response, audio_path
|
|
# Gradio front end: usage notes, a microphone input, and text + speech outputs.
with gr.Blocks() as demo:
    # NOTE(review): the usage text mentions uploading transactions.csv and the
    # input label mentions typing, but only a microphone input is defined
    # below. Confirm which is intended.
    gr.Markdown(
        """
    **Shop Voice-Box Assistant Demo!**

    **Usage Instructions:**
    - Speak into your microphone or upload transactions.csv for data queries.
    - Sample questions you can ask:
    - What is the max revenue product today?
    - Who invented the light bulb?
    - Tell me a joke about cats.
    """
    )

    # Input: a microphone recording handed to the handler as a file path.
    inp = gr.Audio(
        label="Speak or type your question",
        type="filepath",
        sources=["microphone"],
    )

    # Outputs: the answer as text and as synthesized speech.
    out_text = gr.Textbox(label="Answer (text)")
    out_audio = gr.Audio(label="Answer (speech)")

    submit = gr.Button("Submit")
    submit.click(handle_query, inp, [out_text, out_audio])
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# Script entry point: start the Gradio server only when run directly.
if __name__ == "__main__":
    # Bind to 0.0.0.0 on port 7860 with Gradio's share option turned off.
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False)
|
|