""" api.py — The main web server for this weather assistant app. HOW THIS APP WORKS: When someone sends a question like "What's the weather in Tokyo?", this file handles it end-to-end: 1. Receives the question via HTTP (like a web form submission) 2. Passes it to an AI language model (Qwen2.5) running on HuggingFace 3. The AI decides it needs real weather data, so it asks for the get_current_temperature tool to be called 4. This server calls that tool (defined in weather_mcp_server.py) 5. The tool fetches live weather from the internet 6. The result is sent back to the AI, which writes a natural response 7. That response is returned to whoever asked the question WHAT IS FastAPI? FastAPI is a Python library for building web servers. It lets you define "endpoints" — URLs that accept requests and return responses. Think of it like building a simple API that other apps or curl commands can talk to. WHAT IS MCP (Model Context Protocol)? MCP is a standard way for AI models to use external tools. Instead of the AI just generating text, it can say "I need to call this tool with these arguments." This server listens for those requests and executes the tools. weather_mcp_server.py defines the tools; this file calls them. WHAT IS HuggingFace? HuggingFace is a platform that hosts AI models and lets you run them via their API. This app uses their "Inference API" to run the Qwen2.5 language model without needing to host it ourselves. ENDPOINTS (URLs this server responds to): POST /ask Send a question, get an answer Request: { "question": "What's the temp in Tokyo?" } Response: { "answer": "It's currently 72°F and sunny in Tokyo..." } GET /health Just checks that the server is running Response: { "status": "ok" } ENVIRONMENT VARIABLES (settings loaded from the environment, not hardcoded): HF_TOKEN Your HuggingFace API token — needed to use the AI model MCP_SERVER_URL Where the weather tool server is running (defaults to http://localhost:8000/sse) """ import asyncio import json import os import time from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException from huggingface_hub import InferenceClient from mcp import ClientSession from mcp.client.sse import sse_client from pydantic import BaseModel # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- # The AI model we're using. Qwen2.5 is an open-source language model made by # Alibaba, hosted for free on HuggingFace. MODEL = "Qwen/Qwen2.5-7B-Instruct" # Your HuggingFace token, loaded from an environment variable (not hardcoded # here for security reasons). HF_TOKEN = os.environ.get("HF_TOKEN") # The URL of the weather tool server (weather_mcp_server.py). SSE (Server-Sent # Events) is the communication protocol they use to talk to each other. MCP_SERVER_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:8000/sse") # This is the instruction we give the AI at the start of every conversation. # It tells the AI what its job is and how to behave. SYSTEM_PROMPT = ( "You are a helpful assistant with access to real-time weather data. " "When the user asks about temperature or weather, always use the " "get_current_temperature tool to fetch live data before answering. " "Present results conversationally. If the user asks Any other question, tell them you only 'answer weather questions' And refuse to answer the question " ) # --------------------------------------------------------------------------- # Startup: wait for the weather tool server to be ready # --------------------------------------------------------------------------- # This function runs when the web server first starts up. FastAPI calls it # automatically before accepting any requests. @asynccontextmanager async def lifespan(app: FastAPI): print("Waiting for MCP server to be ready...", flush=True) # Try up to 20 times (once per second) to connect to the weather tool # server. Both servers start at the same time, so this gives the tool # server time to finish booting before we start accepting requests. for _ in range(20): try: async with sse_client(MCP_SERVER_URL) as (read, write): async with ClientSession(read, write) as session: await session.initialize() tools = await session.list_tools() print( f"MCP ready — {len(tools.tools)} tool(s) available", flush=True ) break except Exception: await asyncio.sleep(1) # Wait 1 second before trying again else: print("WARNING: MCP server did not become ready in time", flush=True) yield # Hand control back to FastAPI — start accepting requests # Create the FastAPI web server instance. app = FastAPI(title="Weather via MCP + Qwen2.5", lifespan=lifespan) # --------------------------------------------------------------------------- # Functions for talking to the weather tool server (MCP) # --------------------------------------------------------------------------- async def fetch_mcp_tools() -> list[dict]: """ Ask the weather tool server what tools it has available. Returns a list of tool definitions formatted the way the AI model expects them — including the tool's name, what it does, and what arguments it takes. """ async with sse_client(MCP_SERVER_URL) as (read, write): async with ClientSession(read, write) as session: await session.initialize() tools_result = await session.list_tools() # Reformat each tool into the structure the AI model expects return [ { "type": "function", "function": { "name": t.name, "description": t.description, "parameters": t.inputSchema, }, } for t in tools_result.tools ] async def call_mcp_tool(tool_name: str, tool_args: dict) -> str: """ Call a specific tool on the weather tool server and return its result. For example: call_mcp_tool("get_current_temperature", {"location": "Tokyo"}) connects to weather_mcp_server.py, runs that function, and returns the weather data as a string. """ async with sse_client(MCP_SERVER_URL) as (read, write): async with ClientSession(read, write) as session: await session.initialize() result = await session.call_tool(tool_name, tool_args) # The result may contain multiple content blocks; join them into one string return "\n".join( block.text for block in result.content if hasattr(block, "text") ) # --------------------------------------------------------------------------- # The agentic loop — the core logic of this app # --------------------------------------------------------------------------- async def run(user_message: str) -> str: """ Send a user's question to the AI and return its final answer. This is called an "agentic loop" because the AI can go back and forth multiple times — asking for tool results, getting them, then deciding whether to ask for more or give a final answer. Typical flow: 1. We send: [system prompt] + [user question] + [available tools] 2. AI responds: "I need to call get_current_temperature for Tokyo" 3. We call that tool and get the weather data 4. We send the weather data back to the AI 5. AI responds with a natural language answer — we return that """ # Create a client that can talk to HuggingFace's AI inference API client = InferenceClient(model=MODEL, token=HF_TOKEN) # Get the list of available tools from the weather server tools = await fetch_mcp_tools() # Build the conversation history. The AI sees this as a back-and-forth chat. # "system" sets the AI's behavior; "user" is the human's message. messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_message}, ] # Send the conversation to the AI model and get its first response response = client.chat.completions.create( messages=messages, tools=tools, # Tell the AI what tools it can use tool_choice="auto", # Let the AI decide whether to use a tool max_tokens=512, # Maximum length of the AI's response temperature=0.2, # Low temperature = more focused, less random responses ) choice = response.choices[0] assistant_msg = choice.message # If the AI wants to call a tool, handle it and loop back while choice.finish_reason == "tool_calls" and assistant_msg.tool_calls: # Add the AI's tool request to the conversation history messages.append( { "role": "assistant", "content": assistant_msg.content or "", "tool_calls": [ { "id": tc.id, "type": "function", "function": { "name": tc.function.name, "arguments": tc.function.arguments, }, } for tc in assistant_msg.tool_calls ], } ) # Execute each tool the AI requested and add results to the conversation for tc in assistant_msg.tool_calls: fn_name = tc.function.name fn_args = json.loads( tc.function.arguments ) # Parse the JSON arguments string tool_result = await call_mcp_tool(fn_name, fn_args) messages.append( { "role": "tool", "tool_call_id": tc.id, # Links this result to the specific tool call "content": tool_result, } ) # Send the updated conversation (now including tool results) back to the AI response = client.chat.completions.create( messages=messages, tools=tools, tool_choice="auto", max_tokens=512, temperature=0.2, ) choice = response.choices[0] assistant_msg = choice.message # The AI has finished — return its final text response return assistant_msg.content or "(no response)" # --------------------------------------------------------------------------- # Request/response shapes # --------------------------------------------------------------------------- # These classes define the exact structure of data that goes in and out of # the /ask endpoint. FastAPI uses them to validate requests and format responses. class AskRequest(BaseModel): question: str # The user's weather question class AskResponse(BaseModel): answer: str # The AI's response # --------------------------------------------------------------------------- # Endpoints (URLs the server responds to) # --------------------------------------------------------------------------- @app.get("/health") async def health(): """Simple health check — returns OK if the server is running.""" return {"status": "ok"} @app.post("/ask", response_model=AskResponse) async def ask(req: AskRequest): """ Main endpoint — accepts a weather question and returns an AI-generated answer. Example request: POST /ask { "question": "What's the temperature in Paris?" } Example response: { "answer": "It's currently 58°F and partly cloudy in Paris, France." } """ if not req.question.strip(): raise HTTPException(status_code=400, detail="question must not be empty") answer = await run(req.question) return AskResponse(answer=answer) # --------------------------------------------------------------------------- # Entry point (only used when running locally, not in Docker) # --------------------------------------------------------------------------- # This block only runs if you start the file directly: `python api.py` # In Docker/HuggingFace Spaces, supervisord starts uvicorn directly instead. if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)