| """ |
| api.py — The main web server for this weather assistant app. |
| |
| HOW THIS APP WORKS: |
| When someone sends a question like "What's the weather in Tokyo?", this file |
| handles it end-to-end: |
| |
| 1. Receives the question via HTTP (like a web form submission) |
| 2. Passes it to an AI language model (Qwen2.5) running on HuggingFace |
| 3. The AI decides it needs real weather data, so it asks for the |
| get_current_temperature tool to be called |
| 4. This server calls that tool (defined in weather_mcp_server.py) |
| 5. The tool fetches live weather from the internet |
| 6. The result is sent back to the AI, which writes a natural response |
| 7. That response is returned to whoever asked the question |
| |
| WHAT IS FastAPI? |
| FastAPI is a Python library for building web servers. It lets you define |
| "endpoints" — URLs that accept requests and return responses. Think of it |
| like building a simple API that other apps or curl commands can talk to. |
| |
| WHAT IS MCP (Model Context Protocol)? |
| MCP is a standard way for AI models to use external tools. Instead of the |
| AI just generating text, it can say "I need to call this tool with these |
| arguments." This server listens for those requests and executes the tools. |
| weather_mcp_server.py defines the tools; this file calls them. |
| |
| WHAT IS HuggingFace? |
| HuggingFace is a platform that hosts AI models and lets you run them via |
| their API. This app uses their "Inference API" to run the Qwen2.5 language |
| model without needing to host it ourselves. |
| |
| ENDPOINTS (URLs this server responds to): |
| POST /ask Send a question, get an answer |
| Request: { "question": "What's the temp in Tokyo?" } |
| Response: { "answer": "It's currently 72°F and sunny in Tokyo..." } |
| |
| GET /health Just checks that the server is running |
| Response: { "status": "ok" } |
| |
| ENVIRONMENT VARIABLES (settings loaded from the environment, not hardcoded): |
| HF_TOKEN Your HuggingFace API token — needed to use the AI model |
| MCP_SERVER_URL Where the weather tool server is running |
| (defaults to http://localhost:8000/sse) |
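EXAMPLE USAGE (a sketch, assuming the server is running locally on port 7860,
the port set in the __main__ block at the bottom of this file):
    curl -X POST http://localhost:7860/ask \
         -H "Content-Type: application/json" \
         -d '{"question": "What is the temperature in Tokyo?"}'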
| """ |
|
|
import asyncio
import json
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from huggingface_hub import InferenceClient
from mcp import ClientSession
from mcp.client.sse import sse_client
from pydantic import BaseModel
|
|
# Which hosted model answers the questions (any HuggingFace Inference API
# chat model with tool-calling support would work the same way).
MODEL = "Qwen/Qwen2.5-7B-Instruct"

# HuggingFace API token, read from the environment (never hardcode secrets).
HF_TOKEN = os.environ.get("HF_TOKEN")

# Where the MCP weather tool server is listening (see weather_mcp_server.py).
MCP_SERVER_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:8000/sse")

# Instructions the model sees before every conversation.
SYSTEM_PROMPT = (
    "You are a helpful assistant with access to real-time weather data. "
    "When the user asks about temperature or weather, always use the "
    "get_current_temperature tool to fetch live data before answering. "
    "Present results conversationally. If the user asks anything else, "
    "tell them you only answer weather questions and politely decline."
)
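# Example environment setup before launching (values are placeholders):
#   export HF_TOKEN=hf_your_token_here
#   export MCP_SERVER_URL=http://localhost:8000/sse
#   python api.py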
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup hook: wait for the MCP server to come up before serving."""
    print("Waiting for MCP server to be ready...", flush=True)

    # Poll once per second, giving up after 20 attempts.
    for _ in range(20):
        try:
            async with sse_client(MCP_SERVER_URL) as (read, write):
                async with ClientSession(read, write) as session:
                    await session.initialize()
                    tools = await session.list_tools()
                    print(
                        f"MCP ready — {len(tools.tools)} tool(s) available",
                        flush=True,
                    )
                    break
        except Exception:
            await asyncio.sleep(1)
    else:
        # for/else: this runs only if the loop never hit `break`.
        print("WARNING: MCP server did not become ready in time", flush=True)

    yield  # the app handles requests here; nothing to clean up on shutdown
|
|
|
|
app = FastAPI(title="Weather via MCP + Qwen2.5", lifespan=lifespan)
|
|
async def fetch_mcp_tools() -> list[dict]:
    """
    Ask the weather tool server what tools it has available.

    Returns a list of tool definitions formatted the way the AI model expects
    them — including each tool's name, what it does, and what arguments it takes.
    """
    async with sse_client(MCP_SERVER_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools_result = await session.list_tools()

    return [
        {
            "type": "function",
            "function": {
                "name": t.name,
                "description": t.description,
                "parameters": t.inputSchema,
            },
        }
        for t in tools_result.tools
    ]
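# For illustration, one entry in the list above might look like this. The
# actual name, description, and schema come from weather_mcp_server.py, so
# treat these values as a hypothetical example:
#
#   {
#       "type": "function",
#       "function": {
#           "name": "get_current_temperature",
#           "description": "Fetch the current temperature for a location.",
#           "parameters": {
#               "type": "object",
#               "properties": {"location": {"type": "string"}},
#               "required": ["location"],
#           },
#       },
#   }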
|
|
|
|
async def call_mcp_tool(tool_name: str, tool_args: dict) -> str:
    """
    Call a specific tool on the weather tool server and return its result.

    For example, call_mcp_tool("get_current_temperature", {"location": "Tokyo"})
    connects to weather_mcp_server.py, runs that function, and returns the
    weather data as a string.
    """
    async with sse_client(MCP_SERVER_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(tool_name, tool_args)

    # MCP returns a list of content blocks; keep the text ones and join them.
    return "\n".join(
        block.text for block in result.content if hasattr(block, "text")
    )
|
|
|
|
async def run(user_message: str) -> str:
    """
    Send a user's question to the AI and return its final answer.

    This is called an "agentic loop" because the AI can go back and forth
    multiple times — asking for tool results, getting them, then deciding
    whether to ask for more or give a final answer.

    Typical flow:
    1. We send: [system prompt] + [user question] + [available tools]
    2. AI responds: "I need to call get_current_temperature for Tokyo"
    3. We call that tool and get the weather data
    4. We send the weather data back to the AI
    5. AI responds with a natural language answer — we return that
    """
    client = InferenceClient(model=MODEL, token=HF_TOKEN)

    # Discover the tools the MCP server offers so the model knows about them.
    tools = await fetch_mcp_tools()

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_message},
    ]

    # First round: the model either answers directly or requests a tool call.
    response = client.chat.completions.create(
        messages=messages,
        tools=tools,
        tool_choice="auto",
        max_tokens=512,
        temperature=0.2,
    )
    choice = response.choices[0]
    assistant_msg = choice.message

    # Keep looping for as long as the model keeps requesting tool calls.
    while choice.finish_reason == "tool_calls" and assistant_msg.tool_calls:
        # Record the assistant's tool request in the conversation history.
        messages.append(
            {
                "role": "assistant",
                "content": assistant_msg.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.function.name,
                            "arguments": tc.function.arguments,
                        },
                    }
                    for tc in assistant_msg.tool_calls
                ],
            }
        )

        # Execute each requested tool and append its result to the history.
        for tc in assistant_msg.tool_calls:
            fn_name = tc.function.name
            fn_args = json.loads(tc.function.arguments)
            tool_result = await call_mcp_tool(fn_name, fn_args)
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": tool_result,
                }
            )

        # Ask the model again, now that it can see the tool results.
        response = client.chat.completions.create(
            messages=messages,
            tools=tools,
            tool_choice="auto",
            max_tokens=512,
            temperature=0.2,
        )
        choice = response.choices[0]
        assistant_msg = choice.message

    return assistant_msg.content or "(no response)"
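# For orientation, after a single tool round the `messages` list looks roughly
# like this (values are illustrative, not captured output):
#
#   [{"role": "system", "content": SYSTEM_PROMPT},
#    {"role": "user", "content": "What's the temp in Tokyo?"},
#    {"role": "assistant", "content": "", "tool_calls": [...]},
#    {"role": "tool", "tool_call_id": "call_0", "content": "Tokyo: 18.2°C"}]
#
# The model's next reply then normally has finish_reason "stop" and plain
# text content, which run() returns.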
|
|
|
|
class AskRequest(BaseModel):
    """Request body for POST /ask."""

    question: str


class AskResponse(BaseModel):
    """Response body for POST /ask."""

    answer: str
|
|
|
|
| @app.get("/health") |
| async def health(): |
| """Simple health check — returns OK if the server is running.""" |
| return {"status": "ok"} |
|
|
|
|
| @app.post("/ask", response_model=AskResponse) |
| async def ask(req: AskRequest): |
| """ |
| Main endpoint — accepts a weather question and returns an AI-generated answer. |
| |
| Example request: |
| POST /ask |
| { "question": "What's the temperature in Paris?" } |
| |
| Example response: |
| { "answer": "It's currently 58°F and partly cloudy in Paris, France." } |
| """ |
| if not req.question.strip(): |
| raise HTTPException(status_code=400, detail="question must not be empty") |
| answer = await run(req.question) |
| return AskResponse(answer=answer) |
|
|
|
|
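# To try this locally (a sketch: assumes HF_TOKEN is set and the MCP server
# from weather_mcp_server.py is already running):
#   python api.py
# or, equivalently, via the uvicorn CLI:
#   uvicorn api:app --host 0.0.0.0 --port 7860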
| if __name__ == "__main__": |
| import uvicorn |
|
|
| uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|