diff --git a/iothub_client/src/iothubtransport_amqp_messenger.c b/iothub_client/src/iothubtransport_amqp_messenger.c index eec975d7ed..cae55c18c7 100644 --- a/iothub_client/src/iothubtransport_amqp_messenger.c +++ b/iothub_client/src/iothubtransport_amqp_messenger.c @@ -1343,6 +1343,9 @@ int amqp_messenger_stop(AMQP_MESSENGER_HANDLE messenger_handle) { // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked if provided] update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPED); + + instance->send_error_count = 0; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [If no failures occur, amqp_messenger_stop() shall return 0] result = RESULT_OK; } diff --git a/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c b/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c index 98ea93c3e7..94088679c9 100644 --- a/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c +++ b/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c @@ -87,7 +87,8 @@ static const char* test_get_product_info(void* ctx) } #define DEFAULT_EVENT_SEND_RETRY_LIMIT 10 -#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600 +#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600 +#define DEFAULT_MAX_SEND_ERROR_COUNT 10 #define UNIQUE_ID_BUFFER_SIZE 37 #define TEST_UNIQUE_ID "A1234DE234A1234DE234A1234DE234A1234DEA1234DE234A1234DE234A1234DE234A1234DEA1234DE234A1234DE234A1234DE234A1234DE" @@ -296,7 +297,7 @@ static int TEST_messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, O return TEST_messagereceiver_open_result; } -#define TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE 10 +#define TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE 20 static size_t TEST_message_queue_add_count; static MESSAGE_QUEUE_HANDLE TEST_message_queue_add_message_queue[TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE]; static MQ_MESSAGE_HANDLE TEST_message_queue_add_message[TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE]; @@ -2345,6 +2346,93 @@ TEST_FUNCTION(amqp_messenger_send_async_failure_checks) amqp_messenger_destroy(handle); } +TEST_FUNCTION(amqp_messenger_send_async_succeeds) +{ + // arrange + AMQP_MESSENGER_CONFIG* config = get_messenger_config(); + AMQP_MESSENGER_HANDLE handle = create_and_start_messenger2(config, false); + + umock_c_reset_all_calls(); + + set_expected_calls_for_amqp_messenger_send_async(); + + // act + int result = amqp_messenger_send_async(handle, TEST_MESSAGE_HANDLE, TEST_on_event_send_complete, TEST_IOTHUB_CLIENT_HANDLE); + + // assert + ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls()); + ASSERT_ARE_EQUAL(int, 0, result); + + // cleanup + amqp_messenger_destroy(handle); +} + +TEST_FUNCTION(gh1386_amqp_messenger_stop_resets_send_error_count) +{ + // arrange + AMQP_MESSENGER_CONFIG* config = get_messenger_config(); + AMQP_MESSENGER_HANDLE handle = create_and_start_messenger2(config, false); + const char user_context[] = "abcdefghijklmnopqrstuvwxyz"; + + time_t current_time = time(NULL); + MESSENGER_DO_WORK_EXP_CALL_PROFILE* do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTED, false, false, 1, 0, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS); + crank_amqp_messenger_do_work(handle, do_work_profile); + + for (int i = 0; i <= DEFAULT_MAX_SEND_ERROR_COUNT; i++) + { + umock_c_reset_all_calls(); + set_expected_calls_for_amqp_messenger_send_async(); + + int result = amqp_messenger_send_async(handle, TEST_MESSAGE_HANDLE, TEST_on_event_send_complete, (void*)&user_context[i]); + + ASSERT_ARE_EQUAL(int, 0, result); + } + + // act + for (int i = 0; i < TEST_message_queue_add_count; i++) + { + TEST_message_queue_add_on_message_processing_completed_callback[i] + ( + TEST_message_queue_add_message[i], + MESSAGE_QUEUE_TIMEOUT, + NULL, + TEST_message_queue_add_user_context[i] + ); + + ASSERT_ARE_EQUAL(char_ptr, &user_context[i], TEST_on_event_send_complete_context); + ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_SEND_RESULT_ERROR, TEST_on_event_send_complete_result); + ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_REASON_TIMEOUT, TEST_on_event_send_complete_reason); + } + + TEST_message_queue_add_count = 0; // To simulate the queue empty after all the messages processed above. + + do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTED, false, false, 0, DEFAULT_MAX_SEND_ERROR_COUNT, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS); + crank_amqp_messenger_do_work(handle, do_work_profile); + + ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_ERROR, saved_on_state_changed_callback_new_state); + + int result = amqp_messenger_stop(handle); + ASSERT_ARE_EQUAL(int, 0, result); + + result = amqp_messenger_start(handle, TEST_SESSION_HANDLE); + ASSERT_ARE_EQUAL(int, 0, result); + + ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_STARTING, saved_on_state_changed_callback_new_state); + + do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTING, false, false, 0, 0, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS); + crank_amqp_messenger_do_work(handle, do_work_profile); + crank_amqp_messenger_do_work(handle, do_work_profile); + + // assert + + // Before the fix, GH issue #1386 would cause the messenger state to go to AMQP_MESSENGER_STATE_ERROR again, + // because send_error_count was not reset by amqp_messenger_stop. + ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_STARTED, saved_on_state_changed_callback_new_state); + + // cleanup + amqp_messenger_destroy(handle); +} + // Tests_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` or `send_status` are NULL, amqp_messenger_get_send_status() shall fail and return a non-zero value] TEST_FUNCTION(amqp_messenger_get_send_status_NULL_handle) {