From 9cefef5dd65ce034706f599c804236f38ad19be0 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 28 May 2024 15:09:25 -0700 Subject: [PATCH 01/18] add regression test --- tests/CMakeLists.txt | 1 + tests/byo_crypto_test.c | 2 - tests/channel_test.c | 2 +- tests/read_write_test_handler.c | 54 +++--- tests/read_write_test_handler.h | 8 +- tests/socket_handler_test.c | 317 ++++++++++++++++++++++++-------- tests/tls_handler_test.c | 8 +- 7 files changed, 280 insertions(+), 112 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 194dda6fd..e1187680b 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -123,6 +123,7 @@ add_test_case(pem_sanitize_wrong_format_rejected) add_test_case(socket_handler_echo_and_backpressure) add_test_case(socket_handler_close) +add_test_case(socket_handler_read_to_eof_after_peer_hangup) add_test_case(socket_pinned_event_loop) add_net_test_case(socket_pinned_event_loop_dns_failure) diff --git a/tests/byo_crypto_test.c b/tests/byo_crypto_test.c index 878889646..3f219e0a1 100644 --- a/tests/byo_crypto_test.c +++ b/tests/byo_crypto_test.c @@ -362,7 +362,6 @@ static int s_byo_tls_handler_test(struct aws_allocator *allocator, void *ctx) { allocator, s_byo_crypto_test_handle_read, s_byo_crypto_test_handle_write, - true, s_outgoing_initial_read_window, &outgoing_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); @@ -371,7 +370,6 @@ static int s_byo_tls_handler_test(struct aws_allocator *allocator, void *ctx) { allocator, s_byo_crypto_test_handle_read, s_byo_crypto_test_handle_write, - true, s_incoming_initial_read_window, &incoming_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); diff --git a/tests/channel_test.c b/tests/channel_test.c index 9a730a351..7812f8eed 100644 --- a/tests/channel_test.c +++ b/tests/channel_test.c @@ -267,7 +267,7 @@ static int s_test_channel_refcount(struct aws_allocator *allocator, void *ctx) { struct aws_channel_slot *slot = aws_channel_slot_new(channel); ASSERT_NOT_NULL(slot); - struct aws_channel_handler *handler = rw_handler_new(allocator, NULL, NULL, false, 10000, NULL); + struct aws_channel_handler *handler = rw_handler_new(allocator, NULL, NULL, 10000, NULL); struct aws_atomic_var destroy_called = AWS_ATOMIC_INIT_INT(0); struct aws_mutex destroy_mutex = AWS_MUTEX_INIT; diff --git a/tests/read_write_test_handler.c b/tests/read_write_test_handler.c index c43241aad..5b607ec98 100644 --- a/tests/read_write_test_handler.c +++ b/tests/read_write_test_handler.c @@ -23,7 +23,6 @@ struct rw_test_handler_impl { struct aws_condition_variable *destroy_condition_variable; rw_handler_driver_fn *on_read; rw_handler_driver_fn *on_write; - bool event_loop_driven; size_t window; struct aws_condition_variable condition_variable; struct aws_mutex mutex; @@ -138,7 +137,6 @@ struct aws_channel_handler *rw_handler_new( struct aws_allocator *allocator, rw_handler_driver_fn *on_read, rw_handler_driver_fn *on_write, - bool event_loop_driven, size_t window, void *ctx) { @@ -151,7 +149,6 @@ struct aws_channel_handler *rw_handler_new( handler_impl->on_read = on_read; handler_impl->on_write = on_write; handler_impl->ctx = ctx; - handler_impl->event_loop_driven = event_loop_driven; handler_impl->window = window; handler_impl->condition_variable = (struct aws_condition_variable)AWS_CONDITION_VARIABLE_INIT; handler_impl->mutex = (struct aws_mutex)AWS_MUTEX_INIT; @@ -190,6 +187,8 @@ struct rw_handler_write_task_args { struct aws_channel_slot *slot; struct aws_byte_buf *buffer; struct aws_channel_task task; + aws_channel_on_message_write_completed_fn *on_completion; + void *user_data; }; static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) { @@ -200,36 +199,39 @@ static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, en struct aws_io_message *msg = aws_channel_acquire_message_from_pool( write_task_args->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, write_task_args->buffer->len); + msg->on_completion = write_task_args->on_completion; + msg->user_data = write_task_args->user_data; + struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(write_task_args->buffer); - aws_byte_buf_append(&msg->message_data, &write_buffer); + AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &write_buffer) == AWS_OP_SUCCESS); - aws_channel_slot_send_message(write_task_args->slot, msg, AWS_CHANNEL_DIR_WRITE); + AWS_FATAL_ASSERT( + aws_channel_slot_send_message(write_task_args->slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS); aws_mem_release(write_task_args->handler->alloc, write_task_args); } void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer) { + rw_handler_write_with_callback(handler, slot, buffer, NULL /*on_completion*/, NULL /*user_data*/); +} - struct rw_test_handler_impl *handler_impl = handler->impl; - - if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) { - struct aws_io_message *msg = - aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len); - - struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer); - aws_byte_buf_append(&msg->message_data, &write_buffer); - - aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE); - } else { - struct rw_handler_write_task_args *write_task_args = - aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args)); - write_task_args->handler = handler; - write_task_args->buffer = buffer; - write_task_args->slot = slot; - aws_channel_task_init(&write_task_args->task, s_rw_handler_write_task, write_task_args, "rw_handler_write"); - - aws_channel_schedule_task_now(slot->channel, &write_task_args->task); - } +void rw_handler_write_with_callback( + struct aws_channel_handler *handler, + struct aws_channel_slot *slot, + struct aws_byte_buf *buffer, + aws_channel_on_message_write_completed_fn *on_completion, + void *user_data) { + + struct rw_handler_write_task_args *write_task_args = + aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args)); + write_task_args->handler = handler; + write_task_args->buffer = buffer; + write_task_args->slot = slot; + write_task_args->on_completion = on_completion; + write_task_args->user_data = user_data; + aws_channel_task_init(&write_task_args->task, s_rw_handler_write_task, write_task_args, "rw_handler_write"); + + aws_channel_schedule_task_now(slot->channel, &write_task_args->task); } struct increment_read_window_task_args { @@ -259,7 +261,7 @@ void rw_handler_trigger_increment_read_window( struct rw_test_handler_impl *handler_impl = handler->impl; - if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) { + if (aws_channel_thread_is_callers_thread(slot->channel)) { handler_impl->window += window_update; aws_channel_slot_increment_read_window(slot, window_update); } else { diff --git a/tests/read_write_test_handler.h b/tests/read_write_test_handler.h index 0fa6a9564..484a32fc4 100644 --- a/tests/read_write_test_handler.h +++ b/tests/read_write_test_handler.h @@ -25,7 +25,6 @@ struct aws_channel_handler *rw_handler_new( struct aws_allocator *allocator, rw_handler_driver_fn *on_read, rw_handler_driver_fn *on_write, - bool event_loop_driven, size_t window, void *ctx); @@ -36,6 +35,13 @@ void rw_handler_enable_wait_on_destroy( void rw_handler_write(struct aws_channel_handler *handler, struct aws_channel_slot *slot, struct aws_byte_buf *buffer); +void rw_handler_write_with_callback( + struct aws_channel_handler *handler, + struct aws_channel_slot *slot, + struct aws_byte_buf *buffer, + aws_channel_on_message_write_completed_fn *on_completion, + void *user_data); + void rw_handler_trigger_read(struct aws_channel_handler *handler, struct aws_channel_slot *slot); bool rw_handler_shutdown_called(struct aws_channel_handler *handler); diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index ed94eff8c..fe3888785 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -17,6 +17,9 @@ #include "statistics_handler_test.h" #include +#define NANOS_PER_SEC ((uint64_t)AWS_TIMESTAMP_NANOS) +#define TIMEOUT (10 * NANOS_PER_SEC) + struct socket_test_args { struct aws_allocator *allocator; struct aws_mutex *mutex; @@ -24,7 +27,7 @@ struct socket_test_args { struct aws_channel *channel; struct aws_channel_handler *rw_handler; - struct aws_atomic_var rw_slot; /* pointer-to struct aws_channel_slot */ + struct aws_channel_slot *rw_slot; int error_code; bool shutdown_invoked; bool error_invoked; @@ -87,7 +90,7 @@ static bool s_pinned_channel_setup_predicate(void *user_data) { static bool s_channel_setup_predicate(void *user_data) { struct socket_test_args *setup_test_args = (struct socket_test_args *)user_data; - return aws_atomic_load_ptr(&setup_test_args->rw_slot) != NULL; + return setup_test_args->rw_slot != NULL; } static bool s_channel_shutdown_predicate(void *user_data) { @@ -120,7 +123,7 @@ static void s_socket_handler_test_client_setup_callback( aws_channel_slot_insert_end(channel, rw_slot); aws_channel_slot_set_handler(rw_slot, setup_test_args->rw_handler); - aws_atomic_store_ptr(&setup_test_args->rw_slot, rw_slot); + setup_test_args->rw_slot = rw_slot; aws_mutex_unlock(setup_test_args->mutex); @@ -147,7 +150,7 @@ static void s_socket_handler_test_server_setup_callback( aws_channel_slot_insert_end(channel, rw_slot); aws_channel_slot_set_handler(rw_slot, setup_test_args->rw_handler); - aws_atomic_store_ptr(&setup_test_args->rw_slot, rw_slot); + setup_test_args->rw_slot = rw_slot; } aws_mutex_unlock(setup_test_args->mutex); @@ -198,6 +201,7 @@ struct socket_test_rw_args { struct aws_byte_buf received_message; size_t amount_read; size_t expected_read; + size_t amount_written; bool invocation_happened; bool shutdown_finished; }; @@ -224,8 +228,7 @@ static struct aws_byte_buf s_socket_test_handle_read( struct socket_test_rw_args *rw_args = (struct socket_test_rw_args *)user_data; aws_mutex_lock(rw_args->mutex); - memcpy(rw_args->received_message.buffer + rw_args->received_message.len, data_read->buffer, data_read->len); - rw_args->received_message.len += data_read->len; + AWS_FATAL_ASSERT(aws_byte_buf_write_from_whole_buffer(&rw_args->received_message, *data_read)); rw_args->amount_read += data_read->len; rw_args->invocation_happened = true; aws_condition_variable_notify_one(rw_args->condition_variable); @@ -234,6 +237,23 @@ static struct aws_byte_buf s_socket_test_handle_read( return rw_args->received_message; } +void s_socket_test_handle_on_write_completed( + struct aws_channel *channel, + struct aws_io_message *message, + int error_code, + void *user_data) { + + (void)channel; + AWS_FATAL_ASSERT(error_code == 0); + struct socket_test_rw_args *rw_args = (struct socket_test_rw_args *)user_data; + + aws_mutex_lock(rw_args->mutex); + rw_args->amount_written += message->message_data.len; + rw_args->invocation_happened = true; + aws_condition_variable_notify_one(rw_args->condition_variable); + aws_mutex_unlock(rw_args->mutex); +} + static struct aws_byte_buf s_socket_test_handle_write( struct aws_channel_handler *handler, struct aws_channel_slot *slot, @@ -330,11 +350,11 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void s_socket_common_tester_init(allocator, &c_tester); struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, true, SIZE_MAX, NULL); + rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, SIZE_MAX, NULL); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, true, SIZE_MAX, NULL); + rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, SIZE_MAX, NULL); ASSERT_NOT_NULL(server_rw_handler); struct socket_test_args server_args; @@ -371,10 +391,10 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex)); /* wait for both ends to setup */ - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_pinned_channel_setup_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_pinned_channel_setup_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_pinned_channel_setup_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_pinned_channel_setup_predicate, &client_args)); /* Verify the client channel was placed on the requested event loop */ ASSERT_PTR_EQUALS(pinned_event_loop, aws_channel_get_event_loop(client_args.channel)); @@ -382,13 +402,13 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void ASSERT_SUCCESS(aws_channel_shutdown(server_args.channel, AWS_OP_SUCCESS)); ASSERT_SUCCESS(aws_channel_shutdown(client_args.channel, AWS_OP_SUCCESS)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &client_args)); aws_server_bootstrap_destroy_socket_listener(local_server_tester.server_bootstrap, local_server_tester.listener); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_listener_destroy_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_listener_destroy_predicate, &server_args)); aws_mutex_unlock(&c_tester.mutex); @@ -487,8 +507,8 @@ static int s_socket_pinned_event_loop_dns_failure_test(struct aws_allocator *all ASSERT_SUCCESS(aws_client_bootstrap_new_socket_channel(&client_channel_options)); ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_dns_failure_channel_setup_predicate, &c_tester)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_dns_failure_channel_setup_predicate, &c_tester)); /* Verify the setup callback failure was on the requested event loop */ ASSERT_TRUE(c_tester.setup_error_code != 0); @@ -535,7 +555,6 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, allocator, s_socket_test_handle_read, s_socket_test_handle_write, - true, s_client_initial_read_window, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); @@ -544,7 +563,6 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, allocator, s_socket_test_handle_read, s_socket_test_handle_write, - true, s_server_initial_read_window, &server_rw_args); ASSERT_NOT_NULL(server_rw_handler); @@ -581,20 +599,20 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex)); /* wait for both ends to setup */ - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_setup_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_setup_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &client_args)); /* send msg from client to server, and wait for some bytes to be received */ - rw_handler_write(client_args.rw_handler, aws_atomic_load_ptr(&client_args.rw_slot), &msg_from_client); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_socket_test_read_predicate, &server_rw_args)); + rw_handler_write(client_args.rw_handler, client_args.rw_slot, &msg_from_client); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_read_predicate, &server_rw_args)); /* send msg from server to client, and wait for some bytes to be received */ - rw_handler_write(server_args.rw_handler, aws_atomic_load_ptr(&server_args.rw_slot), &msg_from_server); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_socket_test_read_predicate, &client_rw_args)); + rw_handler_write(server_args.rw_handler, server_args.rw_slot, &msg_from_server); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_read_predicate, &client_rw_args)); /* confirm that the initial read window was respected */ server_rw_args.invocation_happened = false; @@ -604,13 +622,13 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, ASSERT_INT_EQUALS(s_server_initial_read_window, server_rw_args.amount_read); /* increment the read window on both sides and confirm they receive the remainder of their message */ - rw_handler_trigger_increment_read_window(server_args.rw_handler, aws_atomic_load_ptr(&server_args.rw_slot), 100); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_socket_test_full_read_predicate, &server_rw_args)); + rw_handler_trigger_increment_read_window(server_args.rw_handler, server_args.rw_slot, 100); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_full_read_predicate, &server_rw_args)); - rw_handler_trigger_increment_read_window(client_args.rw_handler, aws_atomic_load_ptr(&client_args.rw_slot), 100); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_socket_test_full_read_predicate, &client_rw_args)); + rw_handler_trigger_increment_read_window(client_args.rw_handler, client_args.rw_slot, 100); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_full_read_predicate, &client_rw_args)); ASSERT_INT_EQUALS(msg_from_server.len, client_rw_args.amount_read); ASSERT_INT_EQUALS(msg_from_client.len, server_rw_args.amount_read); @@ -630,13 +648,13 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, ASSERT_SUCCESS(aws_channel_shutdown(server_args.channel, AWS_OP_SUCCESS)); ASSERT_SUCCESS(aws_channel_shutdown(client_args.channel, AWS_OP_SUCCESS)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &client_args)); aws_server_bootstrap_destroy_socket_listener(local_server_tester.server_bootstrap, local_server_tester.listener); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_listener_destroy_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_listener_destroy_predicate, &server_args)); aws_mutex_unlock(&c_tester.mutex); @@ -673,11 +691,11 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { 0)); struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &client_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &server_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &server_rw_args); ASSERT_NOT_NULL(server_rw_handler); struct socket_test_args server_args; @@ -711,24 +729,24 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex)); /* wait for both ends to setup */ - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_setup_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_setup_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &client_args)); aws_channel_shutdown(server_args.channel, AWS_OP_SUCCESS); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &client_args)); ASSERT_INT_EQUALS(AWS_OP_SUCCESS, server_args.error_code); ASSERT_TRUE( AWS_IO_SOCKET_CLOSED == client_args.error_code || AWS_IO_SOCKET_NOT_CONNECTED == client_args.error_code); aws_server_bootstrap_destroy_socket_listener(local_server_tester.server_bootstrap, local_server_tester.listener); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_listener_destroy_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_listener_destroy_predicate, &server_args)); aws_mutex_unlock(&c_tester.mutex); @@ -743,6 +761,151 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { AWS_TEST_CASE(socket_handler_close, s_socket_close_test) +static int s_socket_handler_read_to_eof_after_peer_hangup_test(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + s_socket_common_tester_init(allocator, &c_tester); + + const size_t total_bytes_to_send_from_server = g_aws_channel_max_fragment_size; + uint8_t *client_received_message = aws_mem_acquire(allocator, total_bytes_to_send_from_server); + + struct socket_test_rw_args server_rw_args; + ASSERT_SUCCESS(s_rw_args_init(&server_rw_args, &c_tester, aws_byte_buf_from_empty_array(NULL, 0), 0)); + + struct socket_test_rw_args client_rw_args; + ASSERT_SUCCESS(s_rw_args_init( + &client_rw_args, + &c_tester, + aws_byte_buf_from_empty_array(client_received_message, total_bytes_to_send_from_server), + 0)); + + /* NOTE client start with window=0 */ + struct aws_channel_handler *client_rw_handler = + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 0 /*window*/, &client_rw_args); + ASSERT_NOT_NULL(client_rw_handler); + + struct aws_channel_handler *server_rw_handler = + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &server_rw_args); + ASSERT_NOT_NULL(server_rw_handler); + + struct socket_test_args server_args; + ASSERT_SUCCESS(s_socket_test_args_init(&server_args, &c_tester, server_rw_handler)); + + struct socket_test_args client_args; + ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); + + struct local_server_tester local_server_tester; + ASSERT_SUCCESS(s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, false)); + + struct aws_client_bootstrap_options client_bootstrap_options = { + .event_loop_group = c_tester.el_group, + .host_resolver = NULL, + }; + struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); + ASSERT_NOT_NULL(client_bootstrap); + + struct aws_socket_channel_bootstrap_options client_channel_options = { + .bootstrap = client_bootstrap, + .host_name = local_server_tester.endpoint.address, + .port = 0, + .socket_options = &local_server_tester.socket_options, + .setup_callback = s_socket_handler_test_client_setup_callback, + .shutdown_callback = s_socket_handler_test_client_shutdown_callback, + .user_data = &client_args, + .enable_read_back_pressure = true, + }; + + ASSERT_SUCCESS(aws_client_bootstrap_new_socket_channel(&client_channel_options)); + + ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex)); + + /* wait for both ends to setup */ + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &client_args)); + + /* We want the server to send some data and hang up IMMEDIATELY after, + * before the client has fully read the data. This is tricky to do in a test. + * + * First, have the server send data... */ + struct aws_byte_buf msg_from_server; + ASSERT_SUCCESS(aws_byte_buf_init(&msg_from_server, allocator, total_bytes_to_send_from_server)); + msg_from_server.len = total_bytes_to_send_from_server; + rw_handler_write_with_callback( + server_rw_handler, + server_args.rw_slot, + &msg_from_server, + s_socket_test_handle_on_write_completed, + &server_rw_args); + + /* ...now have the client open its read window and receive data in tiny chunks, + * stopping once the server has sent all data, but BEFORE the client has read all data. + * This is possible because the client's OS will buffer a certain amount of + * incoming data, before the client application calls read() on it. */ + while (server_rw_args.amount_written < total_bytes_to_send_from_server) { + const size_t client_read_chunk_size = 128; + client_rw_args.expected_read += client_read_chunk_size; + rw_handler_trigger_increment_read_window(client_args.rw_handler, client_args.rw_slot, client_read_chunk_size); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, + &c_tester.mutex, + TIMEOUT, + s_socket_test_full_read_predicate, + &client_rw_args)); + } + + /* Now close the server's socket.*/ + ASSERT_SUCCESS(aws_channel_shutdown(server_args.channel, AWS_OP_SUCCESS)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args)); + + /* Now sleep a moment to give the OS time to propagate the socket-close event to the client-side. */ + aws_mutex_unlock(&c_tester.mutex); + aws_thread_current_sleep(NANOS_PER_SEC / 4); + aws_mutex_lock(&c_tester.mutex); + + /* Ensure the client hasn't shut down before reading all the data. */ + ASSERT_FALSE(client_args.shutdown_invoked, "Client should read all data before shutting down."); + + /* Ensure the client hasn't read all data yet */ + ASSERT_TRUE( + client_rw_args.amount_read < total_bytes_to_send_from_server, + "If this fails, then we're not truly reproducing the regression test." + " The server needs to finish sending data, and close the socket," + " BEFORE the client reads all the data."); + + /* Have the client open its window enough to receive the rest of the data. + * If the client socket closes before all the data is received, then we still have the bug. */ + rw_handler_trigger_increment_read_window( + client_args.rw_handler, client_args.rw_slot, total_bytes_to_send_from_server); + client_rw_args.expected_read = total_bytes_to_send_from_server; + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_full_read_predicate, &client_rw_args)); + + /* Wait for client to shutdown, due to the server having closed the socket */ + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &client_args)); + + aws_server_bootstrap_destroy_socket_listener(local_server_tester.server_bootstrap, local_server_tester.listener); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_listener_destroy_predicate, &server_args)); + + ASSERT_INT_EQUALS(AWS_IO_SOCKET_CLOSED, client_args.error_code); + + aws_mutex_unlock(&c_tester.mutex); + + /* clean up */ + ASSERT_SUCCESS(s_local_server_tester_clean_up(&local_server_tester)); + aws_mem_release(allocator, client_received_message); + + aws_byte_buf_clean_up(&msg_from_server); + aws_client_bootstrap_release(client_bootstrap); + ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester)); + + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(socket_handler_read_to_eof_after_peer_hangup, s_socket_handler_read_to_eof_after_peer_hangup_test) + static void s_creation_callback_test_channel_creation_callback( struct aws_client_bootstrap *bootstrap, int error_code, @@ -829,11 +992,11 @@ static int s_open_channel_statistics_test(struct aws_allocator *allocator, void 0)); struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &client_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &server_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &server_rw_args); ASSERT_NOT_NULL(server_rw_handler); struct socket_test_args server_args; @@ -869,22 +1032,20 @@ static int s_open_channel_statistics_test(struct aws_allocator *allocator, void ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex)); /* wait for both ends to setup */ - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_setup_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_setup_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &client_args)); ASSERT_TRUE(client_args.creation_callback_invoked); - struct aws_channel_slot *client_rw_slot = aws_atomic_load_ptr(&client_args.rw_slot); - rw_handler_write(client_args.rw_handler, client_rw_slot, &msg_from_client); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_socket_test_read_predicate, &server_rw_args)); + rw_handler_write(client_args.rw_handler, client_args.rw_slot, &msg_from_client); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_read_predicate, &server_rw_args)); - struct aws_channel_slot *server_rw_slot = aws_atomic_load_ptr(&server_args.rw_slot); - rw_handler_write(server_args.rw_handler, server_rw_slot, &msg_from_server); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_socket_test_read_predicate, &client_rw_args)); + rw_handler_write(server_args.rw_handler, server_args.rw_slot, &msg_from_server); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_read_predicate, &client_rw_args)); uint64_t ms_to_ns = aws_timestamp_convert(1, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL); @@ -894,8 +1055,8 @@ static int s_open_channel_statistics_test(struct aws_allocator *allocator, void struct aws_statistics_handler_test_impl *stats_impl = stats_handler->impl; aws_mutex_lock(&stats_impl->lock); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &stats_impl->signal, &stats_impl->lock, s_stats_processed_predicate, stats_handler)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &stats_impl->signal, &stats_impl->lock, TIMEOUT, s_stats_processed_predicate, stats_handler)); ASSERT_TRUE(stats_impl->total_bytes_read == msg_from_server.len); ASSERT_TRUE(stats_impl->total_bytes_written == msg_from_client.len); @@ -904,14 +1065,14 @@ static int s_open_channel_statistics_test(struct aws_allocator *allocator, void aws_channel_shutdown(server_args.channel, AWS_OP_SUCCESS); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &server_args)); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_channel_shutdown_predicate, &client_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &client_args)); aws_server_bootstrap_destroy_socket_listener(local_server_tester.server_bootstrap, local_server_tester.listener); - ASSERT_SUCCESS(aws_condition_variable_wait_pred( - &c_tester.condition_variable, &c_tester.mutex, s_listener_destroy_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_listener_destroy_predicate, &server_args)); aws_mutex_unlock(&c_tester.mutex); diff --git a/tests/tls_handler_test.c b/tests/tls_handler_test.c index 1a7f94ddf..7b501b348 100644 --- a/tests/tls_handler_test.c +++ b/tests/tls_handler_test.c @@ -515,11 +515,11 @@ static int s_tls_channel_echo_and_backpressure_test_fn(struct aws_allocator *all allocator, &local_server_tester, &incoming_args, &c_tester, true, "server.crt", "server.key")); /* make the windows small to make sure back pressure is honored. */ struct aws_channel_handler *outgoing_rw_handler = rw_handler_new( - allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, write_tag.len / 2, &outgoing_rw_args); + allocator, s_tls_test_handle_read, s_tls_test_handle_write, write_tag.len / 2, &outgoing_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); struct aws_channel_handler *incoming_rw_handler = rw_handler_new( - allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, read_tag.len / 2, &incoming_rw_args); + allocator, s_tls_test_handle_read, s_tls_test_handle_write, read_tag.len / 2, &incoming_rw_args); ASSERT_NOT_NULL(incoming_rw_handler); incoming_args.rw_handler = incoming_rw_handler; @@ -1717,11 +1717,11 @@ static int s_tls_channel_statistics_test(struct aws_allocator *allocator, void * allocator, &local_server_tester, &incoming_args, &c_tester, false, "server.crt", "server.key")); struct aws_channel_handler *outgoing_rw_handler = - rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, 10000, &outgoing_rw_args); + rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, 10000, &outgoing_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); struct aws_channel_handler *incoming_rw_handler = - rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, 10000, &incoming_rw_args); + rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, 10000, &incoming_rw_args); ASSERT_NOT_NULL(incoming_rw_handler); incoming_args.rw_handler = incoming_rw_handler; From 408970c376032b215466baa9eba8f530e85b15f2 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 28 May 2024 17:51:32 -0700 Subject: [PATCH 02/18] socket_channel_handler no longer shuts down the channel the moment an error event arrives from the file-descriptor, since the HANG_UP event may come when there's still a lot more data to read on the socket. --- source/posix/socket.c | 30 ++++++++++++++++------------ source/socket_channel_handler.c | 35 +++++++++++++++++++-------------- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/source/posix/socket.c b/source/posix/socket.c index 0dac9442c..16972756e 100644 --- a/source/posix/socket.c +++ b/source/posix/socket.c @@ -1668,6 +1668,23 @@ static void s_on_socket_io_event( * subscribed is set to false. */ aws_ref_count_acquire(&socket_impl->internal_refcount); + /* NOTE: READABLE|WRITABLE|HANG_UP events might arrive simultaneously + * (e.g. peer sends last few bytes and immediately hangs up). + * Notify user of READABLE|WRITABLE events first, so they try to read any remaining bytes. */ + + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) { + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd); + if (socket->readable_fn) { + socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data); + } + } + /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not + * have been cleaned up, so this next branch is safe. */ + if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) { + AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd); + s_process_socket_write_requests(socket, NULL); + } + if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) { aws_raise_error(AWS_IO_SOCKET_CLOSED); AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd); @@ -1688,19 +1705,6 @@ static void s_on_socket_io_event( goto end_check; } - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) { - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd); - if (socket->readable_fn) { - socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data); - } - } - /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not - * have been cleaned up, so this next branch is safe. */ - if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) { - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd); - s_process_socket_write_requests(socket, NULL); - } - end_check: aws_ref_count_release(&socket_impl->internal_refcount); } diff --git a/source/socket_channel_handler.c b/source/socket_channel_handler.c index 666acbdd3..e291176f1 100644 --- a/source/socket_channel_handler.c +++ b/source/socket_channel_handler.c @@ -122,6 +122,10 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code */ static void s_do_read(struct socket_handler *socket_handler) { + if (socket_handler->shutdown_in_progress) { + return; + } + size_t downstream_window = aws_channel_slot_downstream_read_window(socket_handler->slot); size_t max_to_read = downstream_window > socket_handler->max_rw_size ? socket_handler->max_rw_size : downstream_window; @@ -139,17 +143,20 @@ static void s_do_read(struct socket_handler *socket_handler) { size_t total_read = 0; size_t read = 0; - while (total_read < max_to_read && !socket_handler->shutdown_in_progress) { + int last_error = 0; + while (total_read < max_to_read) { size_t iter_max_read = max_to_read - total_read; struct aws_io_message *message = aws_channel_acquire_message_from_pool( socket_handler->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, iter_max_read); if (!message) { + last_error = aws_last_error(); break; } if (aws_socket_read(socket_handler->socket, &message->message_data, &read)) { + last_error = aws_last_error(); aws_mem_release(message->allocator, message); break; } @@ -162,6 +169,7 @@ static void s_do_read(struct socket_handler *socket_handler) { (unsigned long long)read); if (aws_channel_slot_send_message(socket_handler->slot, message, AWS_CHANNEL_DIR_READ)) { + last_error = aws_last_error(); aws_mem_release(message->allocator, message); break; } @@ -177,23 +185,22 @@ static void s_do_read(struct socket_handler *socket_handler) { /* resubscribe as long as there's no error, just return if we're in a would block scenario. */ if (total_read < max_to_read) { - int last_error = aws_last_error(); + AWS_ASSERT(last_error != 0); - if (last_error != AWS_IO_READ_WOULD_BLOCK && !socket_handler->shutdown_in_progress) { + if (last_error != AWS_IO_READ_WOULD_BLOCK) { aws_channel_shutdown(socket_handler->slot->channel, last_error); + } else { + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET_HANDLER, + "id=%p: out of data to read on socket. " + "Waiting on event-loop notification.", + (void *)socket_handler->slot->handler); } - - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET_HANDLER, - "id=%p: out of data to read on socket. " - "Waiting on event-loop notification.", - (void *)socket_handler->slot->handler); return; } /* in this case, everything was fine, but there's still pending reads. We need to schedule a task to do the read * again. */ - if (!socket_handler->shutdown_in_progress && total_read == socket_handler->max_rw_size && - !socket_handler->read_task_storage.task_fn) { + if (total_read == socket_handler->max_rw_size && !socket_handler->read_task_storage.task_fn) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET_HANDLER, @@ -210,6 +217,8 @@ static void s_do_read(struct socket_handler *socket_handler) { * If an error, start the channel shutdown process. */ static void s_on_readable_notification(struct aws_socket *socket, int error_code, void *user_data) { (void)socket; + /* TODO: explain */ + (void)error_code; struct socket_handler *socket_handler = user_data; AWS_LOGF_TRACE(AWS_LS_IO_SOCKET_HANDLER, "id=%p: socket is now readable", (void *)socket_handler->slot->handler); @@ -219,10 +228,6 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code * sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket * closure, when in reality it was a TLS error */ s_do_read(socket_handler); - - if (error_code && !socket_handler->shutdown_in_progress) { - aws_channel_shutdown(socket_handler->slot->channel, error_code); - } } /* Either the result of a context switch (for fairness in the event loop), or a window update. */ From aa524b4457d45a09418327653a992dfd8c35c3af Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 09:01:46 -0700 Subject: [PATCH 03/18] ERROR_BROKEN_PIPE -> AWS_IO_SOCKET_CLOSED --- source/windows/iocp/socket.c | 1 + tests/tls_handler_test.c | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/windows/iocp/socket.c b/source/windows/iocp/socket.c index 6d879417b..da2215377 100644 --- a/source/windows/iocp/socket.c +++ b/source/windows/iocp/socket.c @@ -636,6 +636,7 @@ static int s_determine_socket_error(int error) { case IO_STATUS_TIMEOUT: return AWS_IO_SOCKET_TIMEOUT; case IO_PIPE_BROKEN: + case ERROR_BROKEN_PIPE: return AWS_IO_SOCKET_CLOSED; case STATUS_INVALID_ADDRESS_COMPONENT: case WSAEADDRNOTAVAIL: diff --git a/tests/tls_handler_test.c b/tests/tls_handler_test.c index 7b501b348..bee0f3999 100644 --- a/tests/tls_handler_test.c +++ b/tests/tls_handler_test.c @@ -518,8 +518,8 @@ static int s_tls_channel_echo_and_backpressure_test_fn(struct aws_allocator *all allocator, s_tls_test_handle_read, s_tls_test_handle_write, write_tag.len / 2, &outgoing_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); - struct aws_channel_handler *incoming_rw_handler = rw_handler_new( - allocator, s_tls_test_handle_read, s_tls_test_handle_write, read_tag.len / 2, &incoming_rw_args); + struct aws_channel_handler *incoming_rw_handler = + rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, read_tag.len / 2, &incoming_rw_args); ASSERT_NOT_NULL(incoming_rw_handler); incoming_args.rw_handler = incoming_rw_handler; From 7293f4fae8cc3ffd860bbab20a11febcf1edf47d Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 09:41:42 -0700 Subject: [PATCH 04/18] put back functionality where rw_handler_write() could happen synchronously --- tests/read_write_test_handler.c | 53 ++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/tests/read_write_test_handler.c b/tests/read_write_test_handler.c index 5b607ec98..b233b1d79 100644 --- a/tests/read_write_test_handler.c +++ b/tests/read_write_test_handler.c @@ -191,23 +191,30 @@ struct rw_handler_write_task_args { void *user_data; }; -static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) { - (void)task; - (void)task_status; - struct rw_handler_write_task_args *write_task_args = arg; +static void s_rw_handler_write_impl( + struct aws_channel_slot *slot, + struct aws_byte_buf *buffer, + aws_channel_on_message_write_completed_fn *on_completion, + void *user_data) { - struct aws_io_message *msg = aws_channel_acquire_message_from_pool( - write_task_args->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, write_task_args->buffer->len); + struct aws_io_message *msg = + aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len); - msg->on_completion = write_task_args->on_completion; - msg->user_data = write_task_args->user_data; + msg->on_completion = on_completion; + msg->user_data = user_data; - struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(write_task_args->buffer); + struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer); AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &write_buffer) == AWS_OP_SUCCESS); - AWS_FATAL_ASSERT( - aws_channel_slot_send_message(write_task_args->slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS); +} +static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) { + (void)task; + (void)task_status; + struct rw_handler_write_task_args *write_task_args = arg; + s_rw_handler_write_impl( + write_task_args->slot, write_task_args->buffer, write_task_args->on_completion, write_task_args->user_data); aws_mem_release(write_task_args->handler->alloc, write_task_args); } @@ -222,16 +229,20 @@ void rw_handler_write_with_callback( aws_channel_on_message_write_completed_fn *on_completion, void *user_data) { - struct rw_handler_write_task_args *write_task_args = - aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args)); - write_task_args->handler = handler; - write_task_args->buffer = buffer; - write_task_args->slot = slot; - write_task_args->on_completion = on_completion; - write_task_args->user_data = user_data; - aws_channel_task_init(&write_task_args->task, s_rw_handler_write_task, write_task_args, "rw_handler_write"); - - aws_channel_schedule_task_now(slot->channel, &write_task_args->task); + if (aws_channel_thread_is_callers_thread(slot->channel)) { + s_rw_handler_write_impl(slot, buffer, on_completion, user_data); + } else { + struct rw_handler_write_task_args *write_task_args = + aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args)); + write_task_args->handler = handler; + write_task_args->buffer = buffer; + write_task_args->slot = slot; + write_task_args->on_completion = on_completion; + write_task_args->user_data = user_data; + aws_channel_task_init(&write_task_args->task, s_rw_handler_write_task, write_task_args, "rw_handler_write"); + + aws_channel_schedule_task_now(slot->channel, &write_task_args->task); + } } struct increment_read_window_task_args { From 15ae9e6464fbada676e2d66b65cf6f0f9c04e6d2 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 10:21:03 -0700 Subject: [PATCH 05/18] revert changes I didn't need to make --- tests/byo_crypto_test.c | 2 ++ tests/channel_test.c | 2 +- tests/read_write_test_handler.c | 15 ++++++++++----- tests/read_write_test_handler.h | 1 + tests/socket_handler_test.c | 20 +++++++++++--------- tests/tls_handler_test.c | 10 +++++----- 6 files changed, 30 insertions(+), 20 deletions(-) diff --git a/tests/byo_crypto_test.c b/tests/byo_crypto_test.c index 3f219e0a1..878889646 100644 --- a/tests/byo_crypto_test.c +++ b/tests/byo_crypto_test.c @@ -362,6 +362,7 @@ static int s_byo_tls_handler_test(struct aws_allocator *allocator, void *ctx) { allocator, s_byo_crypto_test_handle_read, s_byo_crypto_test_handle_write, + true, s_outgoing_initial_read_window, &outgoing_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); @@ -370,6 +371,7 @@ static int s_byo_tls_handler_test(struct aws_allocator *allocator, void *ctx) { allocator, s_byo_crypto_test_handle_read, s_byo_crypto_test_handle_write, + true, s_incoming_initial_read_window, &incoming_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); diff --git a/tests/channel_test.c b/tests/channel_test.c index 7812f8eed..9a730a351 100644 --- a/tests/channel_test.c +++ b/tests/channel_test.c @@ -267,7 +267,7 @@ static int s_test_channel_refcount(struct aws_allocator *allocator, void *ctx) { struct aws_channel_slot *slot = aws_channel_slot_new(channel); ASSERT_NOT_NULL(slot); - struct aws_channel_handler *handler = rw_handler_new(allocator, NULL, NULL, 10000, NULL); + struct aws_channel_handler *handler = rw_handler_new(allocator, NULL, NULL, false, 10000, NULL); struct aws_atomic_var destroy_called = AWS_ATOMIC_INIT_INT(0); struct aws_mutex destroy_mutex = AWS_MUTEX_INIT; diff --git a/tests/read_write_test_handler.c b/tests/read_write_test_handler.c index b233b1d79..f3ec37b9a 100644 --- a/tests/read_write_test_handler.c +++ b/tests/read_write_test_handler.c @@ -23,6 +23,7 @@ struct rw_test_handler_impl { struct aws_condition_variable *destroy_condition_variable; rw_handler_driver_fn *on_read; rw_handler_driver_fn *on_write; + bool event_loop_driven; size_t window; struct aws_condition_variable condition_variable; struct aws_mutex mutex; @@ -137,6 +138,7 @@ struct aws_channel_handler *rw_handler_new( struct aws_allocator *allocator, rw_handler_driver_fn *on_read, rw_handler_driver_fn *on_write, + bool event_loop_driven, size_t window, void *ctx) { @@ -149,6 +151,7 @@ struct aws_channel_handler *rw_handler_new( handler_impl->on_read = on_read; handler_impl->on_write = on_write; handler_impl->ctx = ctx; + handler_impl->event_loop_driven = event_loop_driven; handler_impl->window = window; handler_impl->condition_variable = (struct aws_condition_variable)AWS_CONDITION_VARIABLE_INIT; handler_impl->mutex = (struct aws_mutex)AWS_MUTEX_INIT; @@ -191,7 +194,7 @@ struct rw_handler_write_task_args { void *user_data; }; -static void s_rw_handler_write_impl( +static void s_rw_handler_write_now( struct aws_channel_slot *slot, struct aws_byte_buf *buffer, aws_channel_on_message_write_completed_fn *on_completion, @@ -213,7 +216,7 @@ static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, en (void)task; (void)task_status; struct rw_handler_write_task_args *write_task_args = arg; - s_rw_handler_write_impl( + s_rw_handler_write_now( write_task_args->slot, write_task_args->buffer, write_task_args->on_completion, write_task_args->user_data); aws_mem_release(write_task_args->handler->alloc, write_task_args); } @@ -228,9 +231,11 @@ void rw_handler_write_with_callback( struct aws_byte_buf *buffer, aws_channel_on_message_write_completed_fn *on_completion, void *user_data) { + + struct rw_test_handler_impl *handler_impl = handler->impl; - if (aws_channel_thread_is_callers_thread(slot->channel)) { - s_rw_handler_write_impl(slot, buffer, on_completion, user_data); + if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) { + s_rw_handler_write_now(slot, buffer, on_completion, user_data); } else { struct rw_handler_write_task_args *write_task_args = aws_mem_acquire(handler->alloc, sizeof(struct rw_handler_write_task_args)); @@ -272,7 +277,7 @@ void rw_handler_trigger_increment_read_window( struct rw_test_handler_impl *handler_impl = handler->impl; - if (aws_channel_thread_is_callers_thread(slot->channel)) { + if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) { handler_impl->window += window_update; aws_channel_slot_increment_read_window(slot, window_update); } else { diff --git a/tests/read_write_test_handler.h b/tests/read_write_test_handler.h index 484a32fc4..7234f7299 100644 --- a/tests/read_write_test_handler.h +++ b/tests/read_write_test_handler.h @@ -25,6 +25,7 @@ struct aws_channel_handler *rw_handler_new( struct aws_allocator *allocator, rw_handler_driver_fn *on_read, rw_handler_driver_fn *on_write, + bool event_loop_driven, size_t window, void *ctx); diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index fe3888785..568e7e2ba 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -228,7 +228,7 @@ static struct aws_byte_buf s_socket_test_handle_read( struct socket_test_rw_args *rw_args = (struct socket_test_rw_args *)user_data; aws_mutex_lock(rw_args->mutex); - AWS_FATAL_ASSERT(aws_byte_buf_write_from_whole_buffer(&rw_args->received_message, *data_read)); + AWS_FATAL_ASSERT(aws_byte_buf_write_from_whole_buffer(&rw_args->received_message, *data_read) == true); rw_args->amount_read += data_read->len; rw_args->invocation_happened = true; aws_condition_variable_notify_one(rw_args->condition_variable); @@ -350,11 +350,11 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void s_socket_common_tester_init(allocator, &c_tester); struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, SIZE_MAX, NULL); + rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, true, SIZE_MAX, NULL); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, SIZE_MAX, NULL); + rw_handler_new(allocator, s_socket_test_handle_write, s_socket_test_handle_write, true, SIZE_MAX, NULL); ASSERT_NOT_NULL(server_rw_handler); struct socket_test_args server_args; @@ -555,6 +555,7 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, allocator, s_socket_test_handle_read, s_socket_test_handle_write, + true, s_client_initial_read_window, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); @@ -563,6 +564,7 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, allocator, s_socket_test_handle_read, s_socket_test_handle_write, + true, s_server_initial_read_window, &server_rw_args); ASSERT_NOT_NULL(server_rw_handler); @@ -691,11 +693,11 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { 0)); struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &client_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &server_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &server_rw_args); ASSERT_NOT_NULL(server_rw_handler); struct socket_test_args server_args; @@ -780,11 +782,11 @@ static int s_socket_handler_read_to_eof_after_peer_hangup_test(struct aws_alloca /* NOTE client start with window=0 */ struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 0 /*window*/, &client_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 0 /*window*/, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &server_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &server_rw_args); ASSERT_NOT_NULL(server_rw_handler); struct socket_test_args server_args; @@ -992,11 +994,11 @@ static int s_open_channel_statistics_test(struct aws_allocator *allocator, void 0)); struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &client_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, 10000, &server_rw_args); + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &server_rw_args); ASSERT_NOT_NULL(server_rw_handler); struct socket_test_args server_args; diff --git a/tests/tls_handler_test.c b/tests/tls_handler_test.c index bee0f3999..1a7f94ddf 100644 --- a/tests/tls_handler_test.c +++ b/tests/tls_handler_test.c @@ -515,11 +515,11 @@ static int s_tls_channel_echo_and_backpressure_test_fn(struct aws_allocator *all allocator, &local_server_tester, &incoming_args, &c_tester, true, "server.crt", "server.key")); /* make the windows small to make sure back pressure is honored. */ struct aws_channel_handler *outgoing_rw_handler = rw_handler_new( - allocator, s_tls_test_handle_read, s_tls_test_handle_write, write_tag.len / 2, &outgoing_rw_args); + allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, write_tag.len / 2, &outgoing_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); - struct aws_channel_handler *incoming_rw_handler = - rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, read_tag.len / 2, &incoming_rw_args); + struct aws_channel_handler *incoming_rw_handler = rw_handler_new( + allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, read_tag.len / 2, &incoming_rw_args); ASSERT_NOT_NULL(incoming_rw_handler); incoming_args.rw_handler = incoming_rw_handler; @@ -1717,11 +1717,11 @@ static int s_tls_channel_statistics_test(struct aws_allocator *allocator, void * allocator, &local_server_tester, &incoming_args, &c_tester, false, "server.crt", "server.key")); struct aws_channel_handler *outgoing_rw_handler = - rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, 10000, &outgoing_rw_args); + rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, 10000, &outgoing_rw_args); ASSERT_NOT_NULL(outgoing_rw_handler); struct aws_channel_handler *incoming_rw_handler = - rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, 10000, &incoming_rw_args); + rw_handler_new(allocator, s_tls_test_handle_read, s_tls_test_handle_write, true, 10000, &incoming_rw_args); ASSERT_NOT_NULL(incoming_rw_handler); incoming_args.rw_handler = incoming_rw_handler; From 577a5d71fa84108ecdf49dc885b2dd29b91009a5 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 10:22:04 -0700 Subject: [PATCH 06/18] clang-format --- tests/read_write_test_handler.c | 2 +- tests/socket_handler_test.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/read_write_test_handler.c b/tests/read_write_test_handler.c index f3ec37b9a..959158c96 100644 --- a/tests/read_write_test_handler.c +++ b/tests/read_write_test_handler.c @@ -231,7 +231,7 @@ void rw_handler_write_with_callback( struct aws_byte_buf *buffer, aws_channel_on_message_write_completed_fn *on_completion, void *user_data) { - + struct rw_test_handler_impl *handler_impl = handler->impl; if (!handler_impl->event_loop_driven || aws_channel_thread_is_callers_thread(slot->channel)) { diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index 568e7e2ba..b940998f3 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -781,8 +781,8 @@ static int s_socket_handler_read_to_eof_after_peer_hangup_test(struct aws_alloca 0)); /* NOTE client start with window=0 */ - struct aws_channel_handler *client_rw_handler = - rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 0 /*window*/, &client_rw_args); + struct aws_channel_handler *client_rw_handler = rw_handler_new( + allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 0 /*window*/, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); struct aws_channel_handler *server_rw_handler = From c2539737bc02d85ca2e6e7f1238c6f7de6fc17c6 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 11:41:52 -0700 Subject: [PATCH 07/18] test IPv4 & IPv6 too --- tests/CMakeLists.txt | 2 + tests/socket_handler_test.c | 96 ++++++++++++++++++++++++++----------- 2 files changed, 70 insertions(+), 28 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e1187680b..79a2695f2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -124,6 +124,8 @@ add_test_case(pem_sanitize_wrong_format_rejected) add_test_case(socket_handler_echo_and_backpressure) add_test_case(socket_handler_close) add_test_case(socket_handler_read_to_eof_after_peer_hangup) +add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup) +add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup) add_test_case(socket_pinned_event_loop) add_net_test_case(socket_pinned_event_loop_dns_failure) diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index b940998f3..aaa6bd3d0 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -40,6 +40,7 @@ struct socket_common_tester { struct aws_mutex mutex; struct aws_condition_variable condition_variable; struct aws_event_loop_group *el_group; + struct aws_host_resolver *resolver; struct aws_atomic_var current_time_ns; struct aws_atomic_var stats_handler; @@ -55,6 +56,13 @@ static int s_socket_common_tester_init(struct aws_allocator *allocator, struct s aws_io_library_init(allocator); tester->el_group = aws_event_loop_group_new_default(allocator, 0, NULL); + + struct aws_host_resolver_default_options resolver_options = { + .el_group = tester->el_group, + .max_entries = 8, + }; + tester->resolver = aws_host_resolver_new_default(allocator, &resolver_options); + struct aws_mutex mutex = AWS_MUTEX_INIT; struct aws_condition_variable condition_variable = AWS_CONDITION_VARIABLE_INIT; tester->mutex = mutex; @@ -66,6 +74,7 @@ static int s_socket_common_tester_init(struct aws_allocator *allocator, struct s } static int s_socket_common_tester_clean_up(struct socket_common_tester *tester) { + aws_host_resolver_release(tester->resolver); aws_event_loop_group_release(tester->el_group); aws_mutex_clean_up(&tester->mutex); @@ -311,13 +320,27 @@ static int s_local_server_tester_init( struct local_server_tester *tester, struct socket_test_args *args, struct socket_common_tester *s_c_tester, + enum aws_socket_domain socket_domain, bool enable_back_pressure) { + AWS_ZERO_STRUCT(*tester); tester->socket_options.connect_timeout_ms = 3000; tester->socket_options.type = AWS_SOCKET_STREAM; - tester->socket_options.domain = AWS_SOCKET_LOCAL; - - aws_socket_endpoint_init_local_address_for_test(&tester->endpoint); + tester->socket_options.domain = socket_domain; + switch (socket_domain) { + case AWS_SOCKET_LOCAL: + aws_socket_endpoint_init_local_address_for_test(&tester->endpoint); + break; + case AWS_SOCKET_IPV4: + strcpy(tester->endpoint.address, "127.0.0.1"); + break; + case AWS_SOCKET_IPV6: + strcpy(tester->endpoint.address, "::1"); + break; + default: + ASSERT_TRUE(false); + break; + } tester->server_bootstrap = aws_server_bootstrap_new(allocator, s_c_tester->el_group); ASSERT_NOT_NULL(tester->server_bootstrap); @@ -336,6 +359,9 @@ static int s_local_server_tester_init( tester->listener = aws_server_bootstrap_new_socket_listener(&bootstrap_options); ASSERT_NOT_NULL(tester->listener); + /* find out which port the socket is bound to */ + ASSERT_SUCCESS(aws_socket_get_bound_address(tester->listener, &tester->endpoint)); + return AWS_OP_SUCCESS; } @@ -364,11 +390,12 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); struct local_server_tester local_server_tester; - ASSERT_SUCCESS(s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, true)); + ASSERT_SUCCESS( + s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, AWS_SOCKET_LOCAL, true)); struct aws_client_bootstrap_options client_bootstrap_options = { .event_loop_group = c_tester.el_group, - .host_resolver = NULL, + .host_resolver = c_tester.resolver, }; struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); @@ -379,7 +406,7 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void AWS_ZERO_STRUCT(client_channel_options); client_channel_options.bootstrap = client_bootstrap; client_channel_options.host_name = local_server_tester.endpoint.address; - client_channel_options.port = 0; + client_channel_options.port = local_server_tester.endpoint.port; client_channel_options.socket_options = &local_server_tester.socket_options; client_channel_options.setup_callback = s_socket_handler_test_client_setup_callback; client_channel_options.shutdown_callback = s_socket_handler_test_client_shutdown_callback; @@ -470,15 +497,8 @@ static int s_socket_pinned_event_loop_dns_failure_test(struct aws_allocator *all s_socket_common_tester_init(allocator, &c_tester); - struct aws_host_resolver_default_options resolver_options = { - .el_group = c_tester.el_group, - .max_entries = 8, - }; - struct aws_host_resolver *resolver = aws_host_resolver_new_default(allocator, &resolver_options); - struct aws_client_bootstrap_options client_bootstrap_options = { .event_loop_group = c_tester.el_group, - .host_resolver = resolver, }; struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); @@ -516,7 +536,6 @@ static int s_socket_pinned_event_loop_dns_failure_test(struct aws_allocator *all aws_mutex_unlock(&c_tester.mutex); aws_client_bootstrap_release(client_bootstrap); - aws_host_resolver_release(resolver); ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester)); return AWS_OP_SUCCESS; @@ -576,11 +595,12 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); struct local_server_tester local_server_tester; - ASSERT_SUCCESS(s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, true)); + ASSERT_SUCCESS( + s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, AWS_SOCKET_LOCAL, true)); struct aws_client_bootstrap_options client_bootstrap_options = { .event_loop_group = c_tester.el_group, - .host_resolver = NULL, + .host_resolver = c_tester.resolver, }; struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); @@ -589,7 +609,7 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, AWS_ZERO_STRUCT(client_channel_options); client_channel_options.bootstrap = client_bootstrap; client_channel_options.host_name = local_server_tester.endpoint.address; - client_channel_options.port = 0; + client_channel_options.port = local_server_tester.endpoint.port; client_channel_options.socket_options = &local_server_tester.socket_options; client_channel_options.setup_callback = s_socket_handler_test_client_setup_callback; client_channel_options.shutdown_callback = s_socket_handler_test_client_shutdown_callback; @@ -707,11 +727,12 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); struct local_server_tester local_server_tester; - ASSERT_SUCCESS(s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, false)); + ASSERT_SUCCESS( + s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, AWS_SOCKET_LOCAL, false)); struct aws_client_bootstrap_options client_bootstrap_options = { .event_loop_group = c_tester.el_group, - .host_resolver = NULL, + .host_resolver = c_tester.resolver, }; struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); @@ -720,7 +741,7 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { AWS_ZERO_STRUCT(client_channel_options); client_channel_options.bootstrap = client_bootstrap; client_channel_options.host_name = local_server_tester.endpoint.address; - client_channel_options.port = 0; + client_channel_options.port = local_server_tester.endpoint.port; client_channel_options.socket_options = &local_server_tester.socket_options; client_channel_options.setup_callback = s_socket_handler_test_client_setup_callback; client_channel_options.shutdown_callback = s_socket_handler_test_client_shutdown_callback; @@ -763,7 +784,11 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { AWS_TEST_CASE(socket_handler_close, s_socket_close_test) -static int s_socket_handler_read_to_eof_after_peer_hangup_test(struct aws_allocator *allocator, void *ctx) { +static int s_socket_read_to_eof_after_peer_hangup_test_common( + struct aws_allocator *allocator, + void *ctx, + enum aws_socket_domain socket_domain) { + (void)ctx; s_socket_common_tester_init(allocator, &c_tester); @@ -796,11 +821,12 @@ static int s_socket_handler_read_to_eof_after_peer_hangup_test(struct aws_alloca ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); struct local_server_tester local_server_tester; - ASSERT_SUCCESS(s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, false)); + ASSERT_SUCCESS( + s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, socket_domain, false)); struct aws_client_bootstrap_options client_bootstrap_options = { .event_loop_group = c_tester.el_group, - .host_resolver = NULL, + .host_resolver = c_tester.resolver, }; struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); @@ -808,7 +834,7 @@ static int s_socket_handler_read_to_eof_after_peer_hangup_test(struct aws_alloca struct aws_socket_channel_bootstrap_options client_channel_options = { .bootstrap = client_bootstrap, .host_name = local_server_tester.endpoint.address, - .port = 0, + .port = local_server_tester.endpoint.port, .socket_options = &local_server_tester.socket_options, .setup_callback = s_socket_handler_test_client_setup_callback, .shutdown_callback = s_socket_handler_test_client_shutdown_callback, @@ -906,7 +932,20 @@ static int s_socket_handler_read_to_eof_after_peer_hangup_test(struct aws_alloca return AWS_OP_SUCCESS; } -AWS_TEST_CASE(socket_handler_read_to_eof_after_peer_hangup, s_socket_handler_read_to_eof_after_peer_hangup_test) +static int s_socket_read_to_eof_after_peer_hangup_test(struct aws_allocator *allocator, void *ctx) { + return s_socket_read_to_eof_after_peer_hangup_test_common(allocator, ctx, AWS_SOCKET_LOCAL); +} +AWS_TEST_CASE(socket_handler_read_to_eof_after_peer_hangup, s_socket_read_to_eof_after_peer_hangup_test) + +static int s_socket_ipv4_read_to_eof_after_peer_hangup_test(struct aws_allocator *allocator, void *ctx) { + return s_socket_read_to_eof_after_peer_hangup_test_common(allocator, ctx, AWS_SOCKET_IPV4); +} +AWS_TEST_CASE(socket_handler_ipv4_read_to_eof_after_peer_hangup, s_socket_ipv4_read_to_eof_after_peer_hangup_test) + +static int s_socket_ipv6_read_to_eof_after_peer_hangup_test(struct aws_allocator *allocator, void *ctx) { + return s_socket_read_to_eof_after_peer_hangup_test_common(allocator, ctx, AWS_SOCKET_IPV6); +} +AWS_TEST_CASE(socket_handler_ipv6_read_to_eof_after_peer_hangup, s_socket_ipv6_read_to_eof_after_peer_hangup_test) static void s_creation_callback_test_channel_creation_callback( struct aws_client_bootstrap *bootstrap, @@ -1008,12 +1047,13 @@ static int s_open_channel_statistics_test(struct aws_allocator *allocator, void ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); struct local_server_tester local_server_tester; - ASSERT_SUCCESS(s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, false)); + ASSERT_SUCCESS( + s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, AWS_SOCKET_LOCAL, false)); struct aws_client_bootstrap_options client_bootstrap_options; AWS_ZERO_STRUCT(client_bootstrap_options); client_bootstrap_options.event_loop_group = c_tester.el_group; - client_bootstrap_options.host_resolver = NULL; + client_bootstrap_options.host_resolver = c_tester.resolver; struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); @@ -1022,7 +1062,7 @@ static int s_open_channel_statistics_test(struct aws_allocator *allocator, void AWS_ZERO_STRUCT(client_channel_options); client_channel_options.bootstrap = client_bootstrap; client_channel_options.host_name = local_server_tester.endpoint.address; - client_channel_options.port = 0; + client_channel_options.port = local_server_tester.endpoint.port; client_channel_options.socket_options = &local_server_tester.socket_options; client_channel_options.creation_callback = s_creation_callback_test_channel_creation_callback; client_channel_options.setup_callback = s_socket_handler_test_client_setup_callback; From 79d9fdf5b69bb130b22c765e2ae38d8c74aff7e1 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 13:23:24 -0700 Subject: [PATCH 08/18] skip test if IPv6 isn't supported --- tests/socket_handler_test.c | 43 ++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index aaa6bd3d0..f903a2fa4 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -332,10 +332,10 @@ static int s_local_server_tester_init( aws_socket_endpoint_init_local_address_for_test(&tester->endpoint); break; case AWS_SOCKET_IPV4: - strcpy(tester->endpoint.address, "127.0.0.1"); + strncpy(tester->endpoint.address, "127.0.0.1", sizeof(tester->endpoint.address)); break; case AWS_SOCKET_IPV6: - strcpy(tester->endpoint.address, "::1"); + strncpy(tester->endpoint.address, "::1", sizeof(tester->endpoint.address)); break; default: ASSERT_TRUE(false); @@ -793,17 +793,18 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( s_socket_common_tester_init(allocator, &c_tester); const size_t total_bytes_to_send_from_server = g_aws_channel_max_fragment_size; - uint8_t *client_received_message = aws_mem_acquire(allocator, total_bytes_to_send_from_server); + + struct aws_byte_buf client_received_message; + ASSERT_SUCCESS(aws_byte_buf_init(&client_received_message, allocator, total_bytes_to_send_from_server)); + + struct aws_byte_buf msg_from_server; + ASSERT_SUCCESS(aws_byte_buf_init(&msg_from_server, allocator, total_bytes_to_send_from_server)); struct socket_test_rw_args server_rw_args; ASSERT_SUCCESS(s_rw_args_init(&server_rw_args, &c_tester, aws_byte_buf_from_empty_array(NULL, 0), 0)); struct socket_test_rw_args client_rw_args; - ASSERT_SUCCESS(s_rw_args_init( - &client_rw_args, - &c_tester, - aws_byte_buf_from_empty_array(client_received_message, total_bytes_to_send_from_server), - 0)); + ASSERT_SUCCESS(s_rw_args_init(&client_rw_args, &c_tester, client_received_message, 0)); /* NOTE client start with window=0 */ struct aws_channel_handler *client_rw_handler = rw_handler_new( @@ -820,9 +821,19 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( struct socket_test_args client_args; ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); + bool skip_test = false; struct local_server_tester local_server_tester; - ASSERT_SUCCESS( - s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, socket_domain, false)); + if (s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, socket_domain, false)) { + /* Skip if test environment doesn't support IPv6 */ + if (aws_last_error() == AWS_IO_SOCKET_INVALID_ADDRESS) { + skip_test = true; + aws_channel_handler_destroy(client_rw_handler); + aws_channel_handler_destroy(server_rw_handler); + goto clean_up; + } else { + ASSERT_TRUE(false, "s_local_server_tester_init() failed"); + } + } struct aws_client_bootstrap_options client_bootstrap_options = { .event_loop_group = c_tester.el_group, @@ -856,9 +867,7 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( * before the client has fully read the data. This is tricky to do in a test. * * First, have the server send data... */ - struct aws_byte_buf msg_from_server; - ASSERT_SUCCESS(aws_byte_buf_init(&msg_from_server, allocator, total_bytes_to_send_from_server)); - msg_from_server.len = total_bytes_to_send_from_server; + ASSERT_TRUE(aws_byte_buf_write_u8_n(&msg_from_server, 's', total_bytes_to_send_from_server)); rw_handler_write_with_callback( server_rw_handler, server_args.rw_slot, @@ -922,15 +931,15 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( aws_mutex_unlock(&c_tester.mutex); - /* clean up */ +/* clean up */ +clean_up: ASSERT_SUCCESS(s_local_server_tester_clean_up(&local_server_tester)); - aws_mem_release(allocator, client_received_message); - + aws_byte_buf_clean_up(&client_received_message); aws_byte_buf_clean_up(&msg_from_server); aws_client_bootstrap_release(client_bootstrap); ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester)); - return AWS_OP_SUCCESS; + return skip_test ? AWS_OP_SKIP : AWS_OP_SUCCESS; } static int s_socket_read_to_eof_after_peer_hangup_test(struct aws_allocator *allocator, void *ctx) { return s_socket_read_to_eof_after_peer_hangup_test_common(allocator, ctx, AWS_SOCKET_LOCAL); From b2d60051af61b543938b535a23f9086a920d71ab Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 13:35:11 -0700 Subject: [PATCH 09/18] fix warnings --- tests/socket_handler_test.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index f903a2fa4..ecfb25c53 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -17,6 +17,10 @@ #include "statistics_handler_test.h" #include +#ifdef _MSC_VER +# pragma warning(disable : 4996) /* allow strncpy() */ +#endif + #define NANOS_PER_SEC ((uint64_t)AWS_TIMESTAMP_NANOS) #define TIMEOUT (10 * NANOS_PER_SEC) @@ -499,6 +503,7 @@ static int s_socket_pinned_event_loop_dns_failure_test(struct aws_allocator *all struct aws_client_bootstrap_options client_bootstrap_options = { .event_loop_group = c_tester.el_group, + .host_resolver = c_tester.resolver, }; struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); @@ -821,6 +826,8 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( struct socket_test_args client_args; ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); + struct aws_client_bootstrap *client_bootstrap = NULL; + bool skip_test = false; struct local_server_tester local_server_tester; if (s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, socket_domain, false)) { @@ -839,7 +846,7 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( .event_loop_group = c_tester.el_group, .host_resolver = c_tester.resolver, }; - struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); + client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); struct aws_socket_channel_bootstrap_options client_channel_options = { From 3eb8d9bed3c1aa5f34bbf691974ff11e4014b3ed Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 29 May 2024 15:30:38 -0700 Subject: [PATCH 10/18] don't try to clean up after skip --- tests/socket_handler_test.c | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index ecfb25c53..71f1705aa 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -826,17 +826,11 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( struct socket_test_args client_args; ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); - struct aws_client_bootstrap *client_bootstrap = NULL; - - bool skip_test = false; struct local_server_tester local_server_tester; if (s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, socket_domain, false)) { - /* Skip if test environment doesn't support IPv6 */ + /* Skip test if server can't bind to address (e.g. Gith9ub's ubuntu runners don't allow IPv6) */ if (aws_last_error() == AWS_IO_SOCKET_INVALID_ADDRESS) { - skip_test = true; - aws_channel_handler_destroy(client_rw_handler); - aws_channel_handler_destroy(server_rw_handler); - goto clean_up; + return AWS_OP_SKIP; } else { ASSERT_TRUE(false, "s_local_server_tester_init() failed"); } @@ -846,7 +840,7 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( .event_loop_group = c_tester.el_group, .host_resolver = c_tester.resolver, }; - client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); + struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); ASSERT_NOT_NULL(client_bootstrap); struct aws_socket_channel_bootstrap_options client_channel_options = { @@ -938,15 +932,14 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( aws_mutex_unlock(&c_tester.mutex); -/* clean up */ -clean_up: + /* clean up */ ASSERT_SUCCESS(s_local_server_tester_clean_up(&local_server_tester)); aws_byte_buf_clean_up(&client_received_message); aws_byte_buf_clean_up(&msg_from_server); aws_client_bootstrap_release(client_bootstrap); ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester)); - return skip_test ? AWS_OP_SKIP : AWS_OP_SUCCESS; + return AWS_OP_SUCCESS; } static int s_socket_read_to_eof_after_peer_hangup_test(struct aws_allocator *allocator, void *ctx) { return s_socket_read_to_eof_after_peer_hangup_test_common(allocator, ctx, AWS_SOCKET_LOCAL); From 11e6267b350bff23a1f28452f9a326982ec044b5 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Thu, 30 May 2024 13:21:53 -0700 Subject: [PATCH 11/18] log statement wasn't totally correct --- source/windows/iocp/socket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/windows/iocp/socket.c b/source/windows/iocp/socket.c index da2215377..197bdcaaa 100644 --- a/source/windows/iocp/socket.c +++ b/source/windows/iocp/socket.c @@ -2971,7 +2971,7 @@ static int s_tcp_read(struct aws_socket *socket, struct aws_byte_buf *buffer, si AWS_LOGF_ERROR( AWS_LS_IO_SOCKET, - "id=%p handle=%p: ReadFile() failed with error %d", + "id=%p handle=%p: recv() failed with error %d", (void *)socket, (void *)socket->io_handle.data.handle, error); From 69eea42e491b217b9d320bb7665de670a763514f Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Fri, 31 May 2024 14:33:29 -0700 Subject: [PATCH 12/18] fix logging the wrong pointer --- source/socket_channel_handler.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/socket_channel_handler.c b/source/socket_channel_handler.c index e291176f1..187cf95db 100644 --- a/source/socket_channel_handler.c +++ b/source/socket_channel_handler.c @@ -178,7 +178,7 @@ static void s_do_read(struct socket_handler *socket_handler) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET_HANDLER, "id=%p: total read on this tick %llu", - (void *)&socket_handler->slot->handler, + (void *)socket_handler->slot->handler, (unsigned long long)total_read); socket_handler->stats.bytes_read += total_read; From 6c85d6452d66bad934a4d09ce9b0f89bcaa6be69 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Mon, 3 Jun 2024 10:28:41 -0700 Subject: [PATCH 13/18] Disable failing tests on Windows, with detailed comment explaining why --- tests/CMakeLists.txt | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 79a2695f2..2543742af 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -124,8 +124,28 @@ add_test_case(pem_sanitize_wrong_format_rejected) add_test_case(socket_handler_echo_and_backpressure) add_test_case(socket_handler_close) add_test_case(socket_handler_read_to_eof_after_peer_hangup) -add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup) -add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup) +# These tests fail on Windows due to some bug in our server code. +# I've lost days to this bug, and no one is using our Windows server funcionality, +# so disabling these tests on windows and moving along for now. +# They fail because the client ends up seeing WSAECONNRESET (Connection reset by peer) +# before receiving all the data. But it's totally the server's fault, not the client's. +# I tried the following: +# 1) Wrote 2 simple standalone Windows programs, server and client, using simple synchronous socket code. +# WORKED PERFECTLY. So it's not a fundamental issue with Windows. +# 2) Commented out server part of this failing test, and used the simple standalone server instead. +# WORKED PERFECTLY. So it's not a problem with our actual client code. +# 3) Copy/pasted the simple standlone server code into this test, and used that instead of our actual server code. +# WORKED PERFECTLY. So it's not a problem with the server and client sockets being in the same process. +# 4) Commented out the client part of this failing test, and used the simple standalone client instead. +# FAILED. The standalone client got WSAECONNRESET before receiving all the data. +# So it's something with our complicated non-blocking server code. +# The last interesting thing I noticed before giving up was: we call shutdown() immediately +# before calling closesocket() but shutdown() gets error WSAENOTCONN, even +# though, at that moment, the socket should be connected just fine. +if(NOT WINDOWS) + add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup) + add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup) +endif() add_test_case(socket_pinned_event_loop) add_net_test_case(socket_pinned_event_loop_dns_failure) From 7db6a7488bd2845c41a84ae60d225d6d3cb99ff2 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Mon, 3 Jun 2024 11:23:09 -0700 Subject: [PATCH 14/18] WIN32, not WINDOWS --- tests/CMakeLists.txt | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2543742af..01f86115c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -124,11 +124,10 @@ add_test_case(pem_sanitize_wrong_format_rejected) add_test_case(socket_handler_echo_and_backpressure) add_test_case(socket_handler_close) add_test_case(socket_handler_read_to_eof_after_peer_hangup) -# These tests fail on Windows due to some bug in our server code. +# These tests fail on Windows due to some bug in our server code where, if the socket is closed +# immediately after data is written, that data does not flush cleanly to the client. # I've lost days to this bug, and no one is using our Windows server funcionality, -# so disabling these tests on windows and moving along for now. -# They fail because the client ends up seeing WSAECONNRESET (Connection reset by peer) -# before receiving all the data. But it's totally the server's fault, not the client's. +# so disabling these tests on Windows and moving along for now. # I tried the following: # 1) Wrote 2 simple standalone Windows programs, server and client, using simple synchronous socket code. # WORKED PERFECTLY. So it's not a fundamental issue with Windows. @@ -137,12 +136,12 @@ add_test_case(socket_handler_read_to_eof_after_peer_hangup) # 3) Copy/pasted the simple standlone server code into this test, and used that instead of our actual server code. # WORKED PERFECTLY. So it's not a problem with the server and client sockets being in the same process. # 4) Commented out the client part of this failing test, and used the simple standalone client instead. -# FAILED. The standalone client got WSAECONNRESET before receiving all the data. +# FAILED. The standalone client got WSAECONNRESET (Connection reset by peer) before receiving all the data. # So it's something with our complicated non-blocking server code. # The last interesting thing I noticed before giving up was: we call shutdown() immediately # before calling closesocket() but shutdown() gets error WSAENOTCONN, even # though, at that moment, the socket should be connected just fine. -if(NOT WINDOWS) +if(NOT WIN32) add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup) add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup) endif() From 5ab408611d078b3bc985ce052e22ca1a0dd585b3 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Mon, 3 Jun 2024 11:50:38 -0700 Subject: [PATCH 15/18] explain --- source/socket_channel_handler.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/socket_channel_handler.c b/source/socket_channel_handler.c index 187cf95db..88b81d503 100644 --- a/source/socket_channel_handler.c +++ b/source/socket_channel_handler.c @@ -217,16 +217,24 @@ static void s_do_read(struct socket_handler *socket_handler) { * If an error, start the channel shutdown process. */ static void s_on_readable_notification(struct aws_socket *socket, int error_code, void *user_data) { (void)socket; - /* TODO: explain */ - (void)error_code; struct socket_handler *socket_handler = user_data; - AWS_LOGF_TRACE(AWS_LS_IO_SOCKET_HANDLER, "id=%p: socket is now readable", (void *)socket_handler->slot->handler); - - /* read regardless so we can pick up data that was sent prior to the close. For example, peer sends a TLS ALERT - * then immediately closes the socket. On some platforms, we'll never see the readable flag. So we want to make + AWS_LOGF_TRACE( + AWS_LS_IO_SOCKET_HANDLER, + "id=%p: socket on-readable with error code %d(%s)", + (void *)socket_handler->slot->handler, + error_code, + aws_error_name(error_code)); + + /* Regardless of error code call read() until it reports error or EOF, + * so we can pick up data that was sent prior to the close. + * For example, if peer closes the socket immediately after sending the last + * bytes of data, the READABLE and HANGUP events arrive simultaneously. + * Another example, peer sends a TLS ALERT then immediately closes the socket. + * On some platforms, we'll never see the readable flag. So we want to make * sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket * closure, when in reality it was a TLS error */ + (void)error_code; s_do_read(socket_handler); } From bc9918a042f2d294874668c7899267f80cd5848b Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Mon, 3 Jun 2024 11:51:57 -0700 Subject: [PATCH 16/18] I am so done with you windows --- tests/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 01f86115c..67220907c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -123,7 +123,6 @@ add_test_case(pem_sanitize_wrong_format_rejected) add_test_case(socket_handler_echo_and_backpressure) add_test_case(socket_handler_close) -add_test_case(socket_handler_read_to_eof_after_peer_hangup) # These tests fail on Windows due to some bug in our server code where, if the socket is closed # immediately after data is written, that data does not flush cleanly to the client. # I've lost days to this bug, and no one is using our Windows server funcionality, @@ -142,6 +141,7 @@ add_test_case(socket_handler_read_to_eof_after_peer_hangup) # before calling closesocket() but shutdown() gets error WSAENOTCONN, even # though, at that moment, the socket should be connected just fine. if(NOT WIN32) + add_net_test_case(socket_handler_read_to_eof_after_peer_hangup) add_net_test_case(socket_handler_ipv4_read_to_eof_after_peer_hangup) add_net_test_case(socket_handler_ipv6_read_to_eof_after_peer_hangup) endif() From 56d92d3382da668bfbcd02b8d875741069a54216 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Mon, 3 Jun 2024 13:20:46 -0700 Subject: [PATCH 17/18] comments --- tests/socket_handler_test.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index 71f1705aa..513ca570e 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -789,6 +789,15 @@ static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { AWS_TEST_CASE(socket_handler_close, s_socket_close_test) +/* This is a regression test. + * Once upon a time, if the socket-handler received READABLE and HANGUP events simultaneously, + * it would read one last time from the socket before closing it. But one read may + * not be enough to get all remaining data. The correct thing is to do is + * repeatedly read until the read() call itself reports EOF or an error. + * + * Anyway, this test establishes a connection between server and client. + * The server sends a big chunk of data, and closes the socket immediately + * after the write completes. The client should still be able to read all the data. */ static int s_socket_read_to_eof_after_peer_hangup_test_common( struct aws_allocator *allocator, void *ctx, @@ -811,7 +820,7 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( struct socket_test_rw_args client_rw_args; ASSERT_SUCCESS(s_rw_args_init(&client_rw_args, &c_tester, client_received_message, 0)); - /* NOTE client start with window=0 */ + /* NOTE: client starts with window=0, so we can VERY CAREFULLY control when it reads data from the socket */ struct aws_channel_handler *client_rw_handler = rw_handler_new( allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 0 /*window*/, &client_rw_args); ASSERT_NOT_NULL(client_rw_handler); @@ -897,7 +906,7 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args)); - /* Now sleep a moment to give the OS time to propagate the socket-close event to the client-side. */ + /* Now sleep a moment to 100% guarantee the OS propagates the socket-close event to the client-side. */ aws_mutex_unlock(&c_tester.mutex); aws_thread_current_sleep(NANOS_PER_SEC / 4); aws_mutex_lock(&c_tester.mutex); @@ -912,10 +921,10 @@ static int s_socket_read_to_eof_after_peer_hangup_test_common( " The server needs to finish sending data, and close the socket," " BEFORE the client reads all the data."); - /* Have the client open its window enough to receive the rest of the data. + /* Have the client open its window more-than-enough to receive the rest of the data. * If the client socket closes before all the data is received, then we still have the bug. */ rw_handler_trigger_increment_read_window( - client_args.rw_handler, client_args.rw_slot, total_bytes_to_send_from_server); + client_args.rw_handler, client_args.rw_slot, total_bytes_to_send_from_server * 3 /*more-than-enough*/); client_rw_args.expected_read = total_bytes_to_send_from_server; ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_full_read_predicate, &client_rw_args)); From 3b3bdef8062d7caf0a69ea3817f3481982753856 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 4 Jun 2024 09:17:03 -0700 Subject: [PATCH 18/18] comments++ --- source/socket_channel_handler.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/socket_channel_handler.c b/source/socket_channel_handler.c index 88b81d503..c327f6f35 100644 --- a/source/socket_channel_handler.c +++ b/source/socket_channel_handler.c @@ -228,12 +228,18 @@ static void s_on_readable_notification(struct aws_socket *socket, int error_code /* Regardless of error code call read() until it reports error or EOF, * so we can pick up data that was sent prior to the close. + * * For example, if peer closes the socket immediately after sending the last * bytes of data, the READABLE and HANGUP events arrive simultaneously. + * * Another example, peer sends a TLS ALERT then immediately closes the socket. * On some platforms, we'll never see the readable flag. So we want to make * sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket - * closure, when in reality it was a TLS error */ + * closure, when in reality it was a TLS error + * + * It may take more than one read() to get all remaining data. + * Also, if the downstream read-window reaches 0, we need to patiently + * wait until the window opens before we can call read() again. */ (void)error_code; s_do_read(socket_handler); }