From a9e3aed65b0e865e31d720ab5f237987d7175af8 Mon Sep 17 00:00:00 2001 From: Juha Ylikoski Date: Mon, 14 Jun 2021 22:55:20 +0300 Subject: [PATCH 1/2] Changed library to use background loop instead of manually looping --- README.rst | 4 +- mosquitto/mosquitto-no-passwd.conf | 2 + mosquitto/mosquitto.conf | 3 +- src/MQTTLibrary/MQTTKeywords.py | 158 ++++++++++++++++------------- 4 files changed, 92 insertions(+), 75 deletions(-) create mode 100644 mosquitto/mosquitto-no-passwd.conf diff --git a/README.rst b/README.rst index 4d1c125..7563db1 100644 --- a/README.rst +++ b/README.rst @@ -74,8 +74,8 @@ The keywords in this library are based on some of the methods available in eclip The tests are in ``tests`` folder and make use of Robot Framework itself. They are run automatically through travis when code is pushed to a branch. When run locally, these tests rely on locally running mqtt brokers. We need 2 running brokers, one without auth that is used by most of the tests, and the other one with auth (configuration file is provided). You'll need to start them before running the tests. You can then run the tests locally:: docker pull eclipse-mosquitto - docker run -d -p 1883:1883 eclipse-mosquitto - docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto:/mosquitto/config eclipse-mosquitto + docker run -d -p 1883:1883 -v $(pwd)/mosquitto/mosquitto-no-passwd.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto + docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf -v $(pwd)/mosquitto/passwd_file:/mosquitto/config/passwd_file eclipse-mosquitto robot -P src tests diff --git a/mosquitto/mosquitto-no-passwd.conf b/mosquitto/mosquitto-no-passwd.conf new file mode 100644 index 0000000..c8348ac --- /dev/null +++ b/mosquitto/mosquitto-no-passwd.conf @@ -0,0 +1,2 @@ +listener 1883 +allow_anonymous true diff --git a/mosquitto/mosquitto.conf b/mosquitto/mosquitto.conf index 5f47cb3..64cb39d 100644 --- a/mosquitto/mosquitto.conf +++ b/mosquitto/mosquitto.conf @@ -1,2 +1,3 @@ allow_anonymous false -password_file ./mosquitto/config/passwd_file \ No newline at end of file +password_file /mosquitto/config/passwd_file +listener 1883 \ No newline at end of file diff --git a/src/MQTTLibrary/MQTTKeywords.py b/src/MQTTLibrary/MQTTKeywords.py index 63934b5..c269ab9 100644 --- a/src/MQTTLibrary/MQTTKeywords.py +++ b/src/MQTTLibrary/MQTTKeywords.py @@ -80,14 +80,17 @@ def connect(self, broker, port=1883, client_id="", clean_session=True): self._mqttc.connect(broker, int(port)) timer_start = time.time() - while time.time() < timer_start + self._loop_timeout: - if self._connected or self._unexpected_disconnect: - break; - self._mqttc.loop() - - if self._unexpected_disconnect: - raise RuntimeError("The client disconnected unexpectedly") - logger.debug('client_id: %s' % self._mqttc._client_id) + try: + self._mqttc.loop_start() + while time.time() < timer_start + self._loop_timeout: + time.sleep(1e-3) + if self._connected or self._unexpected_disconnect: + break + if self._unexpected_disconnect: + raise RuntimeError("The client disconnected unexpectedly") + logger.debug('client_id: %s' % self._mqttc._client_id) + finally: + self._mqttc.loop_stop() return self._mqttc def publish(self, topic, message=None, qos=0, retain=False): @@ -108,22 +111,26 @@ def publish(self, topic, message=None, qos=0, retain=False): | Publish | test/test | test message | 1 | ${false} | """ - logger.info('Publish topic: %s, message: %s, qos: %s, retain: %s' - % (topic, message, qos, retain)) - self._mid = -1 - self._mqttc.on_publish = self._on_publish - result, mid = self._mqttc.publish(topic, message, int(qos), retain) - if result != 0: - raise RuntimeError('Error publishing: %s' % result) - - timer_start = time.time() - while time.time() < timer_start + self._loop_timeout: - if mid == self._mid: - break; - self._mqttc.loop() - - if mid != self._mid: - logger.warn('mid wasn\'t matched: %s' % mid) + try: + self._mqttc.loop_start() + logger.info('Publish topic: %s, message: %s, qos: %s, retain: %s' + % (topic, message, qos, retain)) + self._mid = -1 + self._mqttc.on_publish = self._on_publish + + result, mid = self._mqttc.publish(topic, message, int(qos), retain) + if result != 0: + raise RuntimeError('Error publishing: %s' % result) + + timer_start = time.time() + while time.time() < timer_start + self._loop_timeout: + if mid == self._mid: + break + + if mid != self._mid: + logger.warn('mid wasn\'t matched: %s' % mid) + finally: + self._mqttc.loop_stop() def subscribe(self, topic, qos, timeout=1, limit=1): """ Subscribe to a topic and return a list of message payloads received @@ -166,17 +173,16 @@ def subscribe(self, topic, qos, timeout=1, limit=1): return self._messages[topic] timer_start = time.time() - while time.time() < timer_start + seconds: - if limit == 0 or len(self._messages[topic]) < limit: - self._mqttc.loop() - else: - # workaround for client to ack the publish. Otherwise, - # it seems that if client disconnects quickly, broker - # will not get the ack and publish the message again on - # next connect. - time.sleep(1) - break + + self._mqttc.loop_start() + try: + while time.time() < timer_start + seconds: + if limit == 0 or len(self._messages[topic]) < limit: + time.sleep(1e-3) + finally: + self._mqttc.loop_stop() return self._messages[topic] + def listen(self, topic, timeout=1, limit=1): """ Listen to a topic and return a list of message payloads received @@ -202,7 +208,7 @@ def listen(self, topic, timeout=1, limit=1): timer_start = time.time() while time.time() < timer_start + self._loop_timeout: if self._subscribed: - break; + break time.sleep(1) if not self._subscribed: logger.warn('Cannot listen when not subscribed to a topic') @@ -223,22 +229,15 @@ def listen(self, topic, timeout=1, limit=1): logger.info('Listening on topic: %s' % topic) timer_start = time.time() - while time.time() < timer_start + seconds: - if limit == 0 or len(self._messages[topic]) < limit: - # If the loop is running in the background - # merely sleep here for a second or so and continue - # otherwise, do the loop ourselves - if self._background_mqttc: - time.sleep(1) + self._mqttc.loop_start() + try: + while time.time() < timer_start + seconds: + if limit == 0 or len(self._messages[topic]) < limit: + time.sleep(1e-3) else: - self._mqttc.loop() - else: - # workaround for client to ack the publish. Otherwise, - # it seems that if client disconnects quickly, broker - # will not get the ack and publish the message again on - # next connect. - time.sleep(1) - break + break + finally: + self._mqttc.loop_stop() messages = self._messages[topic][:] # Copy the list's contents self._messages[topic] = [] @@ -273,13 +272,17 @@ def subscribe_and_validate(self, topic, qos, payload, timeout=1): self._mqttc.subscribe(str(topic), int(qos)) timer_start = time.time() - while time.time() < timer_start + seconds: - if self._verified: - break - self._mqttc.loop() + self._mqttc.loop_start() + try: + while time.time() < timer_start + seconds: + if self._verified: + break + time.sleep(1e-3) - if not self._verified: - raise AssertionError("The expected payload didn't arrive in the topic") + if not self._verified: + raise AssertionError("The expected payload didn't arrive in the topic") + finally: + self._mqttc.loop_stop() def unsubscribe(self, topic): """ Unsubscribe the client from the specified topic. @@ -309,13 +312,17 @@ def unsubscribe(self, topic): self._mqttc.on_unsubscribe = self._on_unsubscribe self._mqttc.unsubscribe(str(topic)) + self._mqttc.loop_start() timer_start = time.time() - while (not self._unsubscribed and - time.time() < timer_start + self._loop_timeout): - self._mqttc.loop() + try: + while (not self._unsubscribed and + time.time() < timer_start + self._loop_timeout): + time.sleep(1e-3) - if not self._unsubscribed: - logger.warn('Client didn\'t receive an unsubscribe callback') + if not self._unsubscribed: + logger.warn('Client didn\'t receive an unsubscribe callback') + finally: + self._mqttc.loop_stop() def disconnect(self): """ Disconnect from MQTT Broker. @@ -336,12 +343,18 @@ def disconnect(self): self._mqttc.disconnect() timer_start = time.time() - while time.time() < timer_start + self._loop_timeout: - if self._disconnected or self._unexpected_disconnect: - break; - self._mqttc.loop() - if self._unexpected_disconnect: - raise RuntimeError("The client disconnected unexpectedly") + self._mqttc.loop_start() + try: + while time.time() < timer_start + self._loop_timeout: + if self._disconnected or self._unexpected_disconnect: + break + + if self._unexpected_disconnect: + raise RuntimeError("The client disconnected unexpectedly") + + finally: + self._mqttc.loop_stop() + def publish_single(self, topic, payload=None, qos=0, retain=False, hostname="localhost", port=1883, client_id="", keepalive=60, @@ -446,11 +459,12 @@ def _on_connect(self, client, userdata, flags, rc): self._connected = True if rc == 0 else False def _on_disconnect(self, client, userdata, rc): - if rc == 0: - self._disconnected = True - self._unexpected_disconnect = False - else: - self._unexpected_disconnect = True + pass + # if rc == 0: + # self._disconnected = True + # self._unexpected_disconnect = False + # else: + # self._unexpected_disconnect = True def _on_subscribe(self, client, userdata, mid, granted_qos): self._subscribed = True From 8e537b1fc6758b1edbda905ab609dc72a6c61534 Mon Sep 17 00:00:00 2001 From: Juha Ylikoski Date: Mon, 14 Jun 2021 23:24:24 +0300 Subject: [PATCH 2/2] Updated travis mosquitto launch to work with mosquitto 2.0 --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 37e6327..cd35929 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,8 +7,8 @@ python: - '3.7' before_install: - docker pull eclipse-mosquitto -- docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto:/mosquitto/config eclipse-mosquitto -- docker run -d -p 1883:1883 eclipse-mosquitto +- docker run -d -p 1883:1883 -v $(pwd)/mosquitto/mosquitto-no-passwd.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto +- docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf -v $(pwd)/mosquitto/passwd_file:/mosquitto/config/passwd_file eclipse-mosquitto install: - pip install -r requirements.txt script: