From 32102558d3c0d58c8ed45e0029a29da26cb398a5 Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Mon, 12 Dec 2022 15:27:40 +0100 Subject: [PATCH] Feature: Enable SUBSCRIBE to multiple topics - Adds an api for multiple topics on SUBSCRIBE message. Apply 2 suggestion(s) to 1 file(s) Removing headers y --- include/mqtt_client.h | 273 +++++++++++++++++++++++----------------- lib/include/mqtt5_msg.h | 2 +- lib/include/mqtt_msg.h | 2 +- lib/mqtt5_msg.c | 73 +++++------ lib/mqtt_msg.c | 26 ++-- mqtt_client.c | 46 ++++--- 6 files changed, 248 insertions(+), 174 deletions(-) diff --git a/include/mqtt_client.h b/include/mqtt_client.h index aa2b1ece..f1037948 100644 --- a/include/mqtt_client.h +++ b/include/mqtt_client.h @@ -32,7 +32,7 @@ typedef struct esp_mqtt_client *esp_mqtt_client_handle_t; * @brief *MQTT* event types. * * User event handler receives context data in `esp_mqtt_event_t` structure with - * - `client` - *MQTT* client handle + * - client - *MQTT* client handle * - various other data depending on event type * */ @@ -223,132 +223,140 @@ typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event); * character and the related len field set to 0. DER format requires a related len field set to the correct length. */ typedef struct esp_mqtt_client_config_t { - /** - * Broker related configuration - */ - struct broker_t { - /** - * Broker address - * - * - uri have precedence over other fields - * - If uri isn't set at least hostname, transport and port should. - */ - struct address_t { - const char *uri; /*!< Complete *MQTT* broker URI */ - const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */ - esp_mqtt_transport_t transport; /*!< Selects transport*/ - const char *path; /*!< Path in the URI*/ - uint32_t port; /*!< *MQTT* server port */ - } address; /*!< Broker address configuration */ - /** - * Broker identity verification - * - * If fields are not set broker's identity isn't verified. it's recommended - * to set the options in this struct for security reasons. - */ - struct verification_t { - bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls + /** + * Broker related configuration + */ + struct broker_t { + /** + * Broker address + * + * - uri have precedence over other fields + * - If uri isn't set at least hostname, transport and port should. + */ + struct address_t { + const char *uri; /*!< Complete *MQTT* broker URI */ + const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */ + esp_mqtt_transport_t transport; /*!< Selects transport*/ + const char *path; /*!< Path in the URI*/ + uint32_t port; /*!< *MQTT* server port */ + } address; /*!< Broker address configuration */ + /** + * Broker identity verification + * + * If fields are not set broker's identity isn't verified. it's recommended + * to set the options in this struct for security reasons. + */ + struct verification_t { + bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls documentation for details. */ - esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for + esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for the usage of certificate bundles. */ - const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */ - size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */ - const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK + const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */ + size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */ + const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK authentication (as alternative to certificate verification). PSK is enabled only if there are no other ways to verify broker.*/ - bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the + bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the security of TLS and makes the *MQTT* client susceptible to MITM attacks */ - const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */ - } verification; /*!< Security verification of the broker */ - } broker; /*!< Broker address and security verification */ - /** - * Client related credentials for authentication. - */ - struct credentials_t { - const char *username; /*!< *MQTT* username */ - const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set + const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */ + } verification; /*!< Security verification of the broker */ + } broker; /*!< Broker address and security verification */ + /** + * Client related credentials for authentication. + */ + struct credentials_t { + const char *username; /*!< *MQTT* username */ + const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are last 3 bytes of MAC address in hex format */ - bool set_null_client_id; /*!< Selects a NULL client id */ - /** - * Client authentication - * - * Fields related to client authentication by broker - * - * For mutual authentication using TLS, user could select certificate and key, - * secure element or digital signature peripheral if available. - * - */ - struct authentication_t { - const char *password; /*!< *MQTT* password */ - const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual + bool set_null_client_id; /*!< Selects a NULL client id */ + /** + * Client authentication + * + * Fields related to client authentication by broker + * + * For mutual authentication using TLS, user could select certificate and key, + * secure element or digital signature peripheral if available. + * + */ + struct authentication_t { + const char *password; /*!< *MQTT* password */ + const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual authentication is not needed. Must be provided with `key`.*/ - size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/ - const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication + size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/ + const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication is not needed. If it is not NULL, also `certificate` has to be provided.*/ - size_t key_len; /*!< Length of the buffer pointed to by key.*/ - const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided + size_t key_len; /*!< Length of the buffer pointed to by key.*/ + const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided `key_password_len` must be correctly set. */ - int key_password_len; /*!< Length of the password pointed to by `key_password` */ - bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */ - void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is + int key_password_len; /*!< Length of the password pointed to by `key_password` */ + bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */ + void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is available in some Espressif devices. */ - } authentication; /*!< Client authentication */ - } credentials; /*!< User credentials for broker */ - /** - * *MQTT* Session related configuration - */ - struct session_t { - /** - * Last Will and Testament message configuration. - */ - struct last_will_t { - const char *topic; /*!< LWT (Last Will and Testament) message topic */ - const char *msg; /*!< LWT message, may be NULL terminated*/ - int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */ - int qos; /*!< LWT message QoS */ - int retain; /*!< LWT retained message flag */ - } last_will; /*!< Last will configuration */ - bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */ - int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */ - bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active + } authentication; /*!< Client authentication */ + } credentials; /*!< User credentials for broker */ + /** + * *MQTT* Session related configuration + */ + struct session_t { + /** + * Last Will and Testament message configuration. + */ + struct last_will_t { + const char *topic; /*!< LWT (Last Will and Testament) message topic */ + const char *msg; /*!< LWT message, may be NULL terminated*/ + int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */ + int qos; /*!< LWT message QoS */ + int retain; /*!< LWT retained message flag */ + } last_will; /*!< Last will configuration */ + bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */ + int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */ + bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active by default. Note: setting the config value `keepalive` to `0` doesn't disable keepalive feature, but uses a default keepalive period */ - esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/ - int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */ - } session; /*!< *MQTT* session configuration. */ - /** - * Network related configuration - */ - struct network_t { - int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not + esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/ + int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */ + } session; /*!< *MQTT* session configuration. */ + /** + * Network related configuration + */ + struct network_t { + int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not disabled (defaults to 10s) */ - int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds + int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds (defaults to 10s). */ - int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */ - bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set + int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */ + bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set `disable_auto_reconnect=true` to disable */ - } network; /*!< Network configuration */ - /** - * Client task configuration - */ - struct task_t { - int priority; /*!< *MQTT* task priority*/ - int stack_size; /*!< *MQTT* task stack size*/ - } task; /*!< FreeRTOS task configuration.*/ - /** - * Client buffer size configuration - * - * Client have two buffers for input and output respectivelly. - */ - struct buffer_t { - int size; /*!< size of *MQTT* send/receive buffer*/ - int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by + } network; /*!< Network configuration */ + /** + * Client task configuration + */ + struct task_t { + int priority; /*!< *MQTT* task priority*/ + int stack_size; /*!< *MQTT* task stack size*/ + } task; /*!< FreeRTOS task configuration.*/ + /** + * Client buffer size configuration + * + * Client have two buffers for input and output respectivelly. + */ + struct buffer_t { + int size; /*!< size of *MQTT* send/receive buffer*/ + int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by ``buffer_size`` */ - } buffer; /*!< Buffer size configuration.*/ + } buffer; /*!< Buffer size configuration.*/ } esp_mqtt_client_config_t; +/** + * Topic definition struct + */ +typedef struct topic_t { + const char *filter; /*!< Topic filter to subscribe */ + int qos; /*!< Max QoS level of the subscription */ +} esp_mqtt_topic_t; + /** * @brief Creates *MQTT* client handle based on the configuration * @@ -417,6 +425,26 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client); */ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); + +/** + * @brief Convenience macro to select subscribe function to use. + * + * Notes: + * - Usage of `esp_mqtt_client_subscribe_single` is the same as previous + * esp_mqtt_client_subscribe, refer to it for details. + * + * @param client_handle *MQTT* client handle + * @param topic_type Needs to be char* for single subscription or `esp_mqtt_topic_t` for multiple topics + * @param qos_or_size It's either a qos when subscribing to a single topic or the size of the subscription array when subscribing to multiple topics. + * + * @return message_id of the subscribe message on success + * -1 on failure + */ +#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \ + char *: esp_mqtt_client_subscribe_single, \ + esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \ + )(client_handle, topic_type, qos_or_size) + /** * @brief Subscribe the client to defined topic with defined qos * @@ -426,23 +454,44 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); * from a *MQTT* event callback i.e. internal *MQTT* task * (API is protected by internal mutex, so it might block * if a longer data receive operation is in progress. + * - `esp_mqtt_client_subscribe` could be used to call this function. * * @param client *MQTT* client handle - * @param topic - * @param qos // TODO describe parameters + * @param topic topic filter to subscribe + * @param qos Max qos level of the subscription * * @return message_id of the subscribe message on success * -1 on failure */ -int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, - const char *topic, int qos); +int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, + const char *topic, int qos); +/** + * @brief Subscribe the client to a list of defined topics with defined qos + * + * Notes: + * - Client must be connected to send subscribe message + * - This API is could be executed from a user task or + * from a *MQTT* event callback i.e. internal *MQTT* task + * (API is protected by internal mutex, so it might block + * if a longer data receive operation is in progress. + * - `esp_mqtt_client_subscribe` could be used to call this function. + * + * @param client *MQTT* client handle + * @param topic_list List of topics to subscribe + * @param size size of topic_list + * + * @return message_id of the subscribe message on success + * -1 on failure + */ +int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, + const esp_mqtt_topic_t *topic_list, int size); /** * @brief Unsubscribe the client from defined topic * * Notes: * - Client must be connected to send unsubscribe message - * - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details + * - It is thread safe, please refer to `esp_mqtt_client_subscribe_single` for details * * @param client *MQTT* client handle * @param topic diff --git a/lib/include/mqtt5_msg.h b/lib/include/mqtt5_msg.h index c66c2314..6c2a036c 100644 --- a/lib/include/mqtt5_msg.h +++ b/lib/include/mqtt5_msg.h @@ -126,7 +126,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info); esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property); int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length); -mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property); +mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property); mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property); mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info); mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id); diff --git a/lib/include/mqtt_msg.h b/lib/include/mqtt_msg.h index 52f0e9e6..60fcf8c4 100644 --- a/lib/include/mqtt_msg.h +++ b/lib/include/mqtt_msg.h @@ -138,7 +138,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_ mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id); mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id); mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id); -mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id); +mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull)); mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id); mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection); mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection); diff --git a/lib/mqtt5_msg.c b/lib/mqtt5_msg.c index 7af30ff9..16a479b2 100644 --- a/lib/mqtt5_msg.c +++ b/lib/mqtt5_msg.c @@ -1,5 +1,6 @@ #include #include "mqtt5_msg.h" +#include "mqtt_client.h" #include "mqtt_config.h" #include "platform.h" #include "esp_log.h" @@ -764,7 +765,7 @@ mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *top } snprintf(response_topic, response_topic_size, "%s/%s", property->response_topic, resp_info); if (append_property(connection, MQTT5_PROPERTY_RESPONSE_TOPIC, 2, response_topic, response_topic_size) == -1) { - ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__); + ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__); free(response_topic); return fail_message(connection); } @@ -849,14 +850,10 @@ int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length) return -1; } -mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property) +mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic_list, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property) { init_message(connection); - if (topic == NULL || topic[0] == '\0') { - return fail_message(connection); - } - if ((*message_id = append_message_id(connection, 0)) == 0) { return fail_message(connection); } @@ -877,41 +874,47 @@ mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *t } } APPEND_CHECK(update_property_len_value(connection, connection->message.length - properties_offset - 1, properties_offset), fail_message(connection)); - if (property && property->is_share_subscribe) { - uint16_t shared_topic_size = strlen(topic) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name); - char *shared_topic = calloc(1, shared_topic_size); - if (!shared_topic) { - ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size); - fail_message(connection); + + for (int topic_number = 0; topic_number < size; ++topic_number) { + if (topic_list[topic_number].filter[0] == '\0') { + return fail_message(connection); } - snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic); - if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) { - ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__); + if (property && property->is_share_subscribe) { + uint16_t shared_topic_size = strlen(topic_list[topic_number].filter) + strlen(MQTT5_SHARED_SUB) + strlen(property->share_name); + char *shared_topic = calloc(1, shared_topic_size); + if (!shared_topic) { + ESP_LOGE(TAG, "Failed to calloc %d memory", shared_topic_size); + fail_message(connection); + } + snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic_list[topic_number].filter); + if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) { + ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__); + free(shared_topic); + return fail_message(connection); + } free(shared_topic); - return fail_message(connection); + } else { + APPEND_CHECK(append_property(connection, 0, 2, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)), fail_message(connection)); } - free(shared_topic); - } else { - APPEND_CHECK(append_property(connection, 0, 2, topic, strlen(topic)), fail_message(connection)); - } - if (connection->message.length + 1 > connection->buffer_length) { - return fail_message(connection); - } - connection->buffer[connection->message.length] = 0; - if (property) { - if (property->retain_handle > 0 && property->retain_handle < 3) { - connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4; - } - if (property->no_local_flag) { - connection->buffer[connection->message.length] |= (property->no_local_flag << 2); + if (connection->message.length + 1 > connection->buffer_length) { + return fail_message(connection); } - if (property->retain_as_published_flag) { - connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3); + connection->buffer[connection->message.length] = 0; + if (property) { + if (property->retain_handle > 0 && property->retain_handle < 3) { + connection->buffer[connection->message.length] |= (property->retain_handle & 3) << 4; + } + if (property->no_local_flag) { + connection->buffer[connection->message.length] |= (property->no_local_flag << 2); + } + if (property->retain_as_published_flag) { + connection->buffer[connection->message.length] |= (property->retain_as_published_flag << 3); + } } + connection->buffer[connection->message.length] |= (topic_list[topic_number].qos & 3); + connection->message.length ++; } - connection->buffer[connection->message.length] |= (qos & 3); - connection->message.length ++; return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); } @@ -975,7 +978,7 @@ mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char } snprintf(shared_topic, shared_topic_size, MQTT5_SHARED_SUB, property->share_name, topic); if (append_property(connection, 0, 2, shared_topic, strlen(shared_topic)) == -1) { - ESP_LOGE(TAG,"%s(%d) fail",__FUNCTION__, __LINE__); + ESP_LOGE(TAG, "%s(%d) fail", __FUNCTION__, __LINE__); free(shared_topic); return fail_message(connection); } diff --git a/lib/mqtt_msg.c b/lib/mqtt_msg.c index 31a5fb09..8b78ae38 100644 --- a/lib/mqtt_msg.c +++ b/lib/mqtt_msg.c @@ -29,6 +29,7 @@ * */ #include +#include "mqtt_client.h" #include "mqtt_msg.h" #include "mqtt_config.h" #include "platform.h" @@ -518,26 +519,29 @@ mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); } -mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id) +mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) { init_message(connection); - if (topic == NULL || topic[0] == '\0') { - return fail_message(connection); - } - if ((*message_id = append_message_id(connection, 0)) == 0) { return fail_message(connection); } - if (append_string(connection, topic, strlen(topic)) < 0) { - return fail_message(connection); - } + for (int topic_number = 0; topic_number < size; ++topic_number) { + if (topic_list[topic_number].filter[0] == '\0') { + return fail_message(connection); + } - if (connection->message.length + 1 > connection->buffer_length) { - return fail_message(connection); + if (append_string(connection, topic_list[topic_number].filter, strlen(topic_list[topic_number].filter)) < 0) { + return fail_message(connection); + } + + if (connection->message.length + 1 > connection->buffer_length) { + return fail_message(connection); + } + connection->buffer[connection->message.length] = topic_list[topic_number].qos; + connection->message.length ++; } - connection->buffer[connection->message.length++] = qos; return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); } diff --git a/mqtt_client.c b/mqtt_client.c index 9a041b26..2a502544 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -1101,8 +1101,11 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE; client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED; // post data event - if ((uint8_t)*msg_data == 0x80) { - client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED; + for (int topic = 0; topic < msg_data_len; ++topic) { + if ((uint8_t)msg_data[topic] == 0x80) { + client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED; + break; + } } client->event.data_len = msg_data_len; client->event.total_data_len = msg_data_len; @@ -1114,7 +1117,9 @@ static esp_err_t deliver_suback(esp_mqtt_client_handle_t client) return ESP_OK; } -static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) +// Deletes the initial message in MQTT communication protocol +// Return false when message is not found, making the received counterpart invalid. +static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id) { ESP_LOGD(TAG, "pending_id=%d, pending_msg_count = %d", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_count); if (client->mqtt_state.pending_msg_count == 0) { @@ -1311,7 +1316,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) switch (msg_type) { case MQTT_MSG_TYPE_SUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { + if (remove_initiator_message(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { #ifdef MQTT_PROTOCOL_5 esp_mqtt5_parse_suback(client); #endif @@ -1323,7 +1328,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) } break; case MQTT_MSG_TYPE_UNSUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { + if (remove_initiator_message(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { #ifdef MQTT_PROTOCOL_5 esp_mqtt5_parse_unsuback(client); #endif @@ -1375,7 +1380,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) esp_mqtt5_decrement_packet_counter(client); } #endif - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); #ifdef MQTT_PROTOCOL_5 esp_mqtt5_parse_puback(client); @@ -1426,7 +1431,7 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) esp_mqtt5_decrement_packet_counter(client); } #endif - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); #ifdef MQTT_PROTOCOL_5 esp_mqtt5_parse_pubcomp(client); @@ -1820,7 +1825,8 @@ static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client) return ESP_OK; } -int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos) +int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client, + const esp_mqtt_topic_t *topic_list, int size) { if (!client) { ESP_LOGE(TAG, "Client was not initialized"); @@ -1834,13 +1840,19 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic } if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 - if (esp_mqtt5_client_subscribe_check(client, qos) != ESP_OK) { - ESP_LOGI(TAG, "MQTT5 subscribe check fail"); + int max_qos = topic_list[0].qos; + for (int topic_number = 0; topic_number < size; ++topic_number) { + if (topic_list[topic_number].qos > max_qos) { + max_qos = topic_list[topic_number].qos; + } + } + if (esp_mqtt5_client_subscribe_check(client, max_qos) != ESP_OK) { + ESP_LOGI(TAG, "MQTT5 subscribe check fail: QoS %d not accepted by broker ", max_qos); MQTT_API_UNLOCK(client); return -1; } client->mqtt_state.outbound_message = mqtt5_msg_subscribe(&client->mqtt_state.mqtt_connection, - topic, qos, + topic_list, size, &client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info); if (client->mqtt_state.outbound_message->length) { client->mqtt5_config->subscribe_property_info = NULL; @@ -1848,7 +1860,7 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic #endif } else { client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, - topic, qos, + topic_list, size, &client->mqtt_state.pending_msg_id); } if (client->mqtt_state.outbound_message->length == 0) { @@ -1867,14 +1879,20 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);// handle error if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); + ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos); MQTT_API_UNLOCK(client); return -1; } - ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); + ESP_LOGD(TAG, "Sent subscribe, first topic=%s, id: %d", topic_list[0].filter, client->mqtt_state.pending_msg_id); MQTT_API_UNLOCK(client); return client->mqtt_state.pending_msg_id; + +} +int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos) +{ + esp_mqtt_topic_t user_topic = {.filter = topic, .qos = qos}; + return esp_mqtt_client_subscribe_multiple(client, &user_topic, 1); } int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)