Skip to content

Commit

Permalink
Merge branch 'feature/multiple_subscribe' into 'master'
Browse files Browse the repository at this point in the history
Feature:  Enable SUBSCRIBE to multiple topics

See merge request espressif/esp-mqtt!156
  • Loading branch information
euripedesrocha committed Feb 8, 2023
2 parents 5adbe11 + 3210255 commit 5a156f5
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 174 deletions.
273 changes: 161 additions & 112 deletions include/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ typedef struct esp_mqtt_client *esp_mqtt_client_handle_t;
* @brief *MQTT* event types.
*
* User event handler receives context data in `esp_mqtt_event_t` structure with
* - `client` - *MQTT* client handle
* - client - *MQTT* client handle
* - various other data depending on event type
*
*/
Expand Down Expand Up @@ -223,132 +223,140 @@ typedef esp_err_t (*mqtt_event_callback_t)(esp_mqtt_event_handle_t event);
* character and the related len field set to 0. DER format requires a related len field set to the correct length.
*/
typedef struct esp_mqtt_client_config_t {
/**
* Broker related configuration
*/
struct broker_t {
/**
* Broker address
*
* - uri have precedence over other fields
* - If uri isn't set at least hostname, transport and port should.
*/
struct address_t {
const char *uri; /*!< Complete *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< Selects transport*/
const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker address configuration */
/**
* Broker identity verification
*
* If fields are not set broker's identity isn't verified. it's recommended
* to set the options in this struct for security reasons.
*/
struct verification_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
/**
* Broker related configuration
*/
struct broker_t {
/**
* Broker address
*
* - uri have precedence over other fields
* - If uri isn't set at least hostname, transport and port should.
*/
struct address_t {
const char *uri; /*!< Complete *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< Selects transport*/
const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker address configuration */
/**
* Broker identity verification
*
* If fields are not set broker's identity isn't verified. it's recommended
* to set the options in this struct for security reasons.
*/
struct verification_t {
bool use_global_ca_store; /*!< Use a global ca_store, look esp-tls
documentation for details. */
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
the usage of certificate bundles. */
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
const char *certificate; /*!< Certificate data, default is NULL, not required to verify the server. */
size_t certificate_len; /*!< Length of the buffer pointed to by certificate. */
const struct psk_key_hint *psk_hint_key; /*!< Pointer to PSK struct defined in esp_tls.h to enable PSK
authentication (as alternative to certificate verification).
PSK is enabled only if there are no other ways to
verify broker.*/
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
bool skip_cert_common_name_check; /*!< Skip any validation of server certificate CN field, this reduces the
security of TLS and makes the *MQTT* client susceptible to MITM attacks */
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
} verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */
/**
* Client related credentials for authentication.
*/
struct credentials_t {
const char *username; /*!< *MQTT* username */
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
const char **alpn_protos; /*!< NULL-terminated list of supported application protocols to be used for ALPN */
} verification; /*!< Security verification of the broker */
} broker; /*!< Broker address and security verification */
/**
* Client related credentials for authentication.
*/
struct credentials_t {
const char *username; /*!< *MQTT* username */
const char *client_id; /*!< Set *MQTT* client identifier. Ignored if set_null_client_id == true If NULL set
the default client id. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are
last 3 bytes of MAC address in hex format */
bool set_null_client_id; /*!< Selects a NULL client id */
/**
* Client authentication
*
* Fields related to client authentication by broker
*
* For mutual authentication using TLS, user could select certificate and key,
* secure element or digital signature peripheral if available.
*
*/
struct authentication_t {
const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
bool set_null_client_id; /*!< Selects a NULL client id */
/**
* Client authentication
*
* Fields related to client authentication by broker
*
* For mutual authentication using TLS, user could select certificate and key,
* secure element or digital signature peripheral if available.
*
*/
struct authentication_t {
const char *password; /*!< *MQTT* password */
const char *certificate; /*!< Certificate for ssl mutual authentication, not required if mutual
authentication is not needed. Must be provided with `key`.*/
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
size_t certificate_len; /*!< Length of the buffer pointed to by certificate.*/
const char *key; /*!< Private key for SSL mutual authentication, not required if mutual authentication
is not needed. If it is not NULL, also `certificate` has to be provided.*/
size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
size_t key_len; /*!< Length of the buffer pointed to by key.*/
const char *key_password; /*!< Client key decryption password, not PEM nor DER, if provided
`key_password_len` must be correctly set. */
int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
int key_password_len; /*!< Length of the password pointed to by `key_password` */
bool use_secure_element; /*!< Enable secure element, available in ESP32-ROOM-32SE, for SSL connection */
void *ds_data; /*!< Carrier of handle for digital signature parameters, digital signature peripheral is
available in some Espressif devices. */
} authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */
/**
* *MQTT* Session related configuration
*/
struct session_t {
/**
* Last Will and Testament message configuration.
*/
struct last_will_t {
const char *topic; /*!< LWT (Last Will and Testament) message topic */
const char *msg; /*!< LWT message, may be NULL terminated*/
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
int qos; /*!< LWT message QoS */
int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
} authentication; /*!< Client authentication */
} credentials; /*!< User credentials for broker */
/**
* *MQTT* Session related configuration
*/
struct session_t {
/**
* Last Will and Testament message configuration.
*/
struct last_will_t {
const char *topic; /*!< LWT (Last Will and Testament) message topic */
const char *msg; /*!< LWT message, may be NULL terminated*/
int msg_len; /*!< LWT message length, if msg isn't NULL terminated must have the correct length */
int qos; /*!< LWT message QoS */
int retain; /*!< LWT retained message flag */
} last_will; /*!< Last will configuration */
bool disable_clean_session; /*!< *MQTT* clean session, default clean_session is true */
int keepalive; /*!< *MQTT* keepalive, default is 120 seconds */
bool disable_keepalive; /*!< Set `disable_keepalive=true` to turn off keep-alive mechanism, keepalive is active
by default. Note: setting the config value `keepalive` to `0` doesn't disable
keepalive feature, but uses a default keepalive period */
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
} session; /*!< *MQTT* session configuration. */
/**
* Network related configuration
*/
struct network_t {
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* protocol version used for connection.*/
int message_retransmit_timeout; /*!< timeout for retransmitting of failed packet */
} session; /*!< *MQTT* session configuration. */
/**
* Network related configuration
*/
struct network_t {
int reconnect_timeout_ms; /*!< Reconnect to the broker after this value in miliseconds if auto reconnect is not
disabled (defaults to 10s) */
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
int timeout_ms; /*!< Abort network operation if it is not completed after this value, in milliseconds
(defaults to 10s). */
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
bool disable_auto_reconnect; /*!< Client will reconnect to server (when errors/disconnect). Set
`disable_auto_reconnect=true` to disable */
} network; /*!< Network configuration */
/**
* Client task configuration
*/
struct task_t {
int priority; /*!< *MQTT* task priority*/
int stack_size; /*!< *MQTT* task stack size*/
} task; /*!< FreeRTOS task configuration.*/
/**
* Client buffer size configuration
*
* Client have two buffers for input and output respectivelly.
*/
struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
} network; /*!< Network configuration */
/**
* Client task configuration
*/
struct task_t {
int priority; /*!< *MQTT* task priority*/
int stack_size; /*!< *MQTT* task stack size*/
} task; /*!< FreeRTOS task configuration.*/
/**
* Client buffer size configuration
*
* Client have two buffers for input and output respectivelly.
*/
struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
``buffer_size`` */
} buffer; /*!< Buffer size configuration.*/
} buffer; /*!< Buffer size configuration.*/
} esp_mqtt_client_config_t;

/**
* Topic definition struct
*/
typedef struct topic_t {
const char *filter; /*!< Topic filter to subscribe */
int qos; /*!< Max QoS level of the subscription */
} esp_mqtt_topic_t;

/**
* @brief Creates *MQTT* client handle based on the configuration
*
Expand Down Expand Up @@ -417,6 +425,26 @@ esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);
*/
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);


/**
* @brief Convenience macro to select subscribe function to use.
*
* Notes:
* - Usage of `esp_mqtt_client_subscribe_single` is the same as previous
* esp_mqtt_client_subscribe, refer to it for details.
*
* @param client_handle *MQTT* client handle
* @param topic_type Needs to be char* for single subscription or `esp_mqtt_topic_t` for multiple topics
* @param qos_or_size It's either a qos when subscribing to a single topic or the size of the subscription array when subscribing to multiple topics.
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
#define esp_mqtt_client_subscribe(client_handle, topic_type, qos_or_size) _Generic((topic_type), \
char *: esp_mqtt_client_subscribe_single, \
esp_mqtt_topic_t*: esp_mqtt_client_subscribe_multiple \
)(client_handle, topic_type, qos_or_size)

/**
* @brief Subscribe the client to defined topic with defined qos
*
Expand All @@ -426,23 +454,44 @@ esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
* from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
*
* @param client *MQTT* client handle
* @param topic
* @param qos // TODO describe parameters
* @param topic topic filter to subscribe
* @param qos Max qos level of the subscription
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client,
const char *topic, int qos);
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client,
const char *topic, int qos);
/**
* @brief Subscribe the client to a list of defined topics with defined qos
*
* Notes:
* - Client must be connected to send subscribe message
* - This API is could be executed from a user task or
* from a *MQTT* event callback i.e. internal *MQTT* task
* (API is protected by internal mutex, so it might block
* if a longer data receive operation is in progress.
* - `esp_mqtt_client_subscribe` could be used to call this function.
*
* @param client *MQTT* client handle
* @param topic_list List of topics to subscribe
* @param size size of topic_list
*
* @return message_id of the subscribe message on success
* -1 on failure
*/
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size);

/**
* @brief Unsubscribe the client from defined topic
*
* Notes:
* - Client must be connected to send unsubscribe message
* - It is thread safe, please refer to `esp_mqtt_client_subscribe` for details
* - It is thread safe, please refer to `esp_mqtt_client_subscribe_single` for details
*
* @param client *MQTT* client handle
* @param topic
Expand Down
2 changes: 1 addition & 1 deletion lib/include/mqtt5_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ mqtt_message_t *mqtt5_msg_connect(mqtt_connection_t *connection, mqtt_connect_in
mqtt_message_t *mqtt5_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id, const esp_mqtt5_publish_property_config_t *property, const char *resp_info);
esp_err_t mqtt5_msg_parse_connack_property(uint8_t *buffer, size_t buffer_len, mqtt_connect_info_t *connection_info, esp_mqtt5_connection_property_storage_t *connection_property, esp_mqtt5_connection_server_resp_property_t *resp_property, int *reason_code, uint8_t *ack_flag, mqtt5_user_property_handle_t *user_property);
int mqtt5_msg_get_reason_code(uint8_t *buffer, size_t length);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t *topic, int size, uint16_t *message_id, const esp_mqtt5_subscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id, const esp_mqtt5_unsubscribe_property_config_t *property);
mqtt_message_t *mqtt5_msg_disconnect(mqtt_connection_t *connection, esp_mqtt5_disconnect_property_config_t *disconnect_property_info);
mqtt_message_t *mqtt5_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
Expand Down
2 changes: 1 addition & 1 deletion lib/include/mqtt_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const char *topic, int qos, uint16_t *message_id);
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id) __attribute__((nonnull));
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id);
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection);
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection);
Expand Down
Loading

0 comments on commit 5a156f5

Please sign in to comment.