Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix send_error_count not reset by amqp_messenger_stop (gh #1386) #2120

Merged
merged 1 commit into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions iothub_client/src/iothubtransport_amqp_messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,9 @@ int amqp_messenger_stop(AMQP_MESSENGER_HANDLE messenger_handle)
{
// Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked if provided]
update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPED);

instance->send_error_count = 0;

// Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [If no failures occur, amqp_messenger_stop() shall return 0]
result = RESULT_OK;
}
Expand Down
92 changes: 90 additions & 2 deletions iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ static const char* test_get_product_info(void* ctx)
}

#define DEFAULT_EVENT_SEND_RETRY_LIMIT 10
#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600
#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600
#define DEFAULT_MAX_SEND_ERROR_COUNT 10

#define UNIQUE_ID_BUFFER_SIZE 37
#define TEST_UNIQUE_ID "A1234DE234A1234DE234A1234DE234A1234DEA1234DE234A1234DE234A1234DE234A1234DEA1234DE234A1234DE234A1234DE234A1234DE"
Expand Down Expand Up @@ -296,7 +297,7 @@ static int TEST_messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, O
return TEST_messagereceiver_open_result;
}

#define TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE 10
#define TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE 20
static size_t TEST_message_queue_add_count;
static MESSAGE_QUEUE_HANDLE TEST_message_queue_add_message_queue[TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE];
static MQ_MESSAGE_HANDLE TEST_message_queue_add_message[TEST_MESSAGE_QUEUE_ADD_BUFFERS_SIZE];
Expand Down Expand Up @@ -2345,6 +2346,93 @@ TEST_FUNCTION(amqp_messenger_send_async_failure_checks)
amqp_messenger_destroy(handle);
}

TEST_FUNCTION(amqp_messenger_send_async_succeeds)
{
// arrange
AMQP_MESSENGER_CONFIG* config = get_messenger_config();
AMQP_MESSENGER_HANDLE handle = create_and_start_messenger2(config, false);

umock_c_reset_all_calls();

set_expected_calls_for_amqp_messenger_send_async();

// act
int result = amqp_messenger_send_async(handle, TEST_MESSAGE_HANDLE, TEST_on_event_send_complete, TEST_IOTHUB_CLIENT_HANDLE);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
ASSERT_ARE_EQUAL(int, 0, result);

// cleanup
amqp_messenger_destroy(handle);
}

TEST_FUNCTION(gh1386_amqp_messenger_stop_resets_send_error_count)
{
// arrange
AMQP_MESSENGER_CONFIG* config = get_messenger_config();
AMQP_MESSENGER_HANDLE handle = create_and_start_messenger2(config, false);
const char user_context[] = "abcdefghijklmnopqrstuvwxyz";

time_t current_time = time(NULL);
MESSENGER_DO_WORK_EXP_CALL_PROFILE* do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTED, false, false, 1, 0, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS);
crank_amqp_messenger_do_work(handle, do_work_profile);

for (int i = 0; i <= DEFAULT_MAX_SEND_ERROR_COUNT; i++)
{
umock_c_reset_all_calls();
set_expected_calls_for_amqp_messenger_send_async();

int result = amqp_messenger_send_async(handle, TEST_MESSAGE_HANDLE, TEST_on_event_send_complete, (void*)&user_context[i]);

ASSERT_ARE_EQUAL(int, 0, result);
}

// act
for (size_t i = 0; i < TEST_message_queue_add_count; i++)
{
TEST_message_queue_add_on_message_processing_completed_callback[i]
(
TEST_message_queue_add_message[i],
MESSAGE_QUEUE_TIMEOUT,
NULL,
TEST_message_queue_add_user_context[i]
);

ASSERT_ARE_EQUAL(char_ptr, &user_context[i], TEST_on_event_send_complete_context);
ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_SEND_RESULT_ERROR, TEST_on_event_send_complete_result);
ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_REASON_TIMEOUT, TEST_on_event_send_complete_reason);
}

TEST_message_queue_add_count = 0; // To simulate the queue empty after all the messages processed above.

do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTED, false, false, 0, DEFAULT_MAX_SEND_ERROR_COUNT, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS);
crank_amqp_messenger_do_work(handle, do_work_profile);

ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_ERROR, saved_on_state_changed_callback_new_state);

int result = amqp_messenger_stop(handle);
ASSERT_ARE_EQUAL(int, 0, result);

result = amqp_messenger_start(handle, TEST_SESSION_HANDLE);
ASSERT_ARE_EQUAL(int, 0, result);

ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_STARTING, saved_on_state_changed_callback_new_state);

do_work_profile = get_msgr_do_work_exp_call_profile(AMQP_MESSENGER_STATE_STARTING, false, false, 0, 0, current_time, DEFAULT_EVENT_SEND_TIMEOUT_SECS);
crank_amqp_messenger_do_work(handle, do_work_profile);
crank_amqp_messenger_do_work(handle, do_work_profile);

// assert

// Before the fix, GH issue #1386 would cause the messenger state to go to AMQP_MESSENGER_STATE_ERROR again,
// because send_error_count was not reset by amqp_messenger_stop.
ASSERT_ARE_EQUAL(int, AMQP_MESSENGER_STATE_STARTED, saved_on_state_changed_callback_new_state);

// cleanup
amqp_messenger_destroy(handle);
}

// Tests_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` or `send_status` are NULL, amqp_messenger_get_send_status() shall fail and return a non-zero value]
TEST_FUNCTION(amqp_messenger_get_send_status_NULL_handle)
{
Expand Down