diff --git a/lib/include/mqtt5_client_priv.h b/lib/include/mqtt5_client_priv.h index e2334295..2337e064 100644 --- a/lib/include/mqtt5_client_priv.h +++ b/lib/include/mqtt5_client_priv.h @@ -36,7 +36,8 @@ typedef struct { mqtt5_topic_alias_handle_t peer_topic_alias; } mqtt5_config_storage_t; -void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client); +void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client); +void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client); void esp_mqtt5_parse_pubcomp(esp_mqtt5_client_handle_t client); void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client); void esp_mqtt5_parse_unsuback(esp_mqtt5_client_handle_t client); diff --git a/mqtt5_client.c b/mqtt5_client.c index be3a2eff..b9d0f994 100644 --- a/mqtt5_client.c +++ b/mqtt5_client.c @@ -16,17 +16,21 @@ static char *esp_mqtt5_client_get_topic_alias(mqtt5_topic_alias_handle_t topic_a static void esp_mqtt5_client_delete_topic_alias(mqtt5_topic_alias_handle_t topic_alias_handle); static esp_err_t esp_mqtt5_user_property_copy(mqtt5_user_property_handle_t user_property_new, const mqtt5_user_property_handle_t user_property_old); -void esp_mqtt5_flow_control(esp_mqtt5_client_handle_t client) +void esp_mqtt5_increment_packet_counter(esp_mqtt5_client_handle_t client) { - if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { - int msg_type = mqtt5_get_type(client->mqtt_state.outbound_message->data); - if (msg_type == MQTT_MSG_TYPE_PUBLISH) { - int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data); - if (msg_qos > 0) { - client->send_publish_packet_count ++; - ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count); - } - } + bool msg_dup = mqtt5_get_dup(client->mqtt_state.outbound_message->data); + int msg_qos = mqtt5_get_qos(client->mqtt_state.outbound_message->data); + if ((msg_dup == false) && (msg_qos > 0)) { + client->send_publish_packet_count ++; + ESP_LOGD(TAG, "Sent (%d) qos > 0 publish packet without ack", client->send_publish_packet_count); + } +} + +void esp_mqtt5_decrement_packet_counter(esp_mqtt5_client_handle_t client) +{ + if (client->send_publish_packet_count > 0) { + client->send_publish_packet_count --; + ESP_LOGD(TAG, "Receive (%d) qos > 0 publish packet with ack", client->send_publish_packet_count); } } @@ -51,7 +55,6 @@ void esp_mqtt5_parse_puback(esp_mqtt5_client_handle_t client) client->event.data_len = msg_data_len; client->event.total_data_len = msg_data_len; client->event.current_data_offset = 0; - client->send_publish_packet_count --; } } @@ -291,7 +294,7 @@ esp_err_t esp_mqtt5_client_publish_check(esp_mqtt5_client_handle_t client, int q } /* Flow control to check PUBLISH(No PUBACK or PUBCOMP received) packet sent count(Only record QoS1 and QoS2)*/ - if (client->send_publish_packet_count >= client->mqtt5_config->server_resp_property_info.receive_maximum) { + if (client->send_publish_packet_count > client->mqtt5_config->server_resp_property_info.receive_maximum) { ESP_LOGE(TAG, "Client send more than %d QoS1 and QoS2 PUBLISH packet without no ack", client->mqtt5_config->server_resp_property_info.receive_maximum); return ESP_FAIL; } diff --git a/mqtt_client.c b/mqtt_client.c index 6a755de1..b224a006 100644 --- a/mqtt_client.c +++ b/mqtt_client.c @@ -709,6 +709,7 @@ static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_m if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { #ifdef MQTT_PROTOCOL_5 if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) { + client->send_publish_packet_count = 0; return ESP_OK; } #endif @@ -943,7 +944,9 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) return ESP_FAIL; } #ifdef MQTT_PROTOCOL_5 - esp_mqtt5_flow_control(client); + if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + esp_mqtt5_increment_packet_counter(client); + } #endif return ESP_OK; } @@ -1367,6 +1370,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) } break; case MQTT_MSG_TYPE_PUBACK: +#ifdef MQTT_PROTOCOL_5 + if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + esp_mqtt5_decrement_packet_counter(client); + } +#endif if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); #ifdef MQTT_PROTOCOL_5 @@ -1413,6 +1421,11 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) break; case MQTT_MSG_TYPE_PUBCOMP: ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); +#ifdef MQTT_PROTOCOL_5 + if (client->connect_info.protocol_ver == MQTT_PROTOCOL_V_5) { + esp_mqtt5_decrement_packet_counter(client); + } +#endif if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); #ifdef MQTT_PROTOCOL_5