| | import streamlit as st |
| | import openai |
| | import requests |
| | import json |
| | import asyncio |
| | import aiohttp |
| | from typing import Dict, Any, List |
| | from datetime import datetime |
| | import os |
| |
|
| | |
# Configure the Streamlit page before any other st.* call (required by Streamlit).
# The original icon string was mojibake ("π€") from a broken encoding; restored
# to the robot emoji the rest of the UI implies.
st.set_page_config(
    page_title="AI Assistant with SAP & News Integration",
    page_icon="🤖",
    layout="wide"
)
| |
|
| | |
# Inject custom CSS for the chat UI: a styled page header plus colour-coded
# boxes for user messages, assistant messages, tool results, and errors.
# unsafe_allow_html=True is required so the raw <style> tag is rendered.
st.markdown("""
<style>
.main-header {
    font-size: 2.5rem;
    font-weight: bold;
    text-align: center;
    color: #1f77b4;
    margin-bottom: 2rem;
}
.chat-message {
    padding: 1rem;
    border-radius: 0.5rem;
    margin: 0.5rem 0;
}
.user-message {
    background-color: #e3f2fd;
    border-left: 4px solid #2196f3;
}
.assistant-message {
    background-color: #f5f5f5;
    border-left: 4px solid #4caf50;
}
.tool-result {
    background-color: #fff3e0;
    border: 1px solid #ff9800;
    border-radius: 0.5rem;
    padding: 1rem;
    margin: 1rem 0;
}
.error-message {
    background-color: #ffebee;
    border: 1px solid #f44336;
    border-radius: 0.5rem;
    padding: 1rem;
    margin: 1rem 0;
}
</style>
""", unsafe_allow_html=True)
| |
|
class MCPClient:
    """Thin async JSON-RPC 2.0 client for an MCP server exposed over HTTP.

    Requests are POSTed to ``<server_url>/mcp``.  A fresh
    ``aiohttp.ClientSession`` is opened per request because this app drives
    each interaction through a separate ``asyncio.run()`` call (a new event
    loop every time): a ClientSession is bound to the loop it was created
    on, so caching one session across runs raises "Event loop is closed"
    on the second call.
    """

    def __init__(self, server_url: str):
        # Strip any trailing slash so the URL joins below stay well-formed.
        self.server_url = server_url.rstrip('/')
        # Retained for backward compatibility with callers that manage a
        # long-lived session explicitly; the tool methods no longer use it.
        self.session = None

    async def initialize_session(self):
        """Create the shared aiohttp session if missing or already closed.

        Kept for API compatibility; ``call_tool``/``list_tools`` now use
        short-lived per-request sessions (see class docstring).
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def close_session(self):
        """Close the shared aiohttp session, if one was created."""
        if self.session:
            await self.session.close()
            self.session = None

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]:
        """Invoke ``tool_name`` on the MCP server and return its decoded result.

        Args:
            tool_name: Name of the tool registered on the server.
            arguments: Keyword arguments for the tool; defaults to ``{}``.

        Returns:
            The tool's JSON payload on success; otherwise a dict of the form
            ``{"success": False, "error": "..."}`` describing the HTTP or
            connection failure.  If the MCP envelope cannot be decoded, the
            raw envelope dict is returned instead of raising.
        """
        if arguments is None:
            arguments = {}

        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments,
            },
        }

        try:
            # Per-request session: safe across successive asyncio.run() loops
            # and guarantees the connection is released even on errors.
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.server_url}/mcp",
                    json=mcp_request,
                    headers={"Content-Type": "application/json"},
                ) as response:
                    if response.status != 200:
                        return {
                            "success": False,
                            "error": f"HTTP {response.status}: {await response.text()}",
                        }
                    result = await response.json()
                    content = result.get("result", {}).get("content") or []
                    if content:
                        try:
                            # MCP wraps tool output as a list of text parts;
                            # the first part carries the JSON-encoded result.
                            return json.loads(content[0]["text"])
                        except (json.JSONDecodeError, KeyError, TypeError):
                            # Unexpected payload shape — surface the raw
                            # envelope rather than misreporting a connection
                            # error (the original decoded unguarded here).
                            return result
                    return result
        except Exception as e:
            return {
                "success": False,
                "error": f"Connection error: {str(e)}",
            }

    async def list_tools(self) -> List[Dict[str, Any]]:
        """Fetch the server's tool catalogue.

        Returns:
            The list of tool descriptors, or ``[]`` on any HTTP/connection
            failure (the error is shown in the UI via ``st.error``).
        """
        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.server_url}/mcp",
                    json=mcp_request,
                    headers={"Content-Type": "application/json"},
                ) as response:
                    if response.status == 200:
                        result = await response.json()
                        return result.get("result", {}).get("tools", [])
                    return []
        except Exception as e:
            st.error(f"Error listing tools: {str(e)}")
            return []
| |
|
class AIAssistant:
    """Chat orchestrator: routes user messages through OpenAI and, when the
    model emits ``CALL_TOOL:`` directives, executes them via the MCP client
    and asks the model to interpret the results."""

    def __init__(self, openai_api_key: str, mcp_client: "MCPClient"):
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.mcp_client = mcp_client
        self.available_tools = []  # populated by initialize()

    async def initialize(self):
        """Fetch the MCP server's tool catalogue so prompts can describe it."""
        self.available_tools = await self.mcp_client.list_tools()

    def get_system_prompt(self) -> str:
        """Build the system prompt, embedding the available tool list."""
        tools_description = "\n".join([
            f"- {tool['name']}: {tool['description']}"
            for tool in self.available_tools
        ])

        return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools.

Available tools:
{tools_description}

When a user asks for information that can be retrieved using these tools, you should:
1. Identify which tool(s) would be helpful
2. Call the appropriate tool(s) with the right parameters
3. Interpret and present the results in a user-friendly way

For SAP-related queries (purchase orders, requisitions), use the SAP tools.
For news-related queries, use the news tools.

Always explain what you're doing and present results clearly. If a tool call fails, explain the error and suggest alternatives.

You can call tools by responding with: CALL_TOOL: tool_name(parameter1=value1, parameter2=value2)
"""

    @staticmethod
    def _coerce_value(value: str):
        """Best-effort conversion of a textual parameter to bool/int/float.

        Extends the original digits-only coercion to also handle negative
        integers and floats; anything unconvertible stays a string.
        """
        lowered = value.lower()
        if lowered in ('true', 'false'):
            return lowered == 'true'
        for caster in (int, float):
            try:
                return caster(value)
            except ValueError:
                continue
        return value

    def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Parse ``CALL_TOOL: name(k=v, ...)`` directives from *response*.

        Returns:
            One ``{'name': ..., 'arguments': {...}}`` dict per directive, in
            order of appearance.  Malformed directives are reported via
            ``st.error`` and skipped rather than aborting the parse.

        Note: values containing commas or parentheses are not supported by
        this simple split-based grammar.
        """
        tool_calls = []
        for line in response.split('\n'):
            stripped = line.strip()
            if not stripped.startswith('CALL_TOOL:'):
                continue
            try:
                tool_part = stripped[len('CALL_TOOL:'):].strip()
                if '(' not in tool_part or ')' not in tool_part:
                    continue
                tool_name = tool_part.split('(')[0].strip()
                params_str = tool_part.split('(')[1].split(')')[0]

                params = {}
                if params_str.strip():
                    for param in params_str.split(','):
                        if '=' not in param:
                            continue
                        key, value = param.split('=', 1)
                        # Strip surrounding quotes, then coerce the type.
                        params[key.strip()] = self._coerce_value(
                            value.strip().strip('"\''))

                tool_calls.append({
                    'name': tool_name,
                    'arguments': params,
                })
            except Exception as e:
                st.error(f"Error parsing tool call: {e}")

        return tool_calls

    async def process_message(self, user_message: str) -> str:
        """Answer *user_message*, executing any tool calls the model requests.

        Flow: one completion to get the model's answer; if it contains
        CALL_TOOL directives, execute each via MCP, then a second completion
        asking the model to interpret the tool results.

        Returns:
            The assistant's final text, or an error string on failure
            (errors are returned, not raised, so the UI can render them).
        """
        try:
            messages = [
                {"role": "system", "content": self.get_system_prompt()},
                {"role": "user", "content": user_message},
            ]

            response = self.openai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                temperature=0.7,
                max_tokens=1000
            )

            ai_response = response.choices[0].message.content

            tool_calls = self.extract_tool_calls(ai_response)

            if not tool_calls:
                return ai_response

            tool_results = []
            for tool_call in tool_calls:
                st.info(f"🔧 Calling tool: {tool_call['name']} with parameters: {tool_call['arguments']}")

                result = await self.mcp_client.call_tool(
                    tool_call['name'],
                    tool_call['arguments']
                )

                tool_results.append({
                    'tool': tool_call['name'],
                    'result': result,
                })

                # Surface each tool outcome inline in the chat UI.
                # (The original success string was mojibake split across two
                # physical lines; restored here.)
                if result.get('success'):
                    st.success(f"✅ Tool {tool_call['name']} executed successfully")
                    with st.expander(f"📊 {tool_call['name']} Results", expanded=False):
                        st.json(result)
                else:
                    st.error(f"❌ Tool {tool_call['name']} failed: {result.get('error', 'Unknown error')}")

            tool_results_text = "\n\n".join([
                f"Tool: {tr['tool']}\nResult: {json.dumps(tr['result'], indent=2)}"
                for tr in tool_results
            ])

            # Second round-trip: let the model turn raw tool output into a
            # user-friendly answer.
            final_messages = messages + [
                {"role": "assistant", "content": ai_response},
                {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."},
            ]

            final_response = self.openai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=final_messages,
                temperature=0.7,
                max_tokens=1000
            )

            return final_response.choices[0].message.content

        except Exception as e:
            return f"❌ Error processing your request: {str(e)}"
| |
|
| | |
def main():
    """Render the Streamlit chat UI and drive the assistant.

    Layout: sidebar for configuration (OpenAI key, MCP server URL, connection
    test), main area for the chat history and input box.  The assistant is
    cached in ``st.session_state`` and rebuilt whenever the configuration
    changes (the original kept a stale assistant after the user edited the
    key or URL).  Mojibake emoji strings from the original have been restored.
    """
    st.markdown('<h1 class="main-header">🤖 AI Assistant with SAP & News Integration</h1>', unsafe_allow_html=True)

    with st.sidebar:
        st.header("⚙️ Configuration")

        openai_api_key = st.text_input(
            "OpenAI API Key",
            type="password",
            help="Enter your OpenAI API key"
        )

        mcp_server_url = st.text_input(
            "MCP Server URL",
            value="https://your-ngrok-url.ngrok.io",
            help="Enter your ngrok URL where the MCP server is running"
        )

        # Quick reachability probe against the server's /health endpoint.
        if st.button("🔗 Test MCP Connection"):
            if mcp_server_url:
                try:
                    response = requests.get(f"{mcp_server_url.rstrip('/')}/health", timeout=10)
                    if response.status_code == 200:
                        st.success("✅ MCP Server connected successfully!")
                        st.json(response.json())
                    else:
                        st.error(f"❌ Connection failed: HTTP {response.status_code}")
                except Exception as e:
                    st.error(f"❌ Connection error: {str(e)}")
            else:
                st.error("Please enter MCP Server URL")

        st.markdown("---")
        st.markdown("### 📚 Available Commands")
        st.markdown("""
        - **SAP Purchase Orders**: "Show me recent purchase orders"
        - **SAP Requisitions**: "Get purchase requisitions"
        - **News Headlines**: "What's the latest tech news?"
        - **News by Source**: "Get news from BBC"
        """)

    # Refuse to proceed until both pieces of configuration are provided.
    if not openai_api_key:
        st.warning("⚠️ Please enter your OpenAI API key in the sidebar to continue.")
        return

    if not mcp_server_url or mcp_server_url == "https://your-ngrok-url.ngrok.io":
        st.warning("⚠️ Please enter your MCP server URL in the sidebar.")
        return

    if 'messages' not in st.session_state:
        st.session_state.messages = []

    # (Re)build the assistant on first run OR when the configuration changed;
    # otherwise a stale assistant would keep using the old key/URL.
    config = (openai_api_key, mcp_server_url)
    if st.session_state.get('assistant_config') != config:
        mcp_client = MCPClient(mcp_server_url)
        st.session_state.assistant = AIAssistant(openai_api_key, mcp_client)

        async def init_assistant():
            await st.session_state.assistant.initialize()

        try:
            asyncio.run(init_assistant())
            st.session_state.assistant_config = config
            st.success("🚀 AI Assistant initialized successfully!")
        except Exception as e:
            st.error(f"❌ Failed to initialize assistant: {str(e)}")
            return

    # Replay the conversation so far.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Handle a new user prompt, if any.
    if prompt := st.chat_input("Ask me about SAP data, news, or anything else..."):
        st.session_state.messages.append({"role": "user", "content": prompt})

        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            with st.spinner("🤖 Thinking and processing..."):
                try:
                    response = asyncio.run(
                        st.session_state.assistant.process_message(prompt)
                    )
                    st.markdown(response)

                    st.session_state.messages.append({"role": "assistant", "content": response})

                except Exception as e:
                    error_msg = f"❌ Sorry, I encountered an error: {str(e)}"
                    st.error(error_msg)
                    st.session_state.messages.append({"role": "assistant", "content": error_msg})

    st.markdown("---")
    st.markdown(
        "💡 **Tip**: Try asking about purchase orders, requisitions, or latest news. "
        "The AI will automatically use the appropriate tools to fetch the data."
    )
| |
|
# Entry point: run the app when executed directly (e.g. `streamlit run app.py`).
if __name__ == "__main__":
    main()