Skip to content

Commit

Permalink
Mqtt twin timeout cleanup (#1234)
Browse files Browse the repository at this point in the history
* Check correct timeout time

* Fix MQTT timeout issues

Fix regression introduced on overly aggressive MQTT timeouts.

To fix and cleanup code a bit:
* Remove concept of enqueue time and instead have a message creation time.
* Set this message creation time ALWAYS, regardless of whether it's a PUT or GET.
* Check this publish time on timeouts.
* Create a common function to create TWIN requests and cleanup some dup'd code.

On test side:
* Fix up UT's required of course
* Change the default timer that UT uses to start at 30 minutes, not 0 minutes.
  If we had done this it would've caught initial bug where a message creation time was never init'd but close enough to 0 to have tests pass.
  • Loading branch information
jspaith authored Oct 11, 2019
1 parent 272bf28 commit eec5e3a
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 195 deletions.
116 changes: 51 additions & 65 deletions iothub_client/src/iothubtransport_mqtt_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#define MAX_DISCONNECT_VALUE 50

#define ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS 60
#define TWIN_REPORT_UPDATE_TIMEOUT_SECS (60*5)

static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin";
static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods";
Expand Down Expand Up @@ -241,10 +242,9 @@ typedef struct MQTTTRANSPORT_HANDLE_DATA_TAG

typedef struct MQTT_DEVICE_TWIN_ITEM_TAG
{
tickcounter_ms_t msgEnqueueTime;
tickcounter_ms_t msgCreationTime;
tickcounter_ms_t msgPublishTime;
size_t retryCount;
IOTHUB_IDENTITY_TYPE iothub_type;
uint16_t packet_id;
uint32_t iothub_msg_id;
IOTHUB_DEVICE_TWIN* device_twin_data;
Expand Down Expand Up @@ -948,26 +948,28 @@ static void destroy_device_twin_get_message(MQTT_DEVICE_TWIN_ITEM* msg_entry)
free(msg_entry);
}

static MQTT_DEVICE_TWIN_ITEM* create_device_twin_get_message(MQTTTRANSPORT_HANDLE_DATA* transport_data)
static MQTT_DEVICE_TWIN_ITEM* create_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, DEVICE_TWIN_MSG_TYPE device_twin_msg_type, uint32_t iothub_msg_id)
{
MQTT_DEVICE_TWIN_ITEM* result;
tickcounter_ms_t current_time;

if ((result = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM))) == NULL)
if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
{
LogError("Failed retrieving tickcounter info");
result = NULL;
}
else if ((result = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM))) == NULL)
{
LogError("Failed allocating device twin data.");
result = NULL;
}
else
{
memset(result, 0, sizeof(*result));
result->msgCreationTime = current_time;
result->packet_id = get_next_packet_id(transport_data);
result->iothub_msg_id = 0;
result->device_twin_msg_type = RETRIEVE_PROPERTIES;
result->retryCount = 0;
result->msgPublishTime = 0;
result->msgEnqueueTime = 0;
result->iothub_type = IOTHUB_TYPE_DEVICE_TWIN;
result->device_twin_data = NULL;
result->userCallback = NULL;
result->userContext = NULL;
result->iothub_msg_id = iothub_msg_id;
result->device_twin_msg_type = device_twin_msg_type;
}

return result;
Expand Down Expand Up @@ -1036,67 +1038,59 @@ static void sendPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transportData)
}
}

static void removeExpiredPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transport_data)
{
tickcounter_ms_t current_ms;

if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms) == 0)
static void removeExpiredTwinRequestsFromList(PMQTTTRANSPORT_HANDLE_DATA transport_data, tickcounter_ms_t current_ms, DLIST_ENTRY* twin_list)
{
PDLIST_ENTRY list_item = twin_list->Flink;

while (list_item != twin_list)
{
PDLIST_ENTRY listItem = transport_data->pending_get_twin_queue.Flink;
DLIST_ENTRY next_list_item;
next_list_item.Flink = list_item->Flink;
MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(list_item, MQTT_DEVICE_TWIN_ITEM, entry);
bool item_timed_out = false;

while (listItem != &transport_data->pending_get_twin_queue)
if ((msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES) &&
(((current_ms - msg_entry->msgCreationTime) / 1000) >= ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS))
{
DLIST_ENTRY nextListItem;
nextListItem.Flink = listItem->Flink;
MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(listItem, MQTT_DEVICE_TWIN_ITEM, entry);

if (((current_ms - msg_entry->msgEnqueueTime) / 1000) >= ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS)
item_timed_out = true;
if (msg_entry->userCallback != NULL)
{
(void)DList_RemoveEntryList(listItem);
msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, msg_entry->userContext);
destroy_device_twin_get_message(msg_entry);
}
}
else if ((msg_entry->device_twin_msg_type == REPORTED_STATE) &&
(((current_ms - msg_entry->msgCreationTime) / 1000) >= TWIN_REPORT_UPDATE_TIMEOUT_SECS))
{
item_timed_out = true;
transport_data->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, STATUS_CODE_TIMEOUT_VALUE, transport_data->transport_ctx);
}

listItem = nextListItem.Flink;
if (item_timed_out)
{
(void)DList_RemoveEntryList(list_item);
destroy_device_twin_get_message(msg_entry);
}

list_item = next_list_item.Flink;
}

}

static void removeExpiredGetTwinRequestsPendingAck(PMQTTTRANSPORT_HANDLE_DATA transport_data)
static void removeExpiredTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transport_data)
{
tickcounter_ms_t current_ms;

if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms) == 0)
{
PDLIST_ENTRY listItem = transport_data->ack_waiting_queue.Flink;

while (listItem != &transport_data->ack_waiting_queue)
{
DLIST_ENTRY nextListItem;
nextListItem.Flink = listItem->Flink;
MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(listItem, MQTT_DEVICE_TWIN_ITEM, entry);

// Check if it is a on-demand get-twin request.
if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES && msg_entry->userCallback != NULL)
{
if (((current_ms - msg_entry->msgEnqueueTime) / 1000) >= ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS)
{
(void)DList_RemoveEntryList(listItem);
msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, msg_entry->userContext);
destroy_device_twin_get_message(msg_entry);
}
}

listItem = nextListItem.Flink;
}
removeExpiredTwinRequestsFromList(transport_data, current_ms, &transport_data->pending_get_twin_queue);
removeExpiredTwinRequestsFromList(transport_data, current_ms, &transport_data->ack_waiting_queue);
}
}

static int publish_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
{
int result;
mqtt_info->packet_id = get_next_packet_id(transport_data);
mqtt_info->device_twin_msg_type = REPORTED_STATE;

STRING_HANDLE msgTopic = STRING_construct_sprintf(REPORTED_PROPERTIES_TOPIC, mqtt_info->packet_id);
if (msgTopic == NULL)
Expand Down Expand Up @@ -2816,13 +2810,13 @@ IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetTwinAsync(IOTHUB_DEVICE_HAND
PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
MQTT_DEVICE_TWIN_ITEM* mqtt_info;

if ((mqtt_info = create_device_twin_get_message(transport_data)) == NULL)
if ((mqtt_info = create_device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
{
LogError("Failed creating the device twin get request message");
// Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
result = IOTHUB_CLIENT_ERROR;
}
else if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgEnqueueTime) != 0)
else if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgCreationTime) != 0)
{
LogError("Failed setting the get twin request enqueue time");
destroy_device_twin_get_message(mqtt_info);
Expand Down Expand Up @@ -3043,7 +3037,7 @@ IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_MQTT_Common_ProcessItem(TRANSPORT_LL_
// Ensure the reported property suback has been received
if (item_type == IOTHUB_TYPE_DEVICE_TWIN && transport_data->twin_resp_sub_recv)
{
MQTT_DEVICE_TWIN_ITEM* mqtt_info = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM));
MQTT_DEVICE_TWIN_ITEM* mqtt_info = create_device_twin_message(transport_data, REPORTED_STATE, iothub_item->device_twin->item_id);
if (mqtt_info == NULL)
{
/* Codes_SRS_IOTHUBCLIENT_LL_07_004: [ If any errors are encountered IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
Expand All @@ -3052,13 +3046,6 @@ IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_MQTT_Common_ProcessItem(TRANSPORT_LL_
else
{
/*Codes_SRS_IOTHUBCLIENT_LL_07_003: [ IoTHubTransport_MQTT_Common_ProcessItem shall publish a message to the mqtt protocol with the message topic for the message type.]*/
mqtt_info->iothub_type = item_type;
mqtt_info->iothub_msg_id = iothub_item->device_twin->item_id;
mqtt_info->retryCount = 0;
mqtt_info->userCallback = NULL;
mqtt_info->userContext = NULL;
mqtt_info->msgEnqueueTime = 0;

/* Codes_SRS_IOTHUBCLIENT_LL_07_005: [ If successful IoTHubTransport_MQTT_Common_ProcessItem shall add mqtt info structure acknowledgement queue. ] */
DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);

Expand Down Expand Up @@ -3117,7 +3104,7 @@ void IoTHubTransport_MQTT_Common_DoWork(TRANSPORT_LL_HANDLE handle)
/* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */
MQTT_DEVICE_TWIN_ITEM* mqtt_info;

if ((mqtt_info = create_device_twin_get_message(transport_data)) == NULL)
if ((mqtt_info = create_device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
{
LogError("Failure: could not create message for twin get command");
}
Expand Down Expand Up @@ -3192,8 +3179,7 @@ void IoTHubTransport_MQTT_Common_DoWork(TRANSPORT_LL_HANDLE handle)

// Check the ack messages timeouts
process_queued_ack_messages(transport_data);
removeExpiredPendingGetTwinRequests(transport_data);
removeExpiredGetTwinRequestsPendingAck(transport_data);
removeExpiredTwinRequests(transport_data);
}
}

Expand Down Expand Up @@ -3647,4 +3633,4 @@ int IoTHubTransport_MQTT_GetSupportedPlatformInfo(TRANSPORT_LL_HANDLE handle, PL
}

return result;
}
}
Loading

0 comments on commit eec5e3a

Please sign in to comment.