Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress callbacks fire for all meta-request types #344

Merged
merged 11 commits into from
Aug 25, 2023
56 changes: 52 additions & 4 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,25 @@ struct aws_s3_prepare_request_payload {
void *user_data;
};

/* An event to be delivered on the meta-request's io_event_loop thread. */
struct aws_s3_meta_request_event {
enum aws_s3_meta_request_event_type {
AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */
AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */
/* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */
} type;

union {
struct {
struct aws_s3_request *request;
graebm marked this conversation as resolved.
Show resolved Hide resolved
} response_body;

struct {
struct aws_s3_meta_request_progress info;
} progress;
} u;
};
graebm marked this conversation as resolved.
Show resolved Hide resolved

struct aws_s3_meta_request_vtable {
/* Update the meta request. out_request is required to be non-null. Returns true if there is any work in
* progress, false if there is not. */
Expand Down Expand Up @@ -179,11 +198,19 @@ struct aws_s3_meta_request {
* failed.)*/
uint32_t num_parts_delivery_completed;

/* Number of parts that have been successfully delivered to the caller. */
uint32_t num_parts_delivery_succeeded;
/* Task for delivering events on the meta-request's io_event_loop thread.
* We do this to ensure a meta-request's callbacks are fired sequentially and non-overlapping.
* If `event_delivery_array` has items in it, then this task is scheduled.
* If `event_delivery_active` is true, then this task is actively running.
* Delivery is not 100% complete until `event_delivery_array` is empty AND `event_delivery_active` is false
* (use aws_s3_meta_request_are_events_out_for_delivery_synced() to check) */
struct aws_task event_delivery_task;

/* Array of `struct aws_s3_meta_request_event` to deliver when the `event_delivery_task` runs. */
struct aws_array_list event_delivery_array;

/* Number of parts that have failed while trying to be delivered to the caller. */
uint32_t num_parts_delivery_failed;
/* When true, events are actively being delivered to the user. */
bool event_delivery_active;

/* The end finish result of the meta request. */
struct aws_s3_meta_request_result finish_result;
Expand All @@ -205,6 +232,14 @@ struct aws_s3_meta_request {

} client_process_work_threaded_data;

/* Anything in this structure should only ever be accessed by the meta-request from its io_event_loop thread. */
struct {
/* When delivering events, we swap contents with `synced_data.event_delivery_array`.
* This is an optimization, we could have just copied the array when the task runs,
* but swapping two array-lists back and forth avoids an allocation. */
struct aws_array_list event_delivery_array;
} io_threaded_data;

const bool should_compute_content_md5;

/* deep copy of the checksum config. */
Expand Down Expand Up @@ -316,6 +351,19 @@ void aws_s3_meta_request_stream_response_body_synced(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request);

/* Add an event for delivery on the meta-request's io_event_loop thread.
* These events usually correspond to callbacks that must fire sequentially and non-overlapping,
* such as delivery of a part's response body. */
AWS_S3_API
graebm marked this conversation as resolved.
Show resolved Hide resolved
void aws_s3_meta_request_add_event_for_delivery_synced(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_event *event);

/* Returns whether any events are out for delivery.
* The meta-request's finish callback must not be invoked until this returns false. */
AWS_S3_API
bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);

/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
* as reading from the stream could cause user code to call back into aws-c-s3.
* This will fill the buffer to capacity, unless end of stream is reached.
Expand Down
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ AWS_S3_API
void aws_s3_request_clean_up_send_data(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_acquire(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_acquire(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_release(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_release(struct aws_s3_request *request);

AWS_S3_API
struct aws_s3_request_metrics *aws_s3_request_metrics_new(
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#endif
#define KB_TO_BYTES(kb) ((kb)*1024)
#define MB_TO_BYTES(mb) ((mb)*1024 * 1024)
#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024)
#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024ULL)

#define MS_TO_NS(ms) ((uint64_t)(ms)*1000000)
#define SEC_TO_NS(ms) ((uint64_t)(ms)*1000000000)
Expand Down
7 changes: 6 additions & 1 deletion include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ struct aws_s3_meta_request_progress {
};

/**
* Invoked to report progress of multi-part upload and copy object requests.
* Invoked to report progress of a meta-request.
* For PutObject, progress refers to bytes uploaded.
* For CopyObject, progress refers to bytes copied.
* For GetObject, progress refers to bytes downloaded.
* For anything else, progress refers to response bytes received.
graebm marked this conversation as resolved.
Show resolved Hide resolved
*/
typedef void(aws_s3_meta_request_progress_fn)(
struct aws_s3_meta_request *meta_request,
Expand Down Expand Up @@ -534,6 +538,7 @@ struct aws_s3_meta_request_options {

/**
* Invoked to report progress of the meta request execution.
* See `aws_s3_meta_request_progress_fn`.
*/
aws_s3_meta_request_progress_fn *progress_callback;

Expand Down
16 changes: 16 additions & 0 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ static bool s_s3_auto_ranged_get_update(
}

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
graebm marked this conversation as resolved.
Show resolved Hide resolved
work_remaining = true;
}
graebm marked this conversation as resolved.
Show resolved Hide resolved

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, s_s3_auto_ranged_get_success_status(meta_request));
Expand Down Expand Up @@ -695,6 +699,18 @@ static void s_s3_auto_ranged_get_request_finished(
}
++auto_ranged_get->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->send_data.response_body.len;
event.u.progress.info.content_length =
auto_ranged_get->synced_data.object_range_empty
? 0
: (auto_ranged_get->synced_data.object_range_end + 1 -
graebm marked this conversation as resolved.
Show resolved Hide resolved
auto_ranged_get->synced_data.object_range_start);
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);

AWS_LOGF_DEBUG(
Expand Down
31 changes: 24 additions & 7 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,10 @@ static bool s_s3_auto_ranged_put_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -1598,6 +1602,8 @@ static void s_s3_auto_ranged_put_request_finished(
AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request);
error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED;
} else if (!has_more_results) {
uint64_t bytes_previously_uploaded = 0;

for (size_t part_index = 0;
part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list);
part_index++) {
Expand All @@ -1607,6 +1613,8 @@ static void s_s3_auto_ranged_put_request_finished(
/* Update the number of parts sent/completed previously */
++auto_ranged_put->synced_data.num_parts_started;
++auto_ranged_put->synced_data.num_parts_completed;

bytes_previously_uploaded += part->size;
}
}

Expand All @@ -1616,6 +1624,14 @@ static void s_s3_auto_ranged_put_request_finished(
(void *)meta_request,
auto_ranged_put->synced_data.num_parts_completed,
auto_ranged_put->total_num_parts_from_content_length);

/* Deliver an initial progress_callback to report all previously uploaded parts. */
if (meta_request->progress_callback != NULL && bytes_previously_uploaded > 0) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = bytes_previously_uploaded;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}
}
}

Expand Down Expand Up @@ -1742,13 +1758,6 @@ static void s_s3_auto_ranged_put_request_finished(
etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes);
}
}
if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = request->request_body.len,
.content_length = auto_ranged_put->content_length,
};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
}
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1782,6 +1791,14 @@ static void s_s3_auto_ranged_put_request_finished(

++auto_ranged_put->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->request_body.len;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

/* Store part's ETag */
struct aws_s3_mpu_part_info *part = NULL;
aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index);
Expand Down
10 changes: 4 additions & 6 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1481,8 +1481,7 @@ static void s_s3_client_prepare_callback_queue_request(
request_is_noop = request->is_noop != 0;
s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1522,8 +1521,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
} else if (
s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
s_s3_client_create_connection_for_request(client, request);
Expand Down Expand Up @@ -1856,8 +1854,8 @@ void aws_s3_client_notify_connection_finished(
}

if (connection->request != NULL) {
aws_s3_request_release(connection->request);
connection->request = NULL;

connection->request = aws_s3_request_release(connection->request);
}

aws_retry_token_release(connection->retry_token);
Expand Down
23 changes: 19 additions & 4 deletions source/s3_copy_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ static bool s_s3_copy_object_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -640,6 +644,15 @@ static void s_s3_copy_object_request_finished(

/* Signals completion of the meta request */
if (error_code == AWS_ERROR_SUCCESS) {

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.content_length;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

copy_object->synced_data.copy_request_bypass_completed = true;
} else {
/* Bypassed CopyObject request failed */
Expand Down Expand Up @@ -720,11 +733,13 @@ static void s_s3_copy_object_request_finished(
AWS_ASSERT(etag != NULL);

++copy_object->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread. */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = copy_object->synced_data.part_size,
.content_length = copy_object->synced_data.content_length};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.part_size;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

struct aws_s3_mpu_part_info *part = NULL;
Expand Down
23 changes: 23 additions & 0 deletions source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ static bool s_s3_meta_request_default_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(
Expand Down Expand Up @@ -366,6 +370,25 @@ static void s_s3_meta_request_default_request_finished(
meta_request_default->synced_data.request_error_code = error_code;

if (error_code == AWS_ERROR_SUCCESS) {
/* Send progress_callback for delivery on io_event_loop thread.
* For default meta-requests, we invoke the progress_callback once, after the sole HTTP request completes.
* This is simpler than reporting incremental progress as the response body is received,
* or the request body is streamed out, since then we'd also need to handle retries that reset
* progress back to 0% (our existing API only lets us report forward progress). */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
if (meta_request->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
graebm marked this conversation as resolved.
Show resolved Hide resolved
/* For uploads, report request body size */
event.u.progress.info.bytes_transferred = request->request_body.len;
event.u.progress.info.content_length = request->request_body.len;
} else {
/* For anything else, report response body size */
event.u.progress.info.bytes_transferred = request->send_data.response_body.len;
event.u.progress.info.content_length = request->send_data.response_body.len;
}
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);
} else {
aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
Expand Down
Loading