"""An OpenAI-compatible proxy in front of the Dify API.

Exposes /v1/chat/completions and /v1/models, translating OpenAI-style
requests into Dify chat-messages calls; each configured Dify app is
exposed as a "model" under its app name.
"""

import asyncio
import base64
import codecs
import json
import logging
import os
import time

import httpx
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, request, stream_with_context

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Silence httpx's per-request INFO logging.
logging.getLogger("httpx").setLevel(logging.WARNING)

load_dotenv()

# API keys that clients of this proxy may authenticate with.
VALID_API_KEYS = [key.strip() for key in os.getenv("VALID_API_KEYS", "").split(",") if key.strip()]

# Conversation memory strategy:
#   1 - history_message mode: replay earlier turns inside the query text.
#   2 - zero-width character mode: hide the Dify conversation_id in the
#       assistant reply as invisible Unicode characters.
CONVERSATION_MEMORY_MODE = int(os.getenv('CONVERSATION_MEMORY_MODE', '1'))
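# Example .env (illustrative values; the variable names match the ones read
# here and below):
#   VALID_API_KEYS=sk-proxy-key-1,sk-proxy-key-2
#   DIFY_API_KEYS=app-xxxxxxxx,app-yyyyyyyy
#   DIFY_API_BASE=https://api.dify.ai/v1
#   CONVERSATION_MEMORY_MODE=2
#   SERVER_HOST=0.0.0.0
#   SERVER_PORT=7860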


class DifyModelManager:
    def __init__(self):
        self.api_keys = []
        self.name_to_api_key = {}
        self.api_key_to_name = {}
        self.load_api_keys()

    def load_api_keys(self):
        """Load Dify API keys from the DIFY_API_KEYS environment variable."""
        api_keys_str = os.getenv('DIFY_API_KEYS', '')
        if api_keys_str:
            self.api_keys = [key.strip() for key in api_keys_str.split(',') if key.strip()]
            logger.info(f"Loaded {len(self.api_keys)} API keys")

    async def fetch_app_info(self, api_key):
        """Fetch the Dify application name for the given API key."""
        try:
            async with httpx.AsyncClient() as client:
                headers = {
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                }
                response = await client.get(
                    f"{DIFY_API_BASE}/info",
                    headers=headers,
                    params={"user": "default_user"}
                )

                if response.status_code == 200:
                    app_info = response.json()
                    return app_info.get("name", "Unknown App")
                else:
                    logger.error(f"Failed to fetch app info for API key: {api_key[:8]}...")
                    return None
        except Exception as e:
            logger.error(f"Error fetching app info: {str(e)}")
            return None

    async def refresh_model_info(self):
        """Refresh the app-name <-> API-key mappings for all configured keys."""
        self.name_to_api_key.clear()
        self.api_key_to_name.clear()

        for api_key in self.api_keys:
            app_name = await self.fetch_app_info(api_key)
            if app_name:
                self.name_to_api_key[app_name] = api_key
                self.api_key_to_name[api_key] = app_name
                logger.info(f"Mapped app '{app_name}' to API key: {api_key[:8]}...")

    def get_api_key(self, model_name):
        """Look up the Dify API key for a model (app) name."""
        return self.name_to_api_key.get(model_name)

    def get_available_models(self):
        """Return the available models in OpenAI /v1/models format."""
        return [
            {
                "id": name,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "dify"
            }
            for name in self.name_to_api_key
        ]


model_manager = DifyModelManager()

# Base URL of the Dify API, e.g. https://api.dify.ai/v1
DIFY_API_BASE = os.getenv("DIFY_API_BASE", "")

app = Flask(__name__)


def get_api_key(model_name):
    """Return the Dify API key mapped to the given model name, if any."""
    api_key = model_manager.get_api_key(model_name)
    if not api_key:
        logger.warning(f"No API key found for model: {model_name}")
    return api_key


def transform_openai_to_dify(openai_request, endpoint):
    """Transform an OpenAI-format request into a Dify-format request."""
    if endpoint != "/chat/completions":
        return None

    messages = openai_request.get("messages", [])
    stream = openai_request.get("stream", False)

    conversation_id = None

    # Pull the system prompt (if any) out of the message list.
    system_content = ""
    system_messages = [msg for msg in messages if msg.get("role") == "system"]
    if system_messages:
        system_content = system_messages[0].get("content", "")
        logger.info(f"Found system message: {system_content[:100]}{'...' if len(system_content) > 100 else ''}")

    # The latest message is the query, unless it is a system message.
    user_query = messages[-1]["content"] if messages and messages[-1].get("role") != "system" else ""

    if CONVERSATION_MEMORY_MODE == 2:  # zero-width character mode
        # Recover the Dify conversation_id hidden in an earlier assistant
        # reply, scanning from the most recent turn backwards.
        if len(messages) > 1:
            for msg in reversed(messages[:-1]):
                if msg.get("role") == "assistant":
                    conversation_id = decode_conversation_id(msg.get("content", ""))
                    if conversation_id:
                        break

        # On the first turn there is no conversation_id yet, so prepend the
        # system prompt to the query.
        if system_content and not conversation_id:
            user_query = f"System instruction: {system_content}\n\nUser question: {user_query}"
            logger.info("[zero-width mode] First turn; prepending system content to the query")

        dify_request = {
            "inputs": {},
            "query": user_query,
            "response_mode": "streaming" if stream else "blocking",
            "conversation_id": conversation_id,
            "user": openai_request.get("user", "default_user")
        }
    else:  # history_message mode
        # Dify is called without a conversation_id in this mode, so replay
        # the earlier turns inside the query text.
        if len(messages) > 1:
            history_messages = []
            has_system_in_history = False

            for msg in messages[:-1]:
                role = msg.get("role", "")
                content = msg.get("content", "")
                if role and content:
                    if role == "system":
                        has_system_in_history = True
                    history_messages.append(f"{role}: {content}")

            if system_content and not has_system_in_history:
                history_messages.insert(0, f"system: {system_content}")
                logger.info("[history_message mode] Prepending system content to the history")

            if history_messages:
                history_context = "\n\n".join(history_messages)
                user_query = f"<history>\n{history_context}\n</history>\n\nCurrent user question: {user_query}"
        elif system_content:
            user_query = f"System instruction: {system_content}\n\nUser question: {user_query}"
            logger.info("[history_message mode] First turn; prepending system content to the query")

        dify_request = {
            "inputs": {},
            "query": user_query,
            "response_mode": "streaming" if stream else "blocking",
            "user": openai_request.get("user", "default_user")
        }

    return dify_request
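# Shape sketch (illustrative): with CONVERSATION_MEMORY_MODE=1, a request like
#   {"model": "my-app",
#    "messages": [{"role": "system", "content": "Be terse."},
#                 {"role": "user", "content": "Hi"}]}
# is transformed into
#   {"inputs": {},
#    "query": "<history>\nsystem: Be terse.\n</history>\n\nCurrent user question: Hi",
#    "response_mode": "blocking", "user": "default_user"}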


def transform_dify_to_openai(dify_response, model="claude-3-5-sonnet-v2", stream=False):
    """Transform a Dify-format response into OpenAI format."""
    if not stream:
        answer = ""

        # Plain chat apps return their text under "answer"; agent apps
        # return it inside "agent_thoughts", where the last non-empty
        # thought is the final answer.
        if "answer" in dify_response:
            answer = dify_response.get("answer", "")
        elif "agent_thoughts" in dify_response:
            for thought in dify_response.get("agent_thoughts", []):
                if thought.get("thought"):
                    answer = thought.get("thought", "")

        # In zero-width character mode, append the encoded conversation_id
        # unless an earlier assistant message already carries one.
        if CONVERSATION_MEMORY_MODE == 2:
            conversation_id = dify_response.get("conversation_id", "")
            history = dify_response.get("conversation_history", [])

            has_conversation_id = False
            for msg in history:
                if msg.get("role") == "assistant":
                    if decode_conversation_id(msg.get("content", "")) is not None:
                        has_conversation_id = True
                        break

            if conversation_id and not has_conversation_id:
                logger.info(f"[Debug] Inserting conversation_id: {conversation_id}, history_length: {len(history)}")
                encoded = encode_conversation_id(conversation_id)
                answer = answer + encoded
                logger.info(f"[Debug] Response content after insertion: {repr(answer)}")

        return {
            "id": dify_response.get("message_id", ""),
            "object": "chat.completion",
            "created": dify_response.get("created", int(time.time())),
            "model": model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": answer
                },
                "finish_reason": "stop"
            }]
        }
    else:
        # Streaming responses are converted chunk by chunk in the route handler.
        return dify_response


def create_openai_stream_response(content, message_id, model="claude-3-5-sonnet-v2"):
    """Build a single OpenAI-format streaming chunk."""
    return {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "delta": {
                "content": content
            },
            "finish_reason": None
        }]
    }
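# Wire-format sketch: each chunk goes to the client as a Server-Sent Events
# "data:" line followed by a blank line, and the stream is terminated with a
# literal "data: [DONE]" line:
#   data: {"id": "...", "object": "chat.completion.chunk", "choices": [...]}
#
#   data: [DONE]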


def encode_conversation_id(conversation_id):
    """Encode a conversation_id as a sequence of invisible characters."""
    if not conversation_id:
        return ""

    encoded = base64.b64encode(conversation_id.encode()).decode()

    # Each octal digit (3 bits) maps to one invisible Unicode character.
    char_map = {
        '0': '\u200b',  # zero-width space
        '1': '\u200c',  # zero-width non-joiner
        '2': '\u200d',  # zero-width joiner
        '3': '\ufeff',  # zero-width no-break space
        '4': '\u2060',  # word joiner
        '5': '\u180e',  # Mongolian vowel separator
        '6': '\u2061',  # function application
        '7': '\u2062',  # invisible times
    }

    # Map each base64 character to its 6-bit value and emit the value as two
    # 3-bit halves; '=' padding is emitted as a single character.
    result = []
    for c in encoded:
        if c.isalpha():
            if c.isupper():
                val = ord(c) - ord('A')
            else:
                val = ord(c) - ord('a') + 26
        elif c.isdigit():
            val = int(c) + 52
        elif c == '+':
            val = 62
        elif c == '/':
            val = 63
        else:  # '=' padding
            val = 0

        first = (val >> 3) & 0x7
        second = val & 0x7
        result.append(char_map[str(first)])
        if c != '=':
            result.append(char_map[str(second)])

    return ''.join(result)


def decode_conversation_id(content):
    """Decode a conversation_id from the invisible tail of a message, if any."""
    try:
        # Inverse of char_map in encode_conversation_id.
        char_to_val = {
            '\u200b': '0',
            '\u200c': '1',
            '\u200d': '2',
            '\ufeff': '3',
            '\u2060': '4',
            '\u180e': '5',
            '\u2061': '6',
            '\u2062': '7',
        }

        # Collect the run of invisible characters at the end of the content.
        space_chars = []
        for c in reversed(content):
            if c not in char_to_val:
                break
            space_chars.append(c)

        if not space_chars:
            return None

        # Rebuild the base64 string two invisible characters at a time.
        space_chars.reverse()
        base64_chars = []
        for i in range(0, len(space_chars), 2):
            first = int(char_to_val[space_chars[i]], 8)
            if i + 1 < len(space_chars):
                second = int(char_to_val[space_chars[i + 1]], 8)
                val = (first << 3) | second
            else:
                # A lone trailing character is how the encoder marks a '='
                # padding character (see encode_conversation_id).
                base64_chars.append('=')
                break

            if val < 26:
                base64_chars.append(chr(val + ord('A')))
            elif val < 52:
                base64_chars.append(chr(val - 26 + ord('a')))
            elif val < 62:
                base64_chars.append(str(val - 52))
            elif val == 62:
                base64_chars.append('+')
            else:
                base64_chars.append('/')

        # Restore any base64 padding that was not explicitly encoded.
        padding = len(base64_chars) % 4
        if padding:
            base64_chars.extend(['='] * (4 - padding))

        return base64.b64decode(''.join(base64_chars)).decode()

    except Exception as e:
        logger.debug(f"Failed to decode conversation_id: {e}")
        return None
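# Round-trip sketch (illustrative ID): the encoded tail renders as nothing
# but survives being appended to visible text.
#   tail = encode_conversation_id("3c90c3cc-0d44-4b50-8888-8dd25736052a")
#   decode_conversation_id("Sure, here you go." + tail)
#   # -> "3c90c3cc-0d44-4b50-8888-8dd25736052a"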


@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    try:
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return jsonify({
                "error": {
                    "message": "Missing Authorization header",
                    "type": "invalid_request_error",
                    "param": None,
                    "code": "invalid_api_key"
                }
            }), 401

        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return jsonify({
                "error": {
                    "message": "Invalid Authorization header format. Expected: Bearer <API_KEY>",
                    "type": "invalid_request_error",
                    "param": None,
                    "code": "invalid_api_key"
                }
            }), 401

        provided_api_key = parts[1]
        if provided_api_key not in VALID_API_KEYS:
            return jsonify({
                "error": {
                    "message": "Invalid API key",
                    "type": "invalid_request_error",
                    "param": None,
                    "code": "invalid_api_key"
                }
            }), 401

        openai_request = request.get_json()
        logger.info(f"Received request: {json.dumps(openai_request, ensure_ascii=False)}")

        model = openai_request.get("model", "claude-3-5-sonnet-v2")

        api_key = get_api_key(model)
        if not api_key:
            error_msg = f"Model {model} is not supported. Available models: {', '.join(model_manager.name_to_api_key.keys())}"
            logger.error(error_msg)
            return {
                "error": {
                    "message": error_msg,
                    "type": "invalid_request_error",
                    "code": "model_not_found"
                }
            }, 404

        dify_request = transform_openai_to_dify(openai_request, "/chat/completions")

        if not dify_request:
            logger.error("Failed to transform request")
            return {
                "error": {
                    "message": "Invalid request format",
                    "type": "invalid_request_error",
                }
            }, 400

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        stream = openai_request.get("stream", False)
        dify_endpoint = f"{DIFY_API_BASE}/chat-messages"
        logger.info(f"Sending request to Dify endpoint: {dify_endpoint}, stream={stream}")

        if stream:
            def generate():
                client = httpx.Client(timeout=None)

                def flush_chunk(chunk_data):
                    """Encode a chunk for immediate flushing."""
                    return chunk_data.encode('utf-8')

                def calculate_delay(buffer_size):
                    """Choose a per-character delay from the number of
                    characters still buffered: drain a large backlog fast,
                    slow down near the end for a smoother typing effect."""
                    if buffer_size > 30:
                        return 0.001
                    elif buffer_size > 20:
                        return 0.002
                    elif buffer_size > 10:
                        return 0.01
                    else:
                        return 0.02

                def send_char(char, message_id):
                    """Emit a single character as an OpenAI streaming chunk."""
                    openai_chunk = {
                        "id": message_id,
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": model,
                        "choices": [{
                            "index": 0,
                            "delta": {
                                "content": char
                            },
                            "finish_reason": None
                        }]
                    }
                    return flush_chunk(f"data: {json.dumps(openai_chunk)}\n\n")

                output_buffer = []

                try:
                    with client.stream(
                        'POST',
                        dify_endpoint,
                        json=dify_request,
                        headers={
                            **headers,
                            'Accept': 'text/event-stream',
                            'Cache-Control': 'no-cache',
                            'Connection': 'keep-alive'
                        }
                    ) as response:
                        generate.message_id = None
                        buffer = ""
                        # Incremental decoder, so a UTF-8 sequence split
                        # across network chunks does not raise.
                        utf8_decoder = codecs.getincrementaldecoder('utf-8')()

                        for raw_bytes in response.iter_raw():
                            if not raw_bytes:
                                continue

                            try:
                                buffer += utf8_decoder.decode(raw_bytes)

                                # Process complete SSE lines; keep any partial
                                # line buffered for the next chunk.
                                while '\n' in buffer:
                                    line, buffer = buffer.split('\n', 1)
                                    line = line.strip()

                                    if not line or not line.startswith('data: '):
                                        continue

                                    try:
                                        dify_chunk = json.loads(line[6:])
                                        event = dify_chunk.get("event")

                                        # Chat apps emit "message" events and agent apps
                                        # "agent_message"; both carry text under "answer".
                                        if event in ("message", "agent_message") and "answer" in dify_chunk:
                                            current_answer = dify_chunk["answer"]
                                            if not current_answer:
                                                continue

                                            message_id = dify_chunk.get("message_id", "")
                                            if not generate.message_id:
                                                generate.message_id = message_id

                                            # Re-stream character by character with an
                                            # adaptive delay for a smooth typing effect.
                                            for char in current_answer:
                                                output_buffer.append((char, generate.message_id))

                                            while output_buffer:
                                                char, msg_id = output_buffer.pop(0)
                                                yield send_char(char, msg_id)
                                                time.sleep(calculate_delay(len(output_buffer)))

                                            continue

                                        elif event == "agent_thought":
                                            thought_id = dify_chunk.get("id", "")
                                            thought = dify_chunk.get("thought", "")
                                            tool = dify_chunk.get("tool", "")
                                            tool_input = dify_chunk.get("tool_input", "")
                                            observation = dify_chunk.get("observation", "")

                                            logger.info(f"[Agent Thought] ID: {thought_id}, Tool: {tool}")
                                            if thought:
                                                logger.info(f"[Agent Thought] Thought: {thought}")
                                            if tool_input:
                                                logger.info(f"[Agent Thought] Tool Input: {tool_input}")
                                            if observation:
                                                logger.info(f"[Agent Thought] Observation: {observation}")

                                            message_id = dify_chunk.get("message_id", "")
                                            if not generate.message_id and message_id:
                                                generate.message_id = message_id

                                            continue

                                        elif event == "message_file":
                                            file_id = dify_chunk.get("id", "")
                                            file_type = dify_chunk.get("type", "")
                                            file_url = dify_chunk.get("url", "")

                                            logger.info(f"[Message File] ID: {file_id}, Type: {file_type}, URL: {file_url}")
                                            continue

                                        elif event == "message_end":
                                            # Drain anything still buffered.
                                            while output_buffer:
                                                char, msg_id = output_buffer.pop(0)
                                                yield send_char(char, msg_id)
                                                time.sleep(0.001)

                                            # In zero-width character mode, append the
                                            # encoded conversation_id before closing.
                                            if CONVERSATION_MEMORY_MODE == 2:
                                                conversation_id = dify_chunk.get("conversation_id")
                                                history = dify_chunk.get("conversation_history", [])

                                                has_conversation_id = False
                                                for msg in history:
                                                    if msg.get("role") == "assistant":
                                                        content = msg.get("content", "")
                                                        if decode_conversation_id(content) is not None:
                                                            has_conversation_id = True
                                                            break

                                                if conversation_id and not has_conversation_id:
                                                    logger.info(f"[Debug] Inserting conversation_id in stream: {conversation_id}")
                                                    encoded = encode_conversation_id(conversation_id)
                                                    logger.info(f"[Debug] Stream encoded content: {repr(encoded)}")
                                                    for char in encoded:
                                                        yield send_char(char, generate.message_id)

                                            final_chunk = {
                                                "id": generate.message_id,
                                                "object": "chat.completion.chunk",
                                                "created": int(time.time()),
                                                "model": model,
                                                "choices": [{
                                                    "index": 0,
                                                    "delta": {},
                                                    "finish_reason": "stop"
                                                }]
                                            }
                                            yield flush_chunk(f"data: {json.dumps(final_chunk)}\n\n")
                                            yield flush_chunk("data: [DONE]\n\n")

                                    except json.JSONDecodeError as e:
                                        logger.error(f"JSON decode error: {str(e)}")
                                        continue

                            except Exception as e:
                                logger.error(f"Error processing chunk: {str(e)}")
                                continue

                finally:
                    client.close()

            return Response(
                stream_with_context(generate()),
                content_type='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache, no-transform',
                    'Connection': 'keep-alive',
                    'Transfer-Encoding': 'chunked',
                    'X-Accel-Buffering': 'no',
                    'Content-Encoding': 'none'
                },
                direct_passthrough=True
            )
        else:
            async def sync_response():
                try:
                    async with httpx.AsyncClient() as client:
                        response = await client.post(
                            dify_endpoint,
                            json=dify_request,
                            headers=headers
                        )

                        if response.status_code != 200:
                            error_msg = f"Dify API error: {response.text}"
                            logger.error(f"Request failed: {error_msg}")
                            return {
                                "error": {
                                    "message": error_msg,
                                    "type": "api_error",
                                    "code": response.status_code
                                }
                            }, response.status_code

                        dify_response = response.json()
                        logger.info(f"Received response from Dify: {json.dumps(dify_response, ensure_ascii=False)}")
                        logger.info(f"[Debug] Response content: {repr(dify_response.get('answer', ''))}")
                        openai_response = transform_dify_to_openai(dify_response, model=model)

                        conversation_id = dify_response.get("conversation_id")
                        if conversation_id:
                            return Response(
                                json.dumps(openai_response),
                                content_type='application/json',
                                headers={
                                    'Conversation-Id': conversation_id
                                }
                            )
                        else:
                            return openai_response
                except httpx.RequestError as e:
                    error_msg = f"Failed to connect to Dify: {str(e)}"
                    logger.error(error_msg)
                    return {
                        "error": {
                            "message": error_msg,
                            "type": "api_error",
                            "code": "connection_error"
                        }
                    }, 503

            return asyncio.run(sync_response())

    except Exception as e:
        logger.exception("Unexpected error occurred")
        return {
            "error": {
                "message": str(e),
                "type": "internal_error",
            }
        }, 500


@app.route('/v1/models', methods=['GET'])
def list_models():
    """Return the list of available models."""
    logger.info("Listing available models")

    # Refresh the mappings so newly configured Dify apps show up.
    asyncio.run(model_manager.refresh_model_info())

    available_models = model_manager.get_available_models()

    response = {
        "object": "list",
        "data": available_models
    }
    logger.info(f"Available models: {json.dumps(response, ensure_ascii=False)}")
    return response


if __name__ == "__main__":
    # Discover the configured Dify apps before serving.
    asyncio.run(model_manager.refresh_model_info())

    host = os.getenv("SERVER_HOST", "0.0.0.0")
    port = int(os.getenv("SERVER_PORT", "7860"))

    logger.info(f"Starting server on {host}:{port}")
    app.run(host=host, port=port)
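# Quick smoke test (illustrative; host, port, and keys depend on your .env):
#   curl http://localhost:7860/v1/models
#   curl http://localhost:7860/v1/chat/completions \
#        -H "Authorization: Bearer sk-proxy-key-1" \
#        -H "Content-Type: application/json" \
#        -d '{"model": "<dify-app-name>", "messages": [{"role": "user", "content": "Hi"}]}'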