Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress callbacks fire for all meta-request types #344

Merged
merged 11 commits into from
Aug 25, 2023
4 changes: 3 additions & 1 deletion include/aws/s3/private/s3_auto_ranged_get.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ struct aws_s3_auto_ranged_get {
uint64_t object_range_start;

/* The last byte of the data that will be retrieved from the object.
* (ignore this if object_range_empty) */
* (ignore this if object_range_empty)
* Note this is inclusive: https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
* So if begin=0 and end=0 then 1 byte is being downloaded. */
uint64_t object_range_end;

/* The total number of parts that are being used in downloading the object range. Note that "part" here
Expand Down
56 changes: 52 additions & 4 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ struct aws_s3_prepare_request_payload {
void *user_data;
};

/* An event to be delivered on the meta-request's io_event_loop thread. */
struct aws_s3_meta_request_event {
enum aws_s3_meta_request_event_type {
AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */
AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */
/* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */
} type;

union {
/* data for AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY */
struct {
struct aws_s3_request *completed_request;
} response_body;

/* data for AWS_S3_META_REQUEST_EVENT_PROGRESS */
struct {
struct aws_s3_meta_request_progress info;
} progress;
} u;
};

struct aws_s3_meta_request_vtable {
/* Update the meta request. out_request is required to be non-null. Returns true if there is any work in
* progress, false if there is not. */
Expand Down Expand Up @@ -179,11 +200,19 @@ struct aws_s3_meta_request {
* failed.)*/
uint32_t num_parts_delivery_completed;

/* Number of parts that have been successfully delivered to the caller. */
uint32_t num_parts_delivery_succeeded;
/* Task for delivering events on the meta-request's io_event_loop thread.
* We do this to ensure a meta-request's callbacks are fired sequentially and non-overlapping.
* If `event_delivery_array` has items in it, then this task is scheduled.
* If `event_delivery_active` is true, then this task is actively running.
* Delivery is not 100% complete until `event_delivery_array` is empty AND `event_delivery_active` is false
* (use aws_s3_meta_request_are_events_out_for_delivery_synced() to check) */
struct aws_task event_delivery_task;

/* Number of parts that have failed while trying to be delivered to the caller. */
uint32_t num_parts_delivery_failed;
/* Array of `struct aws_s3_meta_request_event` to deliver when the `event_delivery_task` runs. */
struct aws_array_list event_delivery_array;

/* When true, events are actively being delivered to the user. */
bool event_delivery_active;

/* The end finish result of the meta request. */
struct aws_s3_meta_request_result finish_result;
Expand All @@ -205,6 +234,14 @@ struct aws_s3_meta_request {

} client_process_work_threaded_data;

/* Anything in this structure should only ever be accessed by the meta-request from its io_event_loop thread. */
struct {
/* When delivering events, we swap contents with `synced_data.event_delivery_array`.
* This is an optimization, we could have just copied the array when the task runs,
* but swapping two array-lists back and forth avoids an allocation. */
struct aws_array_list event_delivery_array;
} io_threaded_data;

const bool should_compute_content_md5;

/* deep copy of the checksum config. */
Expand Down Expand Up @@ -316,6 +353,17 @@ void aws_s3_meta_request_stream_response_body_synced(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request);

/* Add an event for delivery on the meta-request's io_event_loop thread.
* These events usually correspond to callbacks that must fire sequentially and non-overlapping,
* such as delivery of a part's response body. */
void aws_s3_meta_request_add_event_for_delivery_synced(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_event *event);

/* Returns whether any events are out for delivery.
* The meta-request's finish callback must not be invoked until this returns false. */
bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);

/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
* as reading from the stream could cause user code to call back into aws-c-s3.
* This will fill the buffer to capacity, unless end of stream is reached.
Expand Down
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ AWS_S3_API
void aws_s3_request_clean_up_send_data(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_acquire(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_acquire(struct aws_s3_request *request);

AWS_S3_API
void aws_s3_request_release(struct aws_s3_request *request);
struct aws_s3_request *aws_s3_request_release(struct aws_s3_request *request);

AWS_S3_API
struct aws_s3_request_metrics *aws_s3_request_metrics_new(
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#endif
#define KB_TO_BYTES(kb) ((kb)*1024)
#define MB_TO_BYTES(mb) ((mb)*1024 * 1024)
#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024)
#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024ULL)

#define MS_TO_NS(ms) ((uint64_t)(ms)*1000000)
#define SEC_TO_NS(ms) ((uint64_t)(ms)*1000000000)
Expand Down
7 changes: 6 additions & 1 deletion include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ struct aws_s3_meta_request_progress {
};

/**
* Invoked to report progress of multi-part upload and copy object requests.
* Invoked to report progress of a meta-request.
* For PutObject, progress refers to bytes uploaded.
* For CopyObject, progress refers to bytes copied.
* For GetObject, progress refers to bytes downloaded.
* For anything else, progress refers to response body bytes received.
*/
typedef void(aws_s3_meta_request_progress_fn)(
struct aws_s3_meta_request *meta_request,
Expand Down Expand Up @@ -534,6 +538,7 @@ struct aws_s3_meta_request_options {

/**
* Invoked to report progress of the meta request execution.
* See `aws_s3_meta_request_progress_fn`.
*/
aws_s3_meta_request_progress_fn *progress_callback;

Expand Down
24 changes: 21 additions & 3 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static bool s_s3_auto_ranged_get_update(
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);

request->part_range_start = 0;
request->part_range_end = meta_request->part_size - 1;
request->part_range_end = meta_request->part_size - 1; /* range-end is inclusive */
request->discovers_object_size = true;

++auto_ranged_get->synced_data.num_parts_requested;
Expand Down Expand Up @@ -317,6 +317,10 @@ static bool s_s3_auto_ranged_get_update(
}

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
graebm marked this conversation as resolved.
Show resolved Hide resolved
work_remaining = true;
}
graebm marked this conversation as resolved.
Show resolved Hide resolved

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, s_s3_auto_ranged_get_success_status(meta_request));
Expand Down Expand Up @@ -494,7 +498,7 @@ static int s_discover_object_range_and_content_length(
* object range and total object size. Otherwise, the size and range should be equal to the
* total_content_length. */
if (!auto_ranged_get->initial_message_has_range_header) {
object_range_end = total_content_length - 1;
object_range_end = total_content_length - 1; /* range-end is inclusive */
} else if (aws_s3_parse_content_range_response_header(
meta_request->allocator,
request->send_data.response_headers,
Expand Down Expand Up @@ -553,7 +557,7 @@ static int s_discover_object_range_and_content_length(

/* When discovering the object size via first-part, the object range is the entire object. */
object_range_start = 0;
object_range_end = total_content_length - 1;
object_range_end = total_content_length - 1; /* range-end is inclusive */

result = AWS_OP_SUCCESS;
break;
Expand Down Expand Up @@ -695,6 +699,20 @@ static void s_s3_auto_ranged_get_request_finished(
}
++auto_ranged_get->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->send_data.response_body.len;
if (auto_ranged_get->synced_data.object_range_empty) {
event.u.progress.info.content_length = 0;
} else {
/* Note that range-end is inclusive */
event.u.progress.info.content_length = auto_ranged_get->synced_data.object_range_end + 1 -
auto_ranged_get->synced_data.object_range_start;
}
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

aws_s3_meta_request_stream_response_body_synced(meta_request, request);

AWS_LOGF_DEBUG(
Expand Down
31 changes: 24 additions & 7 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,10 @@ static bool s_s3_auto_ranged_put_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -1598,6 +1602,8 @@ static void s_s3_auto_ranged_put_request_finished(
AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request);
error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED;
} else if (!has_more_results) {
uint64_t bytes_previously_uploaded = 0;

for (size_t part_index = 0;
part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list);
part_index++) {
Expand All @@ -1607,6 +1613,8 @@ static void s_s3_auto_ranged_put_request_finished(
/* Update the number of parts sent/completed previously */
++auto_ranged_put->synced_data.num_parts_started;
++auto_ranged_put->synced_data.num_parts_completed;

bytes_previously_uploaded += part->size;
}
}

Expand All @@ -1616,6 +1624,14 @@ static void s_s3_auto_ranged_put_request_finished(
(void *)meta_request,
auto_ranged_put->synced_data.num_parts_completed,
auto_ranged_put->total_num_parts_from_content_length);

/* Deliver an initial progress_callback to report all previously uploaded parts. */
if (meta_request->progress_callback != NULL && bytes_previously_uploaded > 0) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = bytes_previously_uploaded;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}
}
}

Expand Down Expand Up @@ -1742,13 +1758,6 @@ static void s_s3_auto_ranged_put_request_finished(
etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes);
}
}
if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = request->request_body.len,
.content_length = auto_ranged_put->content_length,
};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
}
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1782,6 +1791,14 @@ static void s_s3_auto_ranged_put_request_finished(

++auto_ranged_put->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = request->request_body.len;
event.u.progress.info.content_length = auto_ranged_put->content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

/* Store part's ETag */
struct aws_s3_mpu_part_info *part = NULL;
aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index);
Expand Down
10 changes: 4 additions & 6 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1481,8 +1481,7 @@ static void s_s3_client_prepare_callback_queue_request(
request_is_noop = request->is_noop != 0;
s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
}

/* BEGIN CRITICAL SECTION */
Expand Down Expand Up @@ -1522,8 +1521,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED);

aws_s3_request_release(request);
request = NULL;
request = aws_s3_request_release(request);
} else if (
s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
s_s3_client_create_connection_for_request(client, request);
Expand Down Expand Up @@ -1856,8 +1854,8 @@ void aws_s3_client_notify_connection_finished(
}

if (connection->request != NULL) {
aws_s3_request_release(connection->request);
connection->request = NULL;

connection->request = aws_s3_request_release(connection->request);
}

aws_retry_token_release(connection->retry_token);
Expand Down
24 changes: 20 additions & 4 deletions source/s3_copy_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ static bool s_s3_copy_object_update(
work_remaining = true;

no_work_remaining:
/* If some events are still being delivered to caller, then wait for those to finish */
if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) {
work_remaining = true;
}

if (!work_remaining) {
aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
Expand Down Expand Up @@ -439,6 +443,7 @@ static struct aws_future_void *s_s3_copy_object_prepare_request(struct aws_s3_re
case AWS_S3_COPY_OBJECT_REQUEST_TAG_MULTIPART_COPY: {
/* Create a new uploadPartCopy message to upload a part. */
/* compute sub-request range */
/* note that range-end is inclusive */
uint64_t range_start = (request->part_number - 1) * copy_object->synced_data.part_size;
uint64_t range_end = range_start + copy_object->synced_data.part_size - 1;
if (range_end >= copy_object->synced_data.content_length) {
Expand Down Expand Up @@ -640,6 +645,15 @@ static void s_s3_copy_object_request_finished(

/* Signals completion of the meta request */
if (error_code == AWS_ERROR_SUCCESS) {

/* Send progress_callback for delivery on io_event_loop thread */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.content_length;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

copy_object->synced_data.copy_request_bypass_completed = true;
} else {
/* Bypassed CopyObject request failed */
Expand Down Expand Up @@ -720,11 +734,13 @@ static void s_s3_copy_object_request_finished(
AWS_ASSERT(etag != NULL);

++copy_object->synced_data.num_parts_successful;

/* Send progress_callback for delivery on io_event_loop thread. */
if (meta_request->progress_callback != NULL) {
struct aws_s3_meta_request_progress progress = {
.bytes_transferred = copy_object->synced_data.part_size,
.content_length = copy_object->synced_data.content_length};
meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS};
event.u.progress.info.bytes_transferred = copy_object->synced_data.part_size;
event.u.progress.info.content_length = copy_object->synced_data.content_length;
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}

struct aws_s3_mpu_part_info *part = NULL;
Expand Down
Loading