Skip to content

Commit

Permalink
[longhaul test] Filter out device IDs from event hub notifications (#…
Browse files Browse the repository at this point in the history
…1858)

* longhaul debug trace

* added more trace logs

* Filter AMQP message based on the device ID

* remove longhaul log noise
  • Loading branch information
ericwolz authored Feb 14, 2021
1 parent c23ff3f commit f64bb51
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ static int on_message_received(void* context, const char* data, size_t size)
}
else
{
LogError("on_message_received() failed (tests_id:%s, iotHubLonghaul->test_id:%s)", tests_id, iotHubLonghaul->test_id);
LogError("on_message_received() failed (deviceId:%s)", iotHubLonghaul->deviceInfo->deviceId);
result = MU_FAILURE; // This is not the message we expected. Abandoning it.
}
}
Expand Down Expand Up @@ -1335,7 +1335,7 @@ static int send_c2d(const void* context)
IOTHUB_MESSAGING_RESULT iotHubMessagingResult = IoTHubMessaging_SendAsync(iotHubLonghaul->iotHubSvcMsgHandle, iotHubLonghaul->deviceInfo->deviceId, message, on_c2d_message_sent, send_context);
if (iotHubMessagingResult == IOTHUB_MESSAGING_ERROR)
{
LogError("Failed sending c2d message with error IOTHUB_MESSAGING_ERROR calling IoTHubMessaging_Open");
LogInfo("Failed sending c2d message with error IOTHUB_MESSAGING_ERROR calling IoTHubMessaging_Open");
result = MU_FAILURE;
// close the current service handle
IoTHubMessaging_Close(iotHubLonghaul->iotHubSvcMsgHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,8 +1197,6 @@ int iothub_client_statistics_add_device_twin_desired_info(IOTHUB_CLIENT_STATISTI
DEVICE_TWIN_DESIRED_INFO* queued_info;
LIST_ITEM_HANDLE list_item = singlylinkedlist_find(stats->twin_desired_properties, find_device_twin_info_by_id, info);

LogInfo("type=%s, id=%lu)", MU_ENUM_TO_STRING(DEVICE_TWIN_EVENT_TYPE, type), (unsigned long)info->update_id);

if (list_item == NULL)
{
if (type != DEVICE_TWIN_UPDATE_SENT)
Expand Down
52 changes: 50 additions & 2 deletions testtools/iothub_test/src/iothubtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,35 @@ static char* CreateSendAuthCid(IOTHUB_VALIDATION_INFO* devhubValInfo)
return result;
}

static AMQP_VALUE GetMessageDeviceId(MESSAGE_HANDLE uamqp_message)
{
AMQP_VALUE device_id = NULL;
AMQP_VALUE uamqp_message_annotations = NULL;
if (message_get_message_annotations(uamqp_message, &uamqp_message_annotations) != 0)
{
LogError("Failed reading the incoming uAMQP message annotations.");
}
else
{
if (uamqp_message_annotations == NULL)
{
LogError("No AMQP message annotations");
}
else
{
AMQP_VALUE property_key = amqpvalue_create_symbol("iothub-connection-device-id");
if (property_key != NULL)
{
device_id = amqpvalue_get_map_value(uamqp_message_annotations, property_key);
amqpvalue_destroy(property_key);
}
amqpvalue_destroy(uamqp_message_annotations);
}
}

return device_id;
}

static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
{
MESSAGE_RECEIVER_CONTEXT* msg_received_context = (MESSAGE_RECEIVER_CONTEXT*)context;
Expand Down Expand Up @@ -654,14 +683,33 @@ static AMQP_VALUE on_message_received_new(const void* context, MESSAGE_HANDLE me
}
else
{
if (devhubValInfo->onMessageReceivedCallback(devhubValInfo->onMessageReceivedContext, (const char*)binary_data.bytes, binary_data.length) == 0)
const char* deviceid = NULL;
AMQP_VALUE device_id = GetMessageDeviceId(message);
if (device_id != NULL)
{
result = messaging_delivery_accepted();
amqpvalue_get_string(device_id, &deviceid);
}

if (deviceid == NULL || strcmp(devhubValInfo->deviceId, deviceid) == 0)
{
if (devhubValInfo->onMessageReceivedCallback(devhubValInfo->onMessageReceivedContext, (const char*)binary_data.bytes, binary_data.length) == 0)
{
result = messaging_delivery_accepted();
}
else
{
result = messaging_delivery_released();
}
}
else
{
result = messaging_delivery_released();
}

if (device_id != NULL)
{
amqpvalue_destroy(device_id);
}
}
}

Expand Down

0 comments on commit f64bb51

Please sign in to comment.