Skip to content

Commit

Permalink
Fix send_error_count not reset by amqp_messenger_stop (gh #1386)
Browse files Browse the repository at this point in the history
  • Loading branch information
ewertons committed Oct 23, 2021
1 parent 6ff24a9 commit 0c74b7d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 2 deletions.
3 changes: 3 additions & 0 deletions iothub_client/src/iothubtransport_amqp_messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,9 @@ int amqp_messenger_stop(AMQP_MESSENGER_HANDLE messenger_handle)
{
// Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked if provided]
update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPED);

instance->send_error_count = 0;

// Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [If no failures occur, amqp_messenger_stop() shall return 0]
result = RESULT_OK;
}
Expand Down
92 changes: 90 additions & 2 deletions iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ static const char* test_get_product_info(void* ctx)
}

#define DEFAULT_EVENT_SEND_RETRY_LIMIT 10
#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600
#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600
#define DEFAULT_MAX_SEND_ERROR_COUNT 10

#define UNIQUE_ID_BUFFER_SIZE 37
#define TEST_UNIQUE_ID "A1234DE234A1234DE234A1234DE234A1234DEA1234DE234A1234DE234A1234DE234A1234DEA1234DE234A1234DE234A1234DE234A1234DE"
Expand Down Expand Up @@ -296,7 +297,7 @@ static int TEST_messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, O
return TEST_messagereceiver_open_result;
}

#define TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE 10
#define TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE 20
static size_t TEST_message_queue_add_count;
static MESSAGE_QUEUE_HANDLE TEST_message_queue_add_message_queue[TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE];
static MQ_MESSAGE_HANDLE TEST_message_queue_add_message[TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE];
Expand Down Expand Up @@ -2345,6 +2346,93 @@ TEST_FUNCTION(amqp_messenger_send_async_failure_checks)
amqp_messenger_destroy(handle);
}

TEST_FUNCTION(amqp_messenger_send_async_succeeds)
{
// arrange
AMQP_MESSENGER_CONFIG* config = get_messenger_config();
AMQP_MESSENGER_HANDLE handle = create_and_start_messenger2(config, false);

umock_c_reset_all_calls();

set_expected_calls_for_amqp_messenger_send_async();

// act
int result = amqp_messenger_send_async(handle, TEST_MESSAGE_HANDLE, TEST_on_event_send_complete, TEST_IOTHUB_CLIENT_HANDLE);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
ASSERT_ARE_EQUAL(int, 0, result);

// cleanup
amqp_messenger_destroy(handle);
}

TEST_FUNCTION(gh1386_amqp_messenger_stop_resets_send_error_count)
{
// arrange
AMQP_MESSENGER_CONFIG* config = get_messenger_config();
AMQP_MESSENGER_HANDLE handle = create_and_start_messenger2(config, false);
const char user_context[] = "abcdefghijklmnopqrstuvwxyz";

time_t current_time = time(NULL);
MESSENGER_DO_WORK_EXP_CALL_PROFILE* do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTED, false, false, 1, 0, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS);
crank_amqp_messenger_do_work(handle, do_work_profile);

for (int i = 0; i <= DEFAULT_MAX_SEND_ERROR_COUNT; i++)
{
umock_c_reset_all_calls();
set_expected_calls_for_amqp_messenger_send_async();

int result = amqp_messenger_send_async(handle, TEST_MESSAGE_HANDLE, TEST_on_event_send_complete, (void*)&user_context[i]);

ASSERT_ARE_EQUAL(int, 0, result);
}

// act
for (int i = 0; i < TEST_message_queue_add_count; i++)
{
TEST_message_queue_add_on_message_processing_completed_callback[i]
(
TEST_message_queue_add_message[i],
MESSAGE_QUEUE_TIMEOUT,
NULL,
TEST_message_queue_add_user_context[i]
);

ASSERT_ARE_EQUAL(char_ptr, &user_context[i], TEST_on_event_send_complete_context);
ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_SEND_RESULT_ERROR, TEST_on_event_send_complete_result);
ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_REASON_TIMEOUT, TEST_on_event_send_complete_reason);
}

TEST_message_queue_add_count = 0; // To simulate the queue empty after all the messages processed above.

do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTED, false, false, 0, DEFAULT_MAX_SEND_ERROR_COUNT, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS);
crank_amqp_messenger_do_work(handle, do_work_profile);

ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_ERROR, saved_on_state_changed_callback_new_state);

int result = amqp_messenger_stop(handle);
ASSERT_ARE_EQUAL(int, 0, result);

result = amqp_messenger_start(handle, TEST_SESSION_HANDLE);
ASSERT_ARE_EQUAL(int, 0, result);

ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_STARTING, saved_on_state_changed_callback_new_state);

do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTING, false, false, 0, 0, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS);
crank_amqp_messenger_do_work(handle, do_work_profile);
crank_amqp_messenger_do_work(handle, do_work_profile);

// assert

// Before the fix, GH issue #1386 would cause the messenger state to go to AMQP_MESSENGER_STATE_ERROR again,
// because send_error_count was not reset by amqp_messenger_stop.
ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_STARTED, saved_on_state_changed_callback_new_state);

// cleanup
amqp_messenger_destroy(handle);
}

// Tests_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` or `send_status` are NULL, amqp_messenger_get_send_status() shall fail and return a non-zero value]
TEST_FUNCTION(amqp_messenger_get_send_status_NULL_handle)
{
Expand Down

0 comments on commit 0c74b7d

Please sign in to comment.