Skip to content

Commit

Permalink
async channel shutdown (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored and graebm committed Aug 6, 2019
1 parent dd1cd58 commit ecea3d0
Showing 1 changed file with 52 additions and 62 deletions.
114 changes: 52 additions & 62 deletions source/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,83 +303,73 @@ struct channel_shutdown_task_args {

static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately);

static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);

static void s_shutdown_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {

(void)task;

(void)status;
struct shutdown_task *shutdown_task = arg;
struct aws_channel *channel = shutdown_task->channel;
int error_code = shutdown_task->error_code;
bool shutdown_immediately = shutdown_task->shutdown_immediately;
if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);

if (status == AWS_TASK_STATUS_RUN_READY) {
s_channel_shutdown(shutdown_task->channel, shutdown_task->error_code, shutdown_task->shutdown_immediately);
}
}

static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);

static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
if (aws_channel_thread_is_callers_thread(channel)) {
if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);

struct aws_channel_slot *slot = channel->first;
channel->channel_state = AWS_CHANNEL_SHUTTING_DOWN;
struct aws_channel_slot *slot = channel->first;
channel->channel_state = AWS_CHANNEL_SHUTTING_DOWN;

if (slot) {
AWS_LOGF_TRACE(
AWS_LS_IO_CHANNEL,
"id=%p: shutting down slot %p (the first one) in the read direction",
(void *)channel,
(void *)slot);
if (slot) {
AWS_LOGF_TRACE(
AWS_LS_IO_CHANNEL,
"id=%p: shutting down slot %p (the first one) in the read direction",
(void *)channel,
(void *)slot);

return aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
}
aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
return;
}

channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: shutdown completed", (void *)channel);
channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: shutdown completed", (void *)channel);

aws_mutex_lock(&channel->cross_thread_tasks.lock);
channel->cross_thread_tasks.is_channel_shut_down = true;
aws_mutex_unlock(&channel->cross_thread_tasks.lock);
aws_mutex_lock(&channel->cross_thread_tasks.lock);
channel->cross_thread_tasks.is_channel_shut_down = true;
aws_mutex_unlock(&channel->cross_thread_tasks.lock);

if (channel->on_shutdown_completed) {
channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
channel->shutdown_notify_task.task.arg = channel;
channel->shutdown_notify_task.error_code = error_code;
aws_event_loop_schedule_task_now(channel->loop, &channel->shutdown_notify_task.task);
}
if (channel->on_shutdown_completed) {
channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
channel->shutdown_notify_task.task.arg = channel;
channel->shutdown_notify_task.error_code = error_code;
aws_event_loop_schedule_task_now(channel->loop, &channel->shutdown_notify_task.task);
}
} else {
AWS_LOGF_TRACE(
AWS_LS_IO_CHANNEL,
"id=%p: channel shutdown called from outside the "
"event-loop thread, scheduling task.",
(void *)channel);
}
}

bool need_to_schedule = true;
aws_mutex_lock(&channel->cross_thread_tasks.lock);
if (channel->cross_thread_tasks.shutdown_task.task.task_fn) {
need_to_schedule = false;
AWS_LOGF_DEBUG(
AWS_LS_IO_CHANNEL,
"id=%p: Channel shutdown is already pending, not scheduling another.",
(void *)channel);
static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
bool need_to_schedule = true;
aws_mutex_lock(&channel->cross_thread_tasks.lock);
if (channel->cross_thread_tasks.shutdown_task.task.task_fn) {
need_to_schedule = false;
AWS_LOGF_DEBUG(
AWS_LS_IO_CHANNEL, "id=%p: Channel shutdown is already pending, not scheduling another.", (void *)channel);

} else {
aws_channel_task_init(
&channel->cross_thread_tasks.shutdown_task.task,
s_shutdown_task,
&channel->cross_thread_tasks.shutdown_task,
"channel_cross_thread_shutdown");
channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
channel->cross_thread_tasks.shutdown_task.channel = channel;
channel->cross_thread_tasks.shutdown_task.error_code = error_code;
}
} else {
aws_channel_task_init(
&channel->cross_thread_tasks.shutdown_task.task,
s_shutdown_task,
&channel->cross_thread_tasks.shutdown_task,
"channel_shutdown");
channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
channel->cross_thread_tasks.shutdown_task.channel = channel;
channel->cross_thread_tasks.shutdown_task.error_code = error_code;
}

aws_mutex_unlock(&channel->cross_thread_tasks.lock);
aws_mutex_unlock(&channel->cross_thread_tasks.lock);

if (need_to_schedule) {
aws_channel_schedule_task_now(channel, &channel->cross_thread_tasks.shutdown_task.task);
}
if (need_to_schedule) {
AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: channel shutdown task is scheduled", (void *)channel);
aws_channel_schedule_task_now(channel, &channel->cross_thread_tasks.shutdown_task.task);
}

return AWS_OP_SUCCESS;
Expand Down

0 comments on commit ecea3d0

Please sign in to comment.