| |
|
|
| |
| |
|
|
|
|
| class NullContext: |
| def __init__(self, enter_result=None): |
| self.enter_result = enter_result |
|
|
| def __enter__(self): |
| return self.enter_result |
|
|
| def __exit__(self, exc_type, exc_value, traceback): |
| pass |
|
|
| async def __aenter__(self): |
| return self.enter_result |
|
|
| async def __aexit__(self, exc_type, exc_value, traceback): |
| pass |
|
|
|
|
| |
| |
|
|
|
|
| class Socket: |
| async def close(self): |
| pass |
|
|
| async def getpeername(self): |
| raise NotImplementedError |
|
|
| async def getsockname(self): |
| raise NotImplementedError |
|
|
| async def getpeercert(self, timeout): |
| raise NotImplementedError |
|
|
| async def __aenter__(self): |
| return self |
|
|
| async def __aexit__(self, exc_type, exc_value, traceback): |
| await self.close() |
|
|
|
|
| class DatagramSocket(Socket): |
| def __init__(self, family: int): |
| self.family = family |
|
|
| async def sendto(self, what, destination, timeout): |
| raise NotImplementedError |
|
|
| async def recvfrom(self, size, timeout): |
| raise NotImplementedError |
|
|
|
|
| class StreamSocket(Socket): |
| async def sendall(self, what, timeout): |
| raise NotImplementedError |
|
|
| async def recv(self, size, timeout): |
| raise NotImplementedError |
|
|
|
|
| class NullTransport: |
| async def connect_tcp(self, host, port, timeout, local_address): |
| raise NotImplementedError |
|
|
|
|
| class Backend: |
| def name(self): |
| return "unknown" |
|
|
| async def make_socket( |
| self, |
| af, |
| socktype, |
| proto=0, |
| source=None, |
| destination=None, |
| timeout=None, |
| ssl_context=None, |
| server_hostname=None, |
| ): |
| raise NotImplementedError |
|
|
| def datagram_connection_required(self): |
| return False |
|
|
| async def sleep(self, interval): |
| raise NotImplementedError |
|
|
| def get_transport_class(self): |
| raise NotImplementedError |
|
|
| async def wait_for(self, awaitable, timeout): |
| raise NotImplementedError |
|
|