Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS deliver buffer data during shutdown #650

Merged
merged 55 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
deccd6b
add test
TingDaoK Jun 27, 2024
113082f
a lot of lines
TingDaoK Jun 28, 2024
0034c24
use server lock
TingDaoK Jun 28, 2024
7219fea
woops... typo
TingDaoK Jun 28, 2024
0b7082b
make sure it shutdown
TingDaoK Jun 28, 2024
d90a40f
looks like tsan is not happy, let's notify within the lock
TingDaoK Jun 28, 2024
b2cad31
one possible way to handle this?
TingDaoK Jun 28, 2024
320272f
Mac & linux
TingDaoK Jul 9, 2024
a8ab76a
make sure server and client don't fall into the same thread
TingDaoK Jul 9, 2024
e8eccf3
windows
TingDaoK Jul 9, 2024
5277e9b
fix the build, but test finds another issue🥲
TingDaoK Jul 10, 2024
80a887c
Merge branch 'main' into socket-close
TingDaoK Jul 10, 2024
b9702f7
flush for all error code
TingDaoK Jul 10, 2024
4fcf74b
Merge branch 'socket-close' of github.com:awslabs/aws-c-io into socke…
TingDaoK Jul 10, 2024
ca84785
not socket erro
TingDaoK Jul 10, 2024
cd0a9ab
unless we finished negotiation
TingDaoK Jul 10, 2024
4fb3bde
kick off read
TingDaoK Jul 10, 2024
35ba824
fix build
TingDaoK Jul 10, 2024
0f9d185
Merge branch 'main' into socket-close
TingDaoK Jul 10, 2024
4f7cc91
read for all platforms
TingDaoK Jul 10, 2024
1b9c79c
fix build
TingDaoK Jul 10, 2024
e6a9caa
Merge branch 'main' into socket-close
TingDaoK Jul 11, 2024
f723101
break out inside the loop
TingDaoK Jul 12, 2024
4ea14b6
use latest builder?
TingDaoK Jul 12, 2024
c6bd4c8
unused line
TingDaoK Jul 12, 2024
bc91df5
only do the hack if the downstream window is larger than 0
TingDaoK Jul 12, 2024
7c1bf10
allow people to increment window during shutting down
TingDaoK Jul 12, 2024
3ad0547
Merge branch 'main' into socket-close
TingDaoK Jul 12, 2024
47ebf48
make sure it works after shutdown as well
TingDaoK Jul 12, 2024
2303ecc
adjust a bit
TingDaoK Jul 12, 2024
db3b36c
comments addressed
TingDaoK Jul 17, 2024
2cd59af
one more
TingDaoK Jul 17, 2024
f23300e
bring back the check for the next handler
TingDaoK Jul 17, 2024
e1ebefb
address comments and fix the skip
TingDaoK Jul 18, 2024
fa209e7
fix build
TingDaoK Jul 18, 2024
9f3bd93
revert the change
TingDaoK Jul 18, 2024
086eb7f
windows as well
TingDaoK Jul 18, 2024
6d02832
Merge branch 'main' into socket-close
TingDaoK Jul 18, 2024
bbf9fd1
format
TingDaoK Jul 18, 2024
1234fbc
man, i cannot write code without ide
TingDaoK Jul 18, 2024
f387e52
Apply suggestions from code review
TingDaoK Jul 24, 2024
ea1c052
comments addressed
TingDaoK Jul 24, 2024
cee28ae
protect against read after shutdown
TingDaoK Jul 24, 2024
a40c650
return a value
TingDaoK Jul 24, 2024
3c9261d
address comments
TingDaoK Jul 24, 2024
2c250e1
read state
TingDaoK Jul 24, 2024
8f212e5
fix build
TingDaoK Jul 24, 2024
9867c55
comments addressed
TingDaoK Jul 24, 2024
88dafd8
Merge branch 'main' into socket-close
TingDaoK Jul 24, 2024
f918526
try to make the loop more clear
TingDaoK Jul 25, 2024
e742c5a
oops
TingDaoK Jul 25, 2024
05b88dd
try to simplify some error-handling
graebm Jul 26, 2024
9d04799
fix my mistakes
graebm Jul 26, 2024
42d4400
trrrrivial
graebm Jul 26, 2024
463c563
respond to feedback
graebm Jul 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
clang-sanitizers:
runs-on: ubuntu-22.04 # latest
strategy:
fail-fast: false
matrix:
sanitizers: [",thread", ",address,undefined"]
steps:
Expand Down
6 changes: 6 additions & 0 deletions include/aws/io/private/tls_channel_handler_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ struct aws_tls_channel_handler_shared {
struct aws_crt_statistics_tls stats;
};

enum aws_tls_handler_read_state {
AWS_TLS_HANDLER_OPEN,
AWS_TLS_HANDLER_READ_SHUTTING_DOWN,
AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE,
};

AWS_EXTERN_C_BEGIN

AWS_IO_API void aws_tls_channel_handler_shared_init(
Expand Down
4 changes: 2 additions & 2 deletions source/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar

channel->window_update_scheduled = false;

if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUT_DOWN) {
graebm marked this conversation as resolved.
Show resolved Hide resolved
/* get the right-most slot to start the updates. */
struct aws_channel_slot *slot = channel->first;
while (slot->adj_right) {
Expand Down Expand Up @@ -858,7 +858,7 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar

int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) {

if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUT_DOWN) {
slot->current_window_update_batch_size =
aws_add_size_saturating(slot->current_window_update_batch_size, window);

Expand Down
175 changes: 123 additions & 52 deletions source/darwin/secure_transport_tls_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ struct secure_transport_handler {
bool negotiation_finished;
bool verify_peer;
bool read_task_pending;
enum aws_tls_handler_read_state read_state;
int delay_shutdown_error_code;
};

static OSStatus s_read_cb(SSLConnectionRef conn, void *data, size_t *len) {
Expand Down Expand Up @@ -548,6 +550,41 @@ static int s_process_write_message(
return AWS_OP_SUCCESS;
}

static void s_run_read(struct aws_channel_task *task, void *arg, aws_task_status status);

static void s_initialize_read_delay_shutdown(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
int error_code) {
struct secure_transport_handler *secure_transport_handler = handler->impl;
/**
* In case of if we have any queued data in the handler after negotiation and we start to shutdown,
* make sure we pass those data down the pipeline before we complete the shutdown.
*/
AWS_LOGF_DEBUG(
AWS_LS_IO_TLS,
"id=%p: TLS handler still have pending data to be delivered during shutdown. Wait until downstream "
"reads the data.",
(void *)handler);
if (aws_channel_slot_downstream_read_window(slot) == 0) {
AWS_LOGF_WARN(
AWS_LS_IO_TLS,
"id=%p: TLS shutdown delayed. Pending data cannot be processed until the flow-control window opens. "
" Your application may hang if the read window never opens",
(void *)handler);
}
secure_transport_handler->read_state = AWS_TLS_HANDLER_READ_SHUTTING_DOWN;
secure_transport_handler->delay_shutdown_error_code = error_code;
if (!secure_transport_handler->read_task_pending) {
/* Kick off read, in case data arrives with TLS negotiation. Shutdown starts right after negotiation.
* Nothing will kick off read in that case. */
secure_transport_handler->read_task_pending = true;
aws_channel_task_init(
&secure_transport_handler->read_task, s_run_read, handler, "darwin_channel_handler_read_on_delay_shutdown");
aws_channel_schedule_task_now(slot->channel, &secure_transport_handler->read_task);
}
}

static int s_handle_shutdown(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
Expand All @@ -556,24 +593,30 @@ static int s_handle_shutdown(
bool abort_immediately) {
struct secure_transport_handler *secure_transport_handler = handler->impl;

if (dir == AWS_CHANNEL_DIR_WRITE) {
if (dir == AWS_CHANNEL_DIR_READ) {
AWS_LOGF_DEBUG(
AWS_LS_IO_TLS, "id=%p: shutting down read direction with error %d.", (void *)handler, error_code);
if (!abort_immediately && secure_transport_handler->negotiation_finished &&
!aws_linked_list_empty(&secure_transport_handler->input_queue) && slot->adj_right) {
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
s_initialize_read_delay_shutdown(handler, slot, error_code);
/* Early out, not complete the shutdown process for the handler until the handler processes the pending
* data. */
return AWS_OP_SUCCESS;
}
secure_transport_handler->read_state = AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE;
} else {
/* Shutdown in write direction */
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
if (!abort_immediately && error_code != AWS_IO_SOCKET_CLOSED) {
AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: shutting down write direction.", (void *)handler);
SSLClose(secure_transport_handler->ctx);
}
} else {
AWS_LOGF_DEBUG(
AWS_LS_IO_TLS,
"id=%p: shutting down read direction with error %d. Flushing queues.",
(void *)handler,
error_code);
while (!aws_linked_list_empty(&secure_transport_handler->input_queue)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&secure_transport_handler->input_queue);
struct aws_io_message *message = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
aws_mem_release(message->allocator, message);
}
}

/* Flushing queues */
while (!aws_linked_list_empty(&secure_transport_handler->input_queue)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&secure_transport_handler->input_queue);
struct aws_io_message *message = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
aws_mem_release(message->allocator, message);
}
return aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, abort_immediately);
}

Expand All @@ -583,6 +626,12 @@ static int s_process_read_message(
struct aws_io_message *message) {

struct secure_transport_handler *secure_transport_handler = handler->impl;
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE) {
if (message) {
aws_mem_release(message->allocator, message);
}
return AWS_OP_SUCCESS;
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
}

if (message) {
aws_linked_list_push_back(&secure_transport_handler->input_queue, &message->queueing_handle);
Expand All @@ -608,59 +657,67 @@ static int s_process_read_message(
AWS_LS_IO_TLS, "id=%p: downstream window is %llu", (void *)handler, (unsigned long long)downstream_window);
size_t processed = 0;

OSStatus status = noErr;
while (processed < downstream_window && status == noErr) {
int shutdown_error_code = 0;
while (processed < downstream_window) {

struct aws_io_message *outgoing_read_message = aws_channel_acquire_message_from_pool(
slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, downstream_window - processed);

size_t read = 0;
status = SSLRead(
OSStatus status = SSLRead(
secure_transport_handler->ctx,
outgoing_read_message->message_data.buffer,
outgoing_read_message->message_data.capacity,
&read);

AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: bytes read %llu", (void *)handler, (unsigned long long)read);
if (read <= 0) {
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);

if (status != errSSLWouldBlock) {
AWS_LOGF_ERROR(
AWS_LS_IO_TLS,
"id=%p: error reported during SSLRead. OSStatus code %d",
(void *)handler,
(int)status);
if (read > 0) {
processed += read;
outgoing_read_message->message_data.len = read;

if (status != errSSLClosedGraceful) {
aws_raise_error(AWS_IO_TLS_ERROR_READ_FAILURE);
aws_channel_shutdown(secure_transport_handler->parent_slot->channel, AWS_IO_TLS_ERROR_READ_FAILURE);
} else {
AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: connection shutting down gracefully.", (void *)handler);
aws_channel_shutdown(secure_transport_handler->parent_slot->channel, AWS_ERROR_SUCCESS);
}
if (secure_transport_handler->on_data_read) {
secure_transport_handler->on_data_read(
handler, slot, &outgoing_read_message->message_data, secure_transport_handler->user_data);
}
continue;
};

processed += read;
outgoing_read_message->message_data.len = read;

if (secure_transport_handler->on_data_read) {
secure_transport_handler->on_data_read(
handler, slot, &outgoing_read_message->message_data, secure_transport_handler->user_data);
}

if (slot->adj_right) {
if (aws_channel_slot_send_message(slot, outgoing_read_message, AWS_CHANNEL_DIR_READ)) {
if (slot->adj_right) {
if (aws_channel_slot_send_message(slot, outgoing_read_message, AWS_CHANNEL_DIR_READ)) {
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);
shutdown_error_code = aws_last_error();
goto shutdown_channel;
}
/* outgoing message was pushed to the input_queue, so this handler owns it now */
} else {
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);
aws_channel_shutdown(secure_transport_handler->parent_slot->channel, aws_last_error());
/* incoming message was pushed to the input_queue, so this handler owns it now */
return AWS_OP_SUCCESS;
}
} else {
/* Nothing was read */
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);
}

switch (status) {
case errSSLWouldBlock:
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUTTING_DOWN) {
/* Propagate the shutdown as we blocked now. */
goto shutdown_channel;
} else {
break;
}
case errSSLClosedGraceful:
AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: connection shutting down gracefully.", (void *)handler);
goto shutdown_channel;
case noErr:
/* continue the while loop */
continue;
default:
/* unexpected error happened */
aws_raise_error(AWS_IO_TLS_ERROR_READ_FAILURE);
shutdown_error_code = AWS_IO_TLS_ERROR_READ_FAILURE;
goto shutdown_channel;
}

/* Break the while loop */
break;
}
AWS_LOGF_TRACE(
AWS_LS_IO_TLS,
Expand All @@ -669,6 +726,21 @@ static int s_process_read_message(
(unsigned long long)downstream_window - processed);

return AWS_OP_SUCCESS;

shutdown_channel:
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUTTING_DOWN) {
if (secure_transport_handler->delay_shutdown_error_code != 0) {
/* Propagate the original error code if it is set. */
shutdown_error_code = secure_transport_handler->delay_shutdown_error_code;
}
/* Continue the shutdown process delayed before. */
secure_transport_handler->read_state = AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE;
aws_channel_slot_on_handler_shutdown_complete(slot, AWS_CHANNEL_DIR_READ, shutdown_error_code, false);
} else {
/* Starts the shutdown process */
aws_channel_shutdown(slot->channel, shutdown_error_code);
}
return AWS_OP_SUCCESS;
}

static void s_run_read(struct aws_channel_task *task, void *arg, aws_task_status status) {
Expand All @@ -683,6 +755,9 @@ static void s_run_read(struct aws_channel_task *task, void *arg, aws_task_status

static int s_increment_read_window(struct aws_channel_handler *handler, struct aws_channel_slot *slot, size_t size) {
struct secure_transport_handler *secure_transport_handler = handler->impl;
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE) {
return AWS_OP_SUCCESS;
}

AWS_LOGF_TRACE(
AWS_LS_IO_TLS, "id=%p: increment read window message received %llu", (void *)handler, (unsigned long long)size);
Expand All @@ -704,13 +779,9 @@ static int s_increment_read_window(struct aws_channel_handler *handler, struct a
aws_channel_slot_increment_read_window(slot, window_update_size);
}

if (secure_transport_handler->negotiation_finished && !secure_transport_handler->read_task.node.next) {
if (secure_transport_handler->negotiation_finished && !secure_transport_handler->read_task_pending) {
/* TLS requires full records before it can decrypt anything. As a result we need to check everything we've
* buffered instead of just waiting on a read from the socket, or we'll hit a deadlock.
*
* We have messages in a queue and they need to be run after the socket has popped (even if it didn't have data
* to read). Alternatively, s2n reads entire records at a time, so we'll need to grab whatever we can and we
* have no idea what's going on inside there. So we need to attempt another read.
*/
secure_transport_handler->read_task_pending = true;
aws_channel_task_init(
Expand Down
Loading