-
I have the follow python code, which is modified from the pubsub sample from awscrt import mqtt, http
from awsiot import mqtt_connection_builder
import sys
import secrets
import threading
import time
import json
import os
class example_settings:
def __init__(self, cred_dir, sn, mode):
self.CREDENTIALS_DIR = cred_dir
self.SERIAL_NUMBER = sn
self.HEADLESS_START = mode
def on_connection_interrupted(connection, error, **kwargs):
print("Connection interrupted. error: {}".format(error))
# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))
if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
print("Session did not persist. Resubscribing to existing topics...")
resubscribe_future, _ = connection.resubscribe_existing_topics()
# Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
# evaluate result with a callback instead.
resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
resubscribe_results = resubscribe_future.result()
print("Resubscribe results: {}".format(resubscribe_results))
for topic, qos in resubscribe_results['topics']:
if qos is None:
sys.exit("Server rejected resubscribe to topic: {}".format(topic))
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
print("Received message from topic '{}': {}".format(topic, payload))
# Callback when the connection successfully connects
def on_connection_success(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
print("Connection Successful with return code: {} session present: {}".format(callback_data.return_code, callback_data.session_present))
# Callback when a connection attempt fails
def on_connection_failure(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionFailureData)
print("Connection failed with error code: {}".format(callback_data.error))
# Callback when a connection has been disconnected or shutdown successfully
def on_connection_closed(connection, callback_data):
print("Connection closed")
class mqtt_transmitter:
def __init__(self, cert_fp, pkey_fp, ca_fp, endpoint="a3dkrncnrub5qb-ats.iot.us-east-2.amazonaws.com"):
self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=endpoint,
port=8883,
cert_filepath=os.path.join(settings.CREDENTIALS_DIR, cert_fp),
pri_key_filepath=os.path.join(settings.CREDENTIALS_DIR, pkey_fp),
ca_filepath=os.path.join(settings.CREDENTIALS_DIR, ca_fp),
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=settings.SERIAL_NUMBER,
clean_session=settings.HEADLESS_START == 'receiver',
keep_alive_secs=30,
http_proxy_options=None,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed)
'''
if not cmdData.input_is_ci:
print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
else:
print("Connecting to endpoint with client ID")
'''
connect_future = self.mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
def subscribe(self, topic, action=on_message_received):
# Subscribe
print("Subscribing to topic '{}'...".format(topic))
subscribe_future, packet_id = self.mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=action)
subscribe_result = subscribe_future.result()
print("Subscribed with {}".format(str(subscribe_result)))
def publish(self, topic, payload:dict):
self.mqtt_connection.publish(
topic=topic,
payload=json.dumps(payload),
qos=mqtt.QoS.AT_LEAST_ONCE)
def publish_example_message(self, topic, count):
hexdata = secrets.token_hex(1280)
self.publish(topic,
{'device':'labnuc',
'stream':'DEV_MFAM_13',
'count' :count,
'data' :hexdata})
def disconnect(self):
fut = self.mqtt_connection.disconnect()
fut.result()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('serial_number', help='device serial number, used for mqtt ClientID')
parser.add_argument('mode', help='receiver or publisher')
parser.add_argument('cred_path', help='path to directory containing credentials')
parser.add_argument('cert_fp', help='cert filename')
parser.add_argument('pkey_fp', help='private key filename')
parser.add_argument('ca_fp', help='ca filename')
topic = 'datastream'
args = parser.parse_args()
settings = example_settings(args.cred_path, args.serial_number, args.mode)
transmitter = mqtt_transmitter(args.cert_fp, args.pkey_fp, args.ca_fp)
if args.mode == 'publisher':
for i in range(1000):
transmitter.publish_example_message(topic, i)
time.sleep(.1)
if args.mode == 'receiver':
transmitter.subscribe(topic)
for i in range(1000): # hold program open
time.sleep(.1)
else:
from src.Settings.settingsMgr import settingsManager as settings I am currently trying to setup a system of one publisher which publishes data 40 times per second, and one subscriber. Each creates a mqtt_transmitter object. The issue is that on both sides I intermittently receive the following 'Connection interrupted. error: AWS_ERROR_MQTT_UNEXPECTED_HANGUP: The connection was closed unexpectedly.' The data being published is time series data so interpreting it on the subscriber side I can see that it is correct and comes in with low latency (although out of order), but drops out completely for seconds at a time. Interestingly I created a stripped example in the main block of the above code, where a 'publisher' and 'receiver' can be created where the publisher publishes identically structured data as my system. Running these two the data comes in order, with no issue. Wondering what about my complete system could cause this issue with intermittent AWS_ERROR_MQTT_UNEXPECTED_HANGUP. It is a multi-threaded program, but I only have one thread publishing to the mqtt_transmitter. Does the connection object somehow need to be contained in it's own dedicated thread? |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
This like a more complex topic, so I created an issue here to figure out what might be causing this. |
Beta Was this translation helpful? Give feedback.
-
If you are just getting started, I highly recommend you use the MQTT5 client instead of the MQTT311 client. It's better in a number of ways including more information related to disconnections and errors. The setup process is very similar and you can use the same certificate/key/etc during its setup. The MQTT5 sample can be found here: https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/mqtt5_pubsub.py Disconnects from the IoT server side on MQTT311 don't provide much information on why they're happening. Depending on how quickly these unexpected hangups are occurring, one potential cause may be that you're using the same client id for both connections and they are causing each other to be disconnected. If both clients are connecting from the same device, the client ID generation in the code above shows they are using a serial number which may be shared. You can also try setting up CloudWatch logs in AWS to see if you can find why a disconnect is being sent. You can also turn on logging in the SDK to dig deeper into what is happening just prior to getting disconnected: https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/FAQ.md#how-do-i-enable-logging |
Beta Was this translation helpful? Give feedback.
-
Thanks for the info. I will work to change my code to use the MQTT5 instead, appreciate these examples. Will post again with the results of that |
Beta Was this translation helpful? Give feedback.
If you are just getting started, I highly recommend you use the MQTT5 client instead of the MQTT311 client. It's better in a number of ways including more information related to disconnections and errors. The setup process is very similar and you can use the same certificate/key/etc during its setup.
The MQTT5 sample can be found here: https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/mqtt5_pubsub.py
And its readme here: https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/mqtt5_pubsub.md
The MQTT5 client also has a very good user guide here: https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/MQTT5_Userguide.md
Disconnects from the…