dispatchAI-API / api /mcp_server.py
3morixd's picture
Upload api/mcp_server.py with huggingface_hub
5233b57 verified
Raw
History Blame Contribute Delete
6.38 kB
"""
dispatchAI MCP Server - gives the agent tools to interact with the phone farm API.
This is a simple MCP server that exposes the dispatchAI API as tools.
"""
import json
import httpx
import asyncio
from typing import Any
# Minimal MCP server speaking line-delimited JSON-RPC 2.0.
# It runs as a subprocess that Hermes communicates with via stdio.

# Base URL of the dispatchAI HTTP API; every tool request is sent here.
BASE_URL = "http://127.0.0.1:8081"
# Bearer token sent in the Authorization header of every API request.
# NOTE(review): hard-coded credential; it looks like a demo key - confirm
# before using this server outside a local demo setup.
API_KEY = "da-demo-key-0001"
def _tool(name, description, properties=None, required=None):
    """Build one MCP tool descriptor with an object-typed input schema.

    ``properties`` defaults to a fresh empty mapping (a tool that takes no
    arguments); the ``required`` key is only emitted when it is given.
    """
    schema = {
        "type": "object",
        "properties": properties if properties is not None else {},
    }
    if required is not None:
        schema["required"] = required
    return {"name": name, "description": description, "inputSchema": schema}


# Tool catalogue returned verbatim in the ``tools/list`` response.
TOOLS = [
    _tool(
        "chat_completion",
        "Send a chat completion request to a dispatchAI model running on a phone. Returns the model's response.",
        properties={
            "message": {
                "type": "string",
                "description": "The user message to send",
            },
            "model": {
                "type": "string",
                "description": "Model name (default: SmolLM2-135M)",
                "default": "dispatchAI/SmolLM2-135M-Instruct-mobile",
            },
            "max_tokens": {
                "type": "integer",
                "description": "Max tokens to generate",
                "default": 100,
            },
        },
        required=["message"],
    ),
    _tool(
        "list_models",
        "List all available dispatchAI models",
    ),
    _tool(
        "get_phone_status",
        "Get the status of the phone farm - how many phones are connected and their load",
    ),
    _tool(
        "get_api_usage",
        "Get API usage statistics for the current API key",
    ),
]
async def call_tool(name: str, arguments: dict) -> str:
    """Run one dispatchAI tool against the HTTP API and return its result as text.

    Args:
        name: Tool name: ``chat_completion``, ``list_models``,
            ``get_phone_status`` or ``get_api_usage``.
        arguments: Tool arguments supplied by the client. Only
            ``chat_completion`` reads them (``message``, ``model``,
            ``max_tokens``). ``None`` is treated as "no arguments".

    Returns:
        A JSON-encoded string on success, ``"Error: <status>..."`` when the
        API answers with a non-200 status, or ``"Unknown tool: <name>"`` for
        a name that is not one of the four tools.

    Errors raised by the HTTP client or by decoding the response body are
    not caught here; they propagate to the caller.
    """
    # Reject unknown names first so no HTTP client is created for them.
    if name not in ("chat_completion", "list_models", "get_phone_status", "get_api_usage"):
        return f"Unknown tool: {name}"

    # A client may send `"arguments": null`; treat that like an empty mapping.
    arguments = arguments or {}
    auth_headers = {"Authorization": f"Bearer {API_KEY}"}

    async with httpx.AsyncClient(timeout=120.0) as client:
        if name == "chat_completion":
            r = await client.post(
                f"{BASE_URL}/v1/chat/completions",
                headers={"Content-Type": "application/json", **auth_headers},
                json={
                    "model": arguments.get("model", "dispatchAI/SmolLM2-135M-Instruct-mobile"),
                    "messages": [{"role": "user", "content": arguments.get("message", "")}],
                    "max_tokens": arguments.get("max_tokens", 100),
                },
            )
            if r.status_code != 200:
                return f"Error: {r.status_code} - {r.text[:200]}"
            data = r.json()
            # `choices` can be present but empty (or null); indexing [0] on it
            # directly would raise, so fall back to a single empty choice.
            choices = data.get("choices") or [{}]
            reply = choices[0].get("message") or {}
            phone = data.get("phone_info") or {}
            usage = data.get("usage") or {}
            return json.dumps({
                "response": reply.get("content", ""),
                "phone": phone.get("serial", "unknown"),
                "speed_tps": phone.get("generation_tps", 0),
                "tokens_used": usage.get("total_tokens", 0),
            })

        if name == "list_models":
            r = await client.get(f"{BASE_URL}/v1/models", headers=auth_headers)
            if r.status_code != 200:
                return f"Error: {r.status_code}"
            models = [m["id"] for m in r.json().get("data", [])]
            return json.dumps({"models": models})

        # get_phone_status / get_api_usage: relay the API's JSON body as-is.
        path = "/admin/phones" if name == "get_phone_status" else "/v1/usage"
        r = await client.get(f"{BASE_URL}{path}", headers=auth_headers)
        if r.status_code != 200:
            return f"Error: {r.status_code}"
        return json.dumps(r.json())
# Simple MCP JSON-RPC server over stdio
import sys
def _write_message(message: dict) -> None:
    """Write one JSON-RPC message to stdout as a single line and flush it."""
    sys.stdout.write(json.dumps(message) + "\n")
    sys.stdout.flush()


def main():
    """Main MCP server loop - reads JSON-RPC from stdin, writes to stdout.

    One JSON message is read per line until EOF. ``initialize``,
    ``tools/list`` and ``tools/call`` are answered with a result. Any other
    method gets a "method not found" error when the message carries an id and
    is ignored when it does not (a notification). A failure while handling a
    message is reported as a JSON-RPC internal error for that message's id.
    When no id is known (for example the line was not valid JSON), no reply
    is sent and the failure is noted on stderr, so stdout carries protocol
    messages only.
    """
    while True:
        # Reset per message so the error handler never sees an unbound or
        # stale id left over from an earlier request.
        req_id = None
        try:
            line = sys.stdin.readline()
            if not line:
                break  # EOF: the client closed our stdin.
            if not line.strip():
                continue  # Blank separator line; nothing to parse.

            req = json.loads(line)
            method = req.get("method", "")
            req_id = req.get("id")

            if method == "initialize":
                result = {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "dispatchai-mcp", "version": "1.0.0"},
                }
            elif method == "tools/list":
                result = {"tools": TOOLS}
            elif method == "tools/call":
                params = req.get("params", {})
                tool_name = params.get("name", "")
                arguments = params.get("arguments", {})
                text = asyncio.run(call_tool(tool_name, arguments))
                result = {"content": [{"type": "text", "text": text}]}
            else:
                # A request (has an id) must be answered or the client waits
                # forever; a notification (no id) must not be answered.
                if req_id is not None:
                    _write_message({
                        "jsonrpc": "2.0",
                        "id": req_id,
                        "error": {"code": -32601, "message": f"Method not found: {method}"},
                    })
                continue

            _write_message({"jsonrpc": "2.0", "id": req_id, "result": result})
        except Exception as e:
            # `is not None` rather than truthiness: 0 is a valid request id.
            if req_id is not None:
                _write_message({
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {"code": -32603, "message": str(e)},
                })
            else:
                # No id to reply to; keep stdout clean and log to stderr.
                print(f"dispatchai-mcp: dropped message: {e}", file=sys.stderr)
# Start the stdio server loop only when this file is executed as a script.
if __name__ == "__main__":
    main()