| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | import gc |
| |
|
| | import usocket as socket |
| | import ustruct as struct |
| |
|
| | gc.collect() |
| | import uasyncio as asyncio |
| | from ubinascii import hexlify |
| |
|
| | gc.collect() |
| | from uerrno import EINPROGRESS, ETIMEDOUT |
| | from utime import ticks_diff, ticks_ms |
| |
|
| | gc.collect() |
| | import network |
| | from machine import unique_id |
| | from micropython import const |
| |
|
| | gc.collect() |
| | from sys import platform |
| |
|
| | VERSION = (0, 7, 1) |
| |
|
| | |
| | _DEFAULT_MS = const(20) |
| | _SOCKET_POLL_DELAY = const(5) |
| |
|
| | |
| | ESP32 = platform == "esp32" |
| | RP2 = platform == "rp2" |
| | if ESP32: |
| | |
| | BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] |
| | elif RP2: |
| | BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110] |
| | else: |
| | BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] |
| |
|
| | ESP8266 = platform == "esp8266" |
| | PYBOARD = platform == "pyboard" |
| |
|
| |
|
| | |
| | async def eliza(*_): |
| | await asyncio.sleep_ms(_DEFAULT_MS) |
| |
|
| |
|
| | class MsgQueue: |
| | def __init__(self, size): |
| | self._q = [0 for _ in range(max(size, 4))] |
| | self._size = size |
| | self._wi = 0 |
| | self._ri = 0 |
| | self._evt = asyncio.Event() |
| | self.discards = 0 |
| |
|
| | def put(self, *v): |
| | self._q[self._wi] = v |
| | self._evt.set() |
| | self._wi = (self._wi + 1) % self._size |
| | if self._wi == self._ri: |
| | self._ri = (self._ri + 1) % self._size |
| | self.discards += 1 |
| |
|
| | def __aiter__(self): |
| | return self |
| |
|
| | async def __anext__(self): |
| | if self._ri == self._wi: |
| | self._evt.clear() |
| | await self._evt.wait() |
| | r = self._q[self._ri] |
| | self._ri = (self._ri + 1) % self._size |
| | return r |
| |
|
| |
|
| | config = { |
| | "client_id": hexlify(unique_id()), |
| | "server": None, |
| | "port": 0, |
| | "user": "", |
| | "password": "", |
| | "keepalive": 60, |
| | "ping_interval": 0, |
| | "ssl": False, |
| | "ssl_params": {}, |
| | "response_time": 10, |
| | "clean_init": True, |
| | "clean": True, |
| | "max_repubs": 4, |
| | "will": None, |
| | "subs_cb": lambda *_: None, |
| | "wifi_coro": eliza, |
| | "connect_coro": eliza, |
| | "ssid": None, |
| | "wifi_pw": None, |
| | "queue_len": 0, |
| | "gateway": False, |
| | } |
| |
|
| |
|
| | class MQTTException(Exception): |
| | pass |
| |
|
| |
|
| | def pid_gen(): |
| | pid = 0 |
| | while True: |
| | pid = pid + 1 if pid < 65535 else 1 |
| | yield pid |
| |
|
| |
|
| | def qos_check(qos): |
| | if not (qos == 0 or qos == 1): |
| | raise ValueError("Only qos 0 and 1 are supported.") |
| |
|
| |
|
| | |
| | |
| | class MQTT_base: |
| | REPUB_COUNT = 0 |
| | DEBUG = False |
| |
|
| | def __init__(self, config): |
| | self._events = config["queue_len"] > 0 |
| | |
| | self._client_id = config["client_id"] |
| | self._user = config["user"] |
| | self._pswd = config["password"] |
| | self._keepalive = config["keepalive"] |
| | if self._keepalive >= 65536: |
| | raise ValueError("invalid keepalive time") |
| | self._response_time = ( |
| | config["response_time"] * 1000 |
| | ) |
| | self._max_repubs = config["max_repubs"] |
| | self._clean_init = config[ |
| | "clean_init" |
| | ] |
| | self._clean = config["clean"] |
| | will = config["will"] |
| | if will is None: |
| | self._lw_topic = False |
| | else: |
| | self._set_last_will(*will) |
| | |
| | self._ssid = config["ssid"] |
| | self._wifi_pw = config["wifi_pw"] |
| | self._ssl = config["ssl"] |
| | self._ssl_params = config["ssl_params"] |
| | |
| | if self._events: |
| | self.up = asyncio.Event() |
| | self.down = asyncio.Event() |
| | self.queue = MsgQueue(config["queue_len"]) |
| | else: |
| | self._cb = config["subs_cb"] |
| | self._wifi_handler = config["wifi_coro"] |
| | self._connect_handler = config["connect_coro"] |
| | |
| | self.port = config["port"] |
| | if self.port == 0: |
| | self.port = 8883 if self._ssl else 1883 |
| | self.server = config["server"] |
| | if self.server is None: |
| | raise ValueError("no server specified.") |
| | self._sock = None |
| | self._sta_if = network.WLAN(network.STA_IF) |
| | self._sta_if.active(True) |
| | if config["gateway"]: |
| | import aioespnow |
| |
|
| | while not (sta := self._sta_if).active(): |
| | time.sleep(0.1) |
| | sta.config(pm=sta.PM_NONE) |
| | sta.active(True) |
| | self._espnow = ( |
| | aioespnow.AIOESPNow() |
| | ) |
| | self._espnow.active(True) |
| |
|
| | self.newpid = pid_gen() |
| | self.rcv_pids = set() |
| | self.last_rx = ticks_ms() |
| | self.lock = asyncio.Lock() |
| |
|
| | def _set_last_will(self, topic, msg, retain=False, qos=0): |
| | qos_check(qos) |
| | if not topic: |
| | raise ValueError("Empty topic.") |
| | self._lw_topic = topic |
| | self._lw_msg = msg |
| | self._lw_qos = qos |
| | self._lw_retain = retain |
| |
|
| | def dprint(self, msg, *args): |
| | if self.DEBUG: |
| | print(msg % args) |
| |
|
| | def _timeout(self, t): |
| | return ticks_diff(ticks_ms(), t) > self._response_time |
| |
|
| | async def _as_read(self, n, sock=None): |
| | if sock is None: |
| | sock = self._sock |
| | |
| | |
| | |
| | data = bytearray(n) |
| | buffer = memoryview(data) |
| | size = 0 |
| | t = ticks_ms() |
| | while size < n: |
| | if self._timeout(t) or not self.isconnected(): |
| | raise OSError(-1, "Timeout on socket read") |
| | try: |
| | msg_size = sock.readinto(buffer[size:], n - size) |
| | except OSError as e: |
| | msg_size = None |
| | if e.args[0] not in BUSY_ERRORS: |
| | raise |
| | if msg_size == 0: |
| | raise OSError(-1, "Connection closed by host") |
| | if msg_size is not None: |
| | size += msg_size |
| | t = ticks_ms() |
| | self.last_rx = ticks_ms() |
| | await asyncio.sleep_ms(_SOCKET_POLL_DELAY) |
| | return data |
| |
|
| | async def _as_write(self, bytes_wr, length=0, sock=None): |
| | if sock is None: |
| | sock = self._sock |
| |
|
| | |
| | bytes_wr = memoryview(bytes_wr) |
| | if length: |
| | bytes_wr = bytes_wr[:length] |
| | t = ticks_ms() |
| | while bytes_wr: |
| | if self._timeout(t) or not self.isconnected(): |
| | raise OSError(-1, "Timeout on socket write") |
| | try: |
| | n = sock.write(bytes_wr) |
| | except OSError as e: |
| | n = 0 |
| | if e.args[0] not in BUSY_ERRORS: |
| | raise |
| | if n: |
| | t = ticks_ms() |
| | bytes_wr = bytes_wr[n:] |
| | await asyncio.sleep_ms(_SOCKET_POLL_DELAY) |
| |
|
| | async def _send_str(self, s): |
| | await self._as_write(struct.pack("!H", len(s))) |
| | await self._as_write(s) |
| |
|
| | async def _recv_len(self): |
| | n = 0 |
| | sh = 0 |
| | while 1: |
| | res = await self._as_read(1) |
| | b = res[0] |
| | n |= (b & 0x7F) << sh |
| | if not b & 0x80: |
| | return n |
| | sh += 7 |
| |
|
| | async def _connect(self, clean): |
| | self._sock = socket.socket() |
| | self._sock.setblocking(False) |
| | try: |
| | self._sock.connect(self._addr) |
| | except OSError as e: |
| | if e.args[0] not in BUSY_ERRORS: |
| | raise |
| | await asyncio.sleep_ms(_DEFAULT_MS) |
| | self.dprint("Connecting to broker.") |
| | if self._ssl: |
| | import ssl |
| |
|
| | self._sock = ssl.wrap_socket(self._sock, **self._ssl_params) |
| | premsg = bytearray(b"\x10\0\0\0\0\0") |
| | msg = bytearray(b"\x04MQTT\x04\0\0\0") |
| |
|
| | sz = 10 + 2 + len(self._client_id) |
| | msg[6] = clean << 1 |
| | if self._user: |
| | sz += 2 + len(self._user) + 2 + len(self._pswd) |
| | msg[6] |= 0xC0 |
| | if self._keepalive: |
| | msg[7] |= self._keepalive >> 8 |
| | msg[8] |= self._keepalive & 0x00FF |
| | if self._lw_topic: |
| | sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) |
| | msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 |
| | msg[6] |= self._lw_retain << 5 |
| |
|
| | i = 1 |
| | while sz > 0x7F: |
| | premsg[i] = (sz & 0x7F) | 0x80 |
| | sz >>= 7 |
| | i += 1 |
| | premsg[i] = sz |
| | await self._as_write(premsg, i + 2) |
| | await self._as_write(msg) |
| | await self._send_str(self._client_id) |
| | if self._lw_topic: |
| | await self._send_str(self._lw_topic) |
| | await self._send_str(self._lw_msg) |
| | if self._user: |
| | await self._send_str(self._user) |
| | await self._send_str(self._pswd) |
| | |
| | |
| | resp = await self._as_read(4) |
| | self.dprint("Connected to broker.") |
| | if ( |
| | resp[3] != 0 or resp[0] != 0x20 or resp[1] != 0x02 |
| | ): |
| | raise OSError( |
| | -1, |
| | f"Connect fail: 0x{(resp[0] << 8) + resp[1]:04x} {resp[3]} (README 7)", |
| | ) |
| |
|
| | async def _ping(self): |
| | async with self.lock: |
| | await self._as_write(b"\xc0\0") |
| |
|
| | |
| | async def wan_ok( |
| | self, |
| | packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01", |
| | ): |
| | if not self.isconnected(): |
| | return False |
| | length = 32 |
| | s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| | s.setblocking(False) |
| | s.connect(("8.8.8.8", 53)) |
| | await asyncio.sleep(1) |
| | try: |
| | await self._as_write(packet, sock=s) |
| | await asyncio.sleep(2) |
| | res = await self._as_read(length, s) |
| | if len(res) == length: |
| | return True |
| | except OSError: |
| | return False |
| | finally: |
| | s.close() |
| | return False |
| |
|
| | async def broker_up(self): |
| | if not self.isconnected(): |
| | return False |
| | tlast = self.last_rx |
| | if ticks_diff(ticks_ms(), tlast) < 1000: |
| | return True |
| | try: |
| | await self._ping() |
| | except OSError: |
| | return False |
| | t = ticks_ms() |
| | while not self._timeout(t): |
| | await asyncio.sleep_ms(100) |
| | if ticks_diff(self.last_rx, tlast) > 0: |
| | return True |
| | return False |
| |
|
| | async def disconnect(self): |
| | if self._sock is not None: |
| | await self._kill_tasks(False) |
| | try: |
| | async with self.lock: |
| | self._sock.write(b"\xe0\0") |
| | await asyncio.sleep_ms(100) |
| | except OSError: |
| | pass |
| | self._close() |
| | self._has_connected = False |
| |
|
| | def _close(self): |
| | if self._sock is not None: |
| | self._sock.close() |
| |
|
| | def close( |
| | self, |
| | ): |
| | self._close() |
| | try: |
| | self._sta_if.disconnect() |
| | except OSError: |
| | self.dprint("Wi-Fi not started, unable to disconnect interface") |
| | self._sta_if.active(False) |
| |
|
| | async def _await_pid(self, pid): |
| | t = ticks_ms() |
| | while pid in self.rcv_pids: |
| | if self._timeout(t) or not self.isconnected(): |
| | break |
| | await asyncio.sleep_ms(100) |
| | else: |
| | return True |
| | return False |
| |
|
| | |
| | |
| | async def publish(self, topic, msg, retain, qos): |
| | pid = next(self.newpid) |
| | if qos: |
| | self.rcv_pids.add(pid) |
| | async with self.lock: |
| | await self._publish(topic, msg, retain, qos, 0, pid) |
| | if qos == 0: |
| | return |
| |
|
| | count = 0 |
| | while 1: |
| | if await self._await_pid(pid): |
| | return |
| | |
| | if count >= self._max_repubs or not self.isconnected(): |
| | raise OSError(-1) |
| | async with self.lock: |
| | await self._publish(topic, msg, retain, qos, dup=1, pid=pid) |
| | count += 1 |
| | self.REPUB_COUNT += 1 |
| |
|
| | async def _publish(self, topic, msg, retain, qos, dup, pid): |
| | pkt = bytearray(b"\x30\0\0\0") |
| | pkt[0] |= qos << 1 | retain | dup << 3 |
| | sz = 2 + len(topic) + len(msg) |
| | if qos > 0: |
| | sz += 2 |
| | if sz >= 2097152: |
| | raise MQTTException("Strings too long.") |
| | i = 1 |
| | while sz > 0x7F: |
| | pkt[i] = (sz & 0x7F) | 0x80 |
| | sz >>= 7 |
| | i += 1 |
| | pkt[i] = sz |
| | await self._as_write(pkt, i + 1) |
| | await self._send_str(topic) |
| | if qos > 0: |
| | struct.pack_into("!H", pkt, 0, pid) |
| | await self._as_write(pkt, 2) |
| | await self._as_write(msg) |
| |
|
| | |
| | async def subscribe(self, topic, qos): |
| | pkt = bytearray(b"\x82\0\0\0") |
| | pid = next(self.newpid) |
| | self.rcv_pids.add(pid) |
| | struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid) |
| | async with self.lock: |
| | await self._as_write(pkt) |
| | await self._send_str(topic) |
| | await self._as_write(qos.to_bytes(1, "little")) |
| |
|
| | if not await self._await_pid(pid): |
| | raise OSError(-1) |
| |
|
| | |
| | async def unsubscribe(self, topic): |
| | pkt = bytearray(b"\xa2\0\0\0") |
| | pid = next(self.newpid) |
| | self.rcv_pids.add(pid) |
| | struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid) |
| | async with self.lock: |
| | await self._as_write(pkt) |
| | await self._send_str(topic) |
| |
|
| | if not await self._await_pid(pid): |
| | raise OSError(-1) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | async def wait_msg(self): |
| | try: |
| | res = self._sock.read(1) |
| | except OSError as e: |
| | if e.args[0] in BUSY_ERRORS: |
| | await asyncio.sleep_ms(0) |
| | return |
| | raise |
| | if res is None: |
| | return |
| | if res == b"": |
| | raise OSError(-1, "Empty response") |
| |
|
| | if res == b"\xd0": |
| | await self._as_read(1) |
| | return |
| | op = res[0] |
| |
|
| | if op == 0x40: |
| | sz = await self._as_read(1) |
| | if sz != b"\x02": |
| | raise OSError(-1, "Invalid PUBACK packet") |
| | rcv_pid = await self._as_read(2) |
| | pid = rcv_pid[0] << 8 | rcv_pid[1] |
| | if pid in self.rcv_pids: |
| | self.rcv_pids.discard(pid) |
| | else: |
| | raise OSError(-1, "Invalid pid in PUBACK packet") |
| |
|
| | if op == 0x90: |
| | resp = await self._as_read(4) |
| | if resp[3] == 0x80: |
| | raise OSError(-1, "Invalid SUBACK packet") |
| | pid = resp[2] | (resp[1] << 8) |
| | if pid in self.rcv_pids: |
| | self.rcv_pids.discard(pid) |
| | else: |
| | raise OSError(-1, "Invalid pid in SUBACK packet") |
| |
|
| | if op == 0xB0: |
| | resp = await self._as_read(3) |
| | pid = resp[2] | (resp[1] << 8) |
| | if pid in self.rcv_pids: |
| | self.rcv_pids.discard(pid) |
| | else: |
| | raise OSError(-1) |
| |
|
| | if op & 0xF0 != 0x30: |
| | return |
| | sz = await self._recv_len() |
| | topic_len = await self._as_read(2) |
| | topic_len = (topic_len[0] << 8) | topic_len[1] |
| | topic = await self._as_read(topic_len) |
| | sz -= topic_len + 2 |
| | if op & 6: |
| | pid = await self._as_read(2) |
| | pid = pid[0] << 8 | pid[1] |
| | sz -= 2 |
| | msg = await self._as_read(sz) |
| | retained = op & 0x01 |
| | if self._events: |
| | self.queue.put(topic, msg, bool(retained)) |
| | else: |
| | self._cb(topic, msg, bool(retained)) |
| | if op & 6 == 2: |
| | pkt = bytearray(b"\x40\x02\0\0") |
| | struct.pack_into("!H", pkt, 2, pid) |
| | await self._as_write(pkt) |
| | elif op & 6 == 4: |
| | raise OSError(-1, "QoS 2 not supported") |
| |
|
| |
|
| | |
| |
|
| |
|
| | class MQTTClient(MQTT_base): |
| | def __init__(self, config): |
| | super().__init__(config) |
| | self._isconnected = False |
| | keepalive = 1000 * self._keepalive |
| | self._ping_interval = keepalive // 4 if keepalive else 20000 |
| | p_i = ( |
| | config["ping_interval"] * 1000 |
| | ) |
| | if p_i and p_i < self._ping_interval: |
| | self._ping_interval = p_i |
| | self._in_connect = False |
| | self._has_connected = False |
| | self._tasks = [] |
| | if ESP8266: |
| | import esp |
| |
|
| | esp.sleep_type( |
| | 0 |
| | ) |
| |
|
| | async def wifi_connect(self, quick=False): |
| | s = self._sta_if |
| | if ESP8266: |
| | if s.isconnected(): |
| | return |
| | s.active(True) |
| | s.connect() |
| | for _ in range(60): |
| | if ( |
| | s.status() != network.STAT_CONNECTING |
| | ): |
| | break |
| | await asyncio.sleep(1) |
| | if ( |
| | s.status() == network.STAT_CONNECTING |
| | ): |
| | s.disconnect() |
| | await asyncio.sleep(1) |
| | if ( |
| | not s.isconnected() |
| | and self._ssid is not None |
| | and self._wifi_pw is not None |
| | ): |
| | s.connect(self._ssid, self._wifi_pw) |
| | while ( |
| | s.status() == network.STAT_CONNECTING |
| | ): |
| | await asyncio.sleep(1) |
| | else: |
| | s.active(True) |
| | if RP2: |
| | |
| | |
| | s.config(pm=0xA11140) |
| | s.connect(self._ssid, self._wifi_pw) |
| | for _ in range(60): |
| | await asyncio.sleep(1) |
| | |
| | if s.isconnected(): |
| | break |
| | if ESP32: |
| | if s.status() != network.STAT_CONNECTING: |
| | break |
| | elif PYBOARD: |
| | if not 1 <= s.status() <= 2: |
| | break |
| | elif RP2: |
| | if not 1 <= s.status() <= 2: |
| | break |
| | else: |
| | s.disconnect() |
| | await asyncio.sleep(1) |
| |
|
| | if not s.isconnected(): |
| | raise OSError("Wi-Fi connect timed out") |
| | if not quick: |
| | |
| | self.dprint("Checking WiFi integrity.") |
| | for _ in range(5): |
| | if not s.isconnected(): |
| | raise OSError("Connection Unstable") |
| | await asyncio.sleep(1) |
| | self.dprint("Got reliable connection") |
| |
|
| | async def connect( |
| | self, *, quick=False |
| | ): |
| | if not self._has_connected: |
| | await self.wifi_connect(quick) |
| | |
| | |
| | self._addr = socket.getaddrinfo(self.server, self.port)[0][-1] |
| | self._in_connect = True |
| | try: |
| | if not self._has_connected and self._clean_init and not self._clean: |
| | |
| | |
| | await self._connect(True) |
| | try: |
| | async with self.lock: |
| | self._sock.write( |
| | b"\xe0\0" |
| | ) |
| | except OSError: |
| | pass |
| | self.dprint("Waiting for disconnect") |
| | await asyncio.sleep(2) |
| | self.dprint("About to reconnect with unclean session.") |
| | await self._connect(self._clean) |
| | except Exception: |
| | self._close() |
| | self._in_connect = False |
| | raise |
| | self.rcv_pids.clear() |
| | |
| | self._isconnected = True |
| | self._in_connect = False |
| | if not self._events: |
| | asyncio.create_task(self._wifi_handler(True)) |
| | if not self._has_connected: |
| | self._has_connected = True |
| | asyncio.create_task(self._keep_connected()) |
| | |
| |
|
| | asyncio.create_task(self._handle_msg()) |
| | self._tasks.append(asyncio.create_task(self._keep_alive())) |
| | if self.DEBUG: |
| | self._tasks.append(asyncio.create_task(self._memory())) |
| | if self._events: |
| | self.up.set() |
| | else: |
| | asyncio.create_task(self._connect_handler(self)) |
| |
|
| | |
| | |
| | async def _handle_msg(self): |
| | try: |
| | while self.isconnected(): |
| | async with self.lock: |
| | await self.wait_msg() |
| | await asyncio.sleep_ms(_DEFAULT_MS) |
| |
|
| | except OSError: |
| | pass |
| | self._reconnect() |
| |
|
| | |
| | |
| | async def _keep_alive(self): |
| | while self.isconnected(): |
| | pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval |
| | if pings_due >= 4: |
| | self.dprint("Reconnect: broker fail.") |
| | break |
| | await asyncio.sleep_ms(self._ping_interval) |
| | try: |
| | await self._ping() |
| | except OSError: |
| | break |
| | self._reconnect() |
| |
|
| | async def _kill_tasks(self, kill_skt): |
| | for task in self._tasks: |
| | task.cancel() |
| | self._tasks.clear() |
| | await asyncio.sleep_ms(0) |
| | if kill_skt: |
| | self._close() |
| |
|
| | |
| | async def _memory(self): |
| | while True: |
| | await asyncio.sleep(20) |
| | gc.collect() |
| | self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc()) |
| |
|
| | def isconnected(self): |
| | if self._in_connect: |
| | return True |
| | if self._isconnected and not self._sta_if.isconnected(): |
| | self._reconnect() |
| | return self._isconnected |
| |
|
| | def _reconnect(self): |
| | if self._isconnected: |
| | self._isconnected = False |
| | asyncio.create_task(self._kill_tasks(True)) |
| | if self._events: |
| | self.down.set() |
| | else: |
| | asyncio.create_task(self._wifi_handler(False)) |
| |
|
| | |
| | async def _connection(self): |
| | while not self._isconnected: |
| | await asyncio.sleep(1) |
| |
|
| | |
| | |
| | async def _keep_connected(self): |
| | while self._has_connected: |
| | if self.isconnected(): |
| | await asyncio.sleep(1) |
| | gc.collect() |
| | else: |
| | try: |
| | self._sta_if.disconnect() |
| | except OSError: |
| | self.dprint("Wi-Fi not started, unable to disconnect interface") |
| | await asyncio.sleep(1) |
| | try: |
| | await self.wifi_connect() |
| | except OSError: |
| | continue |
| | if ( |
| | not self._has_connected |
| | ): |
| | self.dprint("Disconnected, exiting _keep_connected") |
| | break |
| | try: |
| | await self.connect() |
| | |
| | self.dprint("Reconnect OK!") |
| | except OSError as e: |
| | self.dprint("Error in reconnect. %s", e) |
| | |
| | self._close() |
| | self._in_connect = False |
| | self._isconnected = False |
| | self.dprint("Disconnected, exited _keep_connected") |
| |
|
| | async def subscribe(self, topic, qos=0): |
| | qos_check(qos) |
| | while 1: |
| | await self._connection() |
| | try: |
| | return await super().subscribe(topic, qos) |
| | except OSError: |
| | pass |
| | self._reconnect() |
| |
|
| | async def unsubscribe(self, topic): |
| | while 1: |
| | await self._connection() |
| | try: |
| | return await super().unsubscribe(topic) |
| | except OSError: |
| | pass |
| | self._reconnect() |
| |
|
| | async def publish(self, topic, msg, retain=False, qos=0): |
| | qos_check(qos) |
| | while 1: |
| | await self._connection() |
| | try: |
| | return await super().publish(topic, msg, retain, qos) |
| | except OSError: |
| | pass |
| | self._reconnect() |
| |
|