| | """
|
| | Durable Objects integration for OpenManus
|
| | Provides interface to Cloudflare Durable Objects operations
|
| | """
|
| |
|
| | import json
|
| | import time
|
| | from typing import Any, Dict, List, Optional
|
| |
|
| | from app.logger import logger
|
| |
|
| | from .client import CloudflareClient, CloudflareError
|
| |
|
| |
|
| | class DurableObjects:
|
| | """Cloudflare Durable Objects client"""
|
| |
|
| | def __init__(self, client: CloudflareClient):
|
| | self.client = client
|
| |
|
| | async def create_agent_session(
|
| | self, session_id: str, user_id: str, metadata: Optional[Dict[str, Any]] = None
|
| | ) -> Dict[str, Any]:
|
| | """Create a new agent session"""
|
| |
|
| | session_data = {
|
| | "sessionId": session_id,
|
| | "userId": user_id,
|
| | "metadata": metadata or {},
|
| | }
|
| |
|
| | try:
|
| | response = await self.client.post(
|
| | f"do/agent/{session_id}/start", data=session_data, use_worker=True
|
| | )
|
| |
|
| | return {
|
| | "success": True,
|
| | "session_id": session_id,
|
| | "user_id": user_id,
|
| | **response,
|
| | }
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to create agent session: {e}")
|
| | raise
|
| |
|
| | async def get_agent_session_status(self, session_id: str) -> Dict[str, Any]:
|
| | """Get agent session status"""
|
| |
|
| | try:
|
| | response = await self.client.get(
|
| | f"do/agent/{session_id}/status?sessionId={session_id}", use_worker=True
|
| | )
|
| |
|
| | return response
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to get agent session status: {e}")
|
| | raise
|
| |
|
| | async def update_agent_session(
|
| | self, session_id: str, updates: Dict[str, Any]
|
| | ) -> Dict[str, Any]:
|
| | """Update agent session"""
|
| |
|
| | update_data = {"sessionId": session_id, "updates": updates}
|
| |
|
| | try:
|
| | response = await self.client.post(
|
| | f"do/agent/{session_id}/update", data=update_data, use_worker=True
|
| | )
|
| |
|
| | return {"success": True, "session_id": session_id, **response}
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to update agent session: {e}")
|
| | raise
|
| |
|
| | async def stop_agent_session(self, session_id: str) -> Dict[str, Any]:
|
| | """Stop agent session"""
|
| |
|
| | try:
|
| | response = await self.client.post(
|
| | f"do/agent/{session_id}/stop",
|
| | data={"sessionId": session_id},
|
| | use_worker=True,
|
| | )
|
| |
|
| | return {"success": True, "session_id": session_id, **response}
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to stop agent session: {e}")
|
| | raise
|
| |
|
| | async def add_agent_message(
|
| | self, session_id: str, message: Dict[str, Any]
|
| | ) -> Dict[str, Any]:
|
| | """Add a message to agent session"""
|
| |
|
| | message_data = {
|
| | "sessionId": session_id,
|
| | "message": {"timestamp": int(time.time()), **message},
|
| | }
|
| |
|
| | try:
|
| | response = await self.client.post(
|
| | f"do/agent/{session_id}/messages", data=message_data, use_worker=True
|
| | )
|
| |
|
| | return {"success": True, "session_id": session_id, **response}
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to add agent message: {e}")
|
| | raise
|
| |
|
| | async def get_agent_messages(
|
| | self, session_id: str, limit: int = 50, offset: int = 0
|
| | ) -> Dict[str, Any]:
|
| | """Get agent session messages"""
|
| |
|
| | try:
|
| | response = await self.client.get(
|
| | f"do/agent/{session_id}/messages?sessionId={session_id}&limit={limit}&offset={offset}",
|
| | use_worker=True,
|
| | )
|
| |
|
| | return response
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to get agent messages: {e}")
|
| | raise
|
| |
|
| |
|
| | async def join_chat_room(
|
| | self,
|
| | room_id: str,
|
| | user_id: str,
|
| | username: str,
|
| | room_config: Optional[Dict[str, Any]] = None,
|
| | ) -> Dict[str, Any]:
|
| | """Join a chat room"""
|
| |
|
| | join_data = {
|
| | "userId": user_id,
|
| | "username": username,
|
| | "roomConfig": room_config or {},
|
| | }
|
| |
|
| | try:
|
| | response = await self.client.post(
|
| | f"do/chat/{room_id}/join", data=join_data, use_worker=True
|
| | )
|
| |
|
| | return {"success": True, "room_id": room_id, "user_id": user_id, **response}
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to join chat room: {e}")
|
| | raise
|
| |
|
| | async def leave_chat_room(self, room_id: str, user_id: str) -> Dict[str, Any]:
|
| | """Leave a chat room"""
|
| |
|
| | leave_data = {"userId": user_id}
|
| |
|
| | try:
|
| | response = await self.client.post(
|
| | f"do/chat/{room_id}/leave", data=leave_data, use_worker=True
|
| | )
|
| |
|
| | return {"success": True, "room_id": room_id, "user_id": user_id, **response}
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to leave chat room: {e}")
|
| | raise
|
| |
|
| | async def get_chat_room_info(self, room_id: str) -> Dict[str, Any]:
|
| | """Get chat room information"""
|
| |
|
| | try:
|
| | response = await self.client.get(f"do/chat/{room_id}/info", use_worker=True)
|
| |
|
| | return response
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to get chat room info: {e}")
|
| | raise
|
| |
|
| | async def send_chat_message(
|
| | self,
|
| | room_id: str,
|
| | user_id: str,
|
| | username: str,
|
| | content: str,
|
| | message_type: str = "text",
|
| | ) -> Dict[str, Any]:
|
| | """Send a message to chat room"""
|
| |
|
| | message_data = {
|
| | "userId": user_id,
|
| | "username": username,
|
| | "content": content,
|
| | "messageType": message_type,
|
| | }
|
| |
|
| | try:
|
| | response = await self.client.post(
|
| | f"do/chat/{room_id}/messages", data=message_data, use_worker=True
|
| | )
|
| |
|
| | return {"success": True, "room_id": room_id, **response}
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to send chat message: {e}")
|
| | raise
|
| |
|
| | async def get_chat_messages(
|
| | self, room_id: str, limit: int = 50, offset: int = 0
|
| | ) -> Dict[str, Any]:
|
| | """Get chat room messages"""
|
| |
|
| | try:
|
| | response = await self.client.get(
|
| | f"do/chat/{room_id}/messages?limit={limit}&offset={offset}",
|
| | use_worker=True,
|
| | )
|
| |
|
| | return response
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to get chat messages: {e}")
|
| | raise
|
| |
|
| | async def get_chat_participants(self, room_id: str) -> Dict[str, Any]:
|
| | """Get chat room participants"""
|
| |
|
| | try:
|
| | response = await self.client.get(
|
| | f"do/chat/{room_id}/participants", use_worker=True
|
| | )
|
| |
|
| | return response
|
| |
|
| | except CloudflareError as e:
|
| | logger.error(f"Failed to get chat participants: {e}")
|
| | raise
|
| |
|
| |
|
| | def get_agent_websocket_url(self, session_id: str, user_id: str) -> str:
|
| | """Get WebSocket URL for agent session"""
|
| |
|
| | if not self.client.worker_url:
|
| | raise CloudflareError("Worker URL not configured")
|
| |
|
| | base_url = self.client.worker_url.replace("https://", "wss://").replace(
|
| | "http://", "ws://"
|
| | )
|
| | return (
|
| | f"{base_url}/do/agent/{session_id}?sessionId={session_id}&userId={user_id}"
|
| | )
|
| |
|
| | def get_chat_websocket_url(self, room_id: str, user_id: str, username: str) -> str:
|
| | """Get WebSocket URL for chat room"""
|
| |
|
| | if not self.client.worker_url:
|
| | raise CloudflareError("Worker URL not configured")
|
| |
|
| | base_url = self.client.worker_url.replace("https://", "wss://").replace(
|
| | "http://", "ws://"
|
| | )
|
| | return f"{base_url}/do/chat/{room_id}?userId={user_id}&username={username}"
|
| |
|
| |
|
| | class DurableObjectsWebSocket:
|
| | """Helper class for WebSocket connections to Durable Objects"""
|
| |
|
| | def __init__(self, url: str):
|
| | self.url = url
|
| | self.websocket = None
|
| | self.connected = False
|
| | self.message_handlers = {}
|
| |
|
| | async def connect(self):
|
| | """Connect to WebSocket"""
|
| | try:
|
| | import websockets
|
| |
|
| | self.websocket = await websockets.connect(self.url)
|
| | self.connected = True
|
| | logger.info(f"Connected to Durable Object WebSocket: {self.url}")
|
| |
|
| |
|
| | import asyncio
|
| |
|
| | asyncio.create_task(self._message_loop())
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Failed to connect to WebSocket: {e}")
|
| | raise CloudflareError(f"WebSocket connection failed: {e}")
|
| |
|
| | async def disconnect(self):
|
| | """Disconnect from WebSocket"""
|
| | if self.websocket and self.connected:
|
| | await self.websocket.close()
|
| | self.connected = False
|
| | logger.info("Disconnected from Durable Object WebSocket")
|
| |
|
| | async def send_message(self, message_type: str, payload: Dict[str, Any]):
|
| | """Send message via WebSocket"""
|
| | if not self.connected or not self.websocket:
|
| | raise CloudflareError("WebSocket not connected")
|
| |
|
| | message = {
|
| | "type": message_type,
|
| | "payload": payload,
|
| | "timestamp": int(time.time()),
|
| | }
|
| |
|
| | try:
|
| | await self.websocket.send(json.dumps(message))
|
| | except Exception as e:
|
| | logger.error(f"Failed to send WebSocket message: {e}")
|
| | raise CloudflareError(f"Failed to send message: {e}")
|
| |
|
| | def add_message_handler(self, message_type: str, handler):
|
| | """Add a message handler for specific message types"""
|
| | if message_type not in self.message_handlers:
|
| | self.message_handlers[message_type] = []
|
| | self.message_handlers[message_type].append(handler)
|
| |
|
| | async def _message_loop(self):
|
| | """Handle incoming WebSocket messages"""
|
| | try:
|
| | async for message in self.websocket:
|
| | try:
|
| | data = json.loads(message)
|
| | message_type = data.get("type")
|
| |
|
| | if message_type in self.message_handlers:
|
| | for handler in self.message_handlers[message_type]:
|
| | try:
|
| | if callable(handler):
|
| | if asyncio.iscoroutinefunction(handler):
|
| | await handler(data)
|
| | else:
|
| | handler(data)
|
| | except Exception as e:
|
| | logger.error(f"Message handler error: {e}")
|
| |
|
| | except json.JSONDecodeError as e:
|
| | logger.error(f"Failed to parse WebSocket message: {e}")
|
| | except Exception as e:
|
| | logger.error(f"WebSocket message processing error: {e}")
|
| |
|
| | except Exception as e:
|
| | logger.error(f"WebSocket message loop error: {e}")
|
| | self.connected = False
|
| |
|
| |
|
| | async def __aenter__(self):
|
| | await self.connect()
|
| | return self
|
| |
|
| | async def __aexit__(self, exc_type, exc_val, exc_tb):
|
| | await self.disconnect()
|
| |
|