Buckets:
MisterAI/LocalAI_Demo_backends / cpu-diffusers.upgrade-tmp /venv /lib /python3.10 /site-packages /ipykernel /inprocess /socket.py
| """Defines a dummy socket implementing (part of) the zmq.Socket interface.""" | |
| # Copyright (c) IPython Development Team. | |
| # Distributed under the terms of the Modified BSD License. | |
| from queue import Queue | |
| import zmq | |
| from traitlets import HasTraits, Instance, Int | |
| # ----------------------------------------------------------------------------- | |
| # Dummy socket class | |
| # ----------------------------------------------------------------------------- | |
| class DummySocket(HasTraits): | |
| """A dummy socket implementing (part of) the zmq.Socket interface.""" | |
| queue = Instance(Queue, ()) | |
| message_sent = Int(0) # Should be an Event | |
| context = Instance(zmq.Context) | |
| def _context_default(self): | |
| return zmq.Context() | |
| # ------------------------------------------------------------------------- | |
| # Socket interface | |
| # ------------------------------------------------------------------------- | |
| def recv_multipart(self, flags=0, copy=True, track=False): | |
| """Recv a multipart message.""" | |
| return self.queue.get_nowait() | |
| def send_multipart(self, msg_parts, flags=0, copy=True, track=False): | |
| """Send a multipart message.""" | |
| msg_parts = list(map(zmq.Message, msg_parts)) | |
| self.queue.put_nowait(msg_parts) | |
| self.message_sent += 1 | |
| def flush(self, timeout=1.0): | |
| """no-op to comply with stream API""" | |
Xet Storage Details
- Size:
- 1.38 kB
- Xet hash:
- fdb8befd66926528dcd9624447ec19db9319f3adf28b0b65bc3442cf6dea36e2
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.