diff --git a/awsiot/mqtt_connection_builder.py b/awsiot/mqtt_connection_builder.py index 785cd774..a78a1850 100644 --- a/awsiot/mqtt_connection_builder.py +++ b/awsiot/mqtt_connection_builder.py @@ -39,6 +39,27 @@ * `**kwargs` (dict): Forward-compatibility kwargs. + **on_connection_success** (`Callable`): Optional callback invoked whenever the connection successfully connects. + The function should take the following arguments and return nothing: + + * `connection` (:class:`awscrt.mqtt.Connection`): This MQTT Connection. + + * `callback_data` (:class:`awscrt.mqtt.OnConnectionSuccessData`): The data returned from the connection success. + + **on_connection_failure** (`Callable`): Optional callback invoked whenever the connection fails to connect. + The function should take the following arguments and return nothing: + + * `connection` (:class:`awscrt.mqtt.Connection`): This MQTT Connection. + + * `callback_data` (:class:`awscrt.mqtt.OnConnectionFailureData`): The data returned from the connection failure. + + **on_connection_closed** (`Callable`): Optional callback invoked whenever the connection has been disconnected and shutdown successfully. + The function should take the following arguments and return nothing: + + * `connection` (:class:`awscrt.mqtt.Connection`): This MQTT Connection. + + * `callback_data` (:class:`awscrt.mqtt.OnConnectionClosedData`): The data returned from the connection close. + **clean_session** (`bool`): Whether or not to start a clean session with each reconnect. If True, the server will forget all subscriptions with each reconnect. Set False to request that the server resume an existing session @@ -230,6 +251,9 @@ def _builder( use_websockets=use_websockets, websocket_handshake_transform=websocket_handshake_transform, proxy_options=proxy_options, + on_connection_success=_get(kwargs, 'on_connection_success'), + on_connection_failure=_get(kwargs, 'on_connection_failure'), + on_connection_closed=_get(kwargs, 'on_connection_closed'), ) diff --git a/samples/pubsub.py b/samples/pubsub.py index b71dbf2b..14b50dc0 100644 --- a/samples/pubsub.py +++ b/samples/pubsub.py @@ -58,6 +58,19 @@ def on_message_received(topic, payload, dup, qos, retain, **kwargs): if received_count == cmdData.input_count: received_all_event.set() +# Callback when the connection successfully connects +def on_connection_success(connection, callback_data): + assert isinstance(callback_data, mqtt.OnConnectionSuccessData) + print("Connection Successful with return code: {} session present: {}".format(callback_data.return_code, callback_data.session_present)) + +# Callback when a connection attempt fails +def on_connection_failure(connection, callback_data): + assert isinstance(callback_data, mqtt.OnConnectionFailuredata) + print("Connection failed with error code: {}".format(callback_data.error)) + +# Callback when a connection has been disconnected or shutdown successfully +def on_connection_closed(connection, callback_data): + print("Connection closed") if __name__ == '__main__': # Create the proxy options if the data is present in cmdData @@ -79,7 +92,10 @@ def on_message_received(topic, payload, dup, qos, retain, **kwargs): client_id=cmdData.input_clientId, clean_session=False, keep_alive_secs=30, - http_proxy_options=proxy_options) + http_proxy_options=proxy_options, + on_connection_success=on_connection_success, + on_connection_failure=on_connection_failure, + on_connection_closed=on_connection_closed) if not cmdData.input_is_ci: print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")