| """ |
| 通用的HTTP客户端模块 |
| 为所有需要使用httpx的模块提供统一的客户端配置和方法 |
| 保持通用性,不与特定业务逻辑耦合 |
| """ |
|
|
| from contextlib import asynccontextmanager |
| from typing import Any, AsyncGenerator, Dict, Optional |
|
|
| import httpx |
|
|
| from config import get_proxy_config |
| from log import log |
|
|
|
|
class HttpxClientManager:
    """Generic factory for configured ``httpx.AsyncClient`` instances.

    The manager keeps no state of its own: the proxy configuration is
    re-read through ``get_proxy_config()`` every time client keyword
    arguments are built.
    """

    async def get_client_kwargs(
        self, timeout: Optional[float] = 30.0, **kwargs
    ) -> Dict[str, Any]:
        """Build the keyword arguments used to construct an httpx client.

        Args:
            timeout: Timeout value forwarded to httpx. ``None`` is passed
                through unchanged (used by the streaming client).
            **kwargs: Extra keyword arguments merged into the result as-is.

        Returns:
            A dict containing ``timeout``, the caller's extra arguments and,
            when a proxy is configured, a ``proxy`` entry.
        """
        client_kwargs = {"timeout": timeout, **kwargs}

        # The proxy configuration is fetched on every call rather than cached.
        # A truthy configured proxy replaces any ``proxy`` supplied in kwargs.
        current_proxy_config = await get_proxy_config()
        if current_proxy_config:
            client_kwargs["proxy"] = current_proxy_config

        return client_kwargs

    @asynccontextmanager
    async def get_client(
        self, timeout: Optional[float] = 30.0, **kwargs
    ) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield a configured ``httpx.AsyncClient`` that is closed on exit.

        Args:
            timeout: Timeout value forwarded to httpx (30 seconds by default).
            **kwargs: Extra keyword arguments for ``httpx.AsyncClient``.

        Yields:
            The open client; it is closed when the ``async with`` block exits.
        """
        client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs)

        async with httpx.AsyncClient(**client_kwargs) as client:
            yield client

    @asynccontextmanager
    async def get_streaming_client(
        self, timeout: Optional[float] = None, **kwargs
    ) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield an ``httpx.AsyncClient`` intended for streaming requests.

        Args:
            timeout: Timeout value forwarded to httpx. Defaults to ``None``,
                i.e. no timeout limit is requested.
            **kwargs: Extra keyword arguments for ``httpx.AsyncClient``.

        Yields:
            The open client. It is always closed afterwards; an error raised
            while closing is logged as a warning instead of being propagated.
        """
        client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs)

        # The client is closed by hand (not via ``async with``) so that a
        # failure inside ``aclose()`` can be caught and logged.
        client = httpx.AsyncClient(**client_kwargs)
        try:
            yield client
        finally:
            try:
                await client.aclose()
            except Exception as e:
                log.warning(f"Error closing streaming client: {e}")


# Shared module-level manager instance used by the request helpers in this module.
http_client = HttpxClientManager()
|
|
|
|
| |
async def get_async(
    url: str,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 30.0,
    **kwargs,
) -> httpx.Response:
    """Perform an asynchronous GET request with a one-off client.

    Args:
        url: Target URL.
        headers: Optional request headers.
        timeout: Timeout handed to the client factory.
        **kwargs: Extra keyword arguments forwarded to
            ``http_client.get_client`` (client construction options).

    Returns:
        The ``httpx.Response`` obtained from the GET call.
    """
    client_context = http_client.get_client(timeout=timeout, **kwargs)
    async with client_context as session:
        response = await session.get(url, headers=headers)
    return response
|
|
|
|
async def post_async(
    url: str,
    data: Any = None,
    json: Any = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 30.0,
    **kwargs,
) -> httpx.Response:
    """Perform an asynchronous POST request with a one-off client.

    Args:
        url: Target URL.
        data: Value passed to the POST call as ``data``.
        json: Value passed to the POST call as ``json``.
        headers: Optional request headers.
        timeout: Timeout handed to the client factory.
        **kwargs: Extra keyword arguments forwarded to
            ``http_client.get_client`` (client construction options).

    Returns:
        The ``httpx.Response`` obtained from the POST call.
    """
    client_context = http_client.get_client(timeout=timeout, **kwargs)
    async with client_context as session:
        response = await session.post(url, data=data, json=json, headers=headers)
    return response
|
|
|
|
async def stream_post_async(
    url: str,
    body: Dict[str, Any],
    native: bool = False,
    headers: Optional[Dict[str, str]] = None,
    **kwargs,
):
    """Stream the response of an asynchronous POST request.

    Args:
        url: Target URL.
        body: Payload sent as the JSON body of the request.
        native: When true, yield raw byte chunks; otherwise yield text lines.
        headers: Optional request headers.
        **kwargs: Extra keyword arguments forwarded to
            ``http_client.get_streaming_client``.

    Yields:
        For a 200 response: byte chunks (``native=True``) or lines.
        For any other status: a single ``fastapi.Response`` built from the
        fully read body, the status code and the response headers.
    """
    async with http_client.get_streaming_client(**kwargs) as session:
        async with session.stream("POST", url, json=body, headers=headers) as upstream:
            if upstream.status_code == 200:
                # Pick the iteration mode once, then relay every piece unchanged.
                source = upstream.aiter_bytes() if native else upstream.aiter_lines()
                async for piece in source:
                    yield piece
            else:
                # Imported here so fastapi is only loaded on the error path.
                from fastapi import Response

                error_body = await upstream.aread()
                yield Response(error_body, upstream.status_code, dict(upstream.headers))
|
|