diff --git a/include/aws/s3/private/s3_auto_ranged_get.h b/include/aws/s3/private/s3_auto_ranged_get.h index efebaf98e..01072a8a9 100644 --- a/include/aws/s3/private/s3_auto_ranged_get.h +++ b/include/aws/s3/private/s3_auto_ranged_get.h @@ -25,7 +25,9 @@ struct aws_s3_auto_ranged_get { uint64_t object_range_start; /* The last byte of the data that will be retrieved from the object. - * (ignore this if object_range_empty) */ + * (ignore this if object_range_empty) + * Note this is inclusive: https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests + * So if begin=0 and end=0 then 1 byte is being downloaded. */ uint64_t object_range_end; /* The total number of parts that are being used in downloading the object range. Note that "part" here diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 214d28c54..13e0e5dea 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -54,6 +54,27 @@ struct aws_s3_prepare_request_payload { void *user_data; }; +/* An event to be delivered on the meta-request's io_event_loop thread. */ +struct aws_s3_meta_request_event { + enum aws_s3_meta_request_event_type { + AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */ + AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */ + /* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */ + } type; + + union { + /* data for AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY */ + struct { + struct aws_s3_request *completed_request; + } response_body; + + /* data for AWS_S3_META_REQUEST_EVENT_PROGRESS */ + struct { + struct aws_s3_meta_request_progress info; + } progress; + } u; +}; + struct aws_s3_meta_request_vtable { /* Update the meta request. out_request is required to be non-null. Returns true if there is any work in * progress, false if there is not. 
*/ @@ -179,11 +200,19 @@ struct aws_s3_meta_request { * failed.)*/ uint32_t num_parts_delivery_completed; - /* Number of parts that have been successfully delivered to the caller. */ - uint32_t num_parts_delivery_succeeded; + /* Task for delivering events on the meta-request's io_event_loop thread. + * We do this to ensure a meta-request's callbacks are fired sequentially and non-overlapping. + * If `event_delivery_array` has items in it, then this task is scheduled. + * If `event_delivery_active` is true, then this task is actively running. + * Delivery is not 100% complete until `event_delivery_array` is empty AND `event_delivery_active` is false + * (use aws_s3_meta_request_are_events_out_for_delivery_synced() to check) */ + struct aws_task event_delivery_task; - /* Number of parts that have failed while trying to be delivered to the caller. */ - uint32_t num_parts_delivery_failed; + /* Array of `struct aws_s3_meta_request_event` to deliver when the `event_delivery_task` runs. */ + struct aws_array_list event_delivery_array; + + /* When true, events are actively being delivered to the user. */ + bool event_delivery_active; /* The end finish result of the meta request. */ struct aws_s3_meta_request_result finish_result; @@ -205,6 +234,14 @@ struct aws_s3_meta_request { } client_process_work_threaded_data; + /* Anything in this structure should only ever be accessed by the meta-request from its io_event_loop thread. */ + struct { + /* When delivering events, we swap contents with `synced_data.event_delivery_array`. + * This is an optimization, we could have just copied the array when the task runs, + * but swapping two array-lists back and forth avoids an allocation. */ + struct aws_array_list event_delivery_array; + } io_threaded_data; + const bool should_compute_content_md5; /* deep copy of the checksum config. 
*/ @@ -316,6 +353,17 @@ void aws_s3_meta_request_stream_response_body_synced( struct aws_s3_meta_request *meta_request, struct aws_s3_request *request); +/* Add an event for delivery on the meta-request's io_event_loop thread. + * These events usually correspond to callbacks that must fire sequentially and non-overlapping, + * such as delivery of a part's response body. */ +void aws_s3_meta_request_add_event_for_delivery_synced( + struct aws_s3_meta_request *meta_request, + const struct aws_s3_meta_request_event *event); + +/* Returns whether any events are out for delivery. + * The meta-request's finish callback must not be invoked until this returns false. */ +bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request); + /* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex, * as reading from the stream could cause user code to call back into aws-c-s3. * This will fill the buffer to capacity, unless end of stream is reached. 
diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h index b88aa55ca..620077f62 100644 --- a/include/aws/s3/private/s3_request.h +++ b/include/aws/s3/private/s3_request.h @@ -224,10 +224,10 @@ AWS_S3_API void aws_s3_request_clean_up_send_data(struct aws_s3_request *request); AWS_S3_API -void aws_s3_request_acquire(struct aws_s3_request *request); +struct aws_s3_request *aws_s3_request_acquire(struct aws_s3_request *request); AWS_S3_API -void aws_s3_request_release(struct aws_s3_request *request); +struct aws_s3_request *aws_s3_request_release(struct aws_s3_request *request); AWS_S3_API struct aws_s3_request_metrics *aws_s3_request_metrics_new( diff --git a/include/aws/s3/private/s3_util.h b/include/aws/s3/private/s3_util.h index e95ba40bc..7dbcce5f2 100644 --- a/include/aws/s3/private/s3_util.h +++ b/include/aws/s3/private/s3_util.h @@ -24,7 +24,7 @@ #endif #define KB_TO_BYTES(kb) ((kb)*1024) #define MB_TO_BYTES(mb) ((mb)*1024 * 1024) -#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024) +#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024ULL) #define MS_TO_NS(ms) ((uint64_t)(ms)*1000000) #define SEC_TO_NS(ms) ((uint64_t)(ms)*1000000000) diff --git a/include/aws/s3/s3_client.h b/include/aws/s3/s3_client.h index 264e99558..9f7970a6a 100644 --- a/include/aws/s3/s3_client.h +++ b/include/aws/s3/s3_client.h @@ -177,7 +177,11 @@ struct aws_s3_meta_request_progress { }; /** - * Invoked to report progress of multi-part upload and copy object requests. + * Invoked to report progress of a meta-request. + * For PutObject, progress refers to bytes uploaded. + * For CopyObject, progress refers to bytes copied. + * For GetObject, progress refers to bytes downloaded. + * For anything else, progress refers to response body bytes received. */ typedef void(aws_s3_meta_request_progress_fn)( struct aws_s3_meta_request *meta_request, @@ -534,6 +538,7 @@ struct aws_s3_meta_request_options { /** * Invoked to report progress of the meta request execution. 
+ * See `aws_s3_meta_request_progress_fn`. */ aws_s3_meta_request_progress_fn *progress_callback; diff --git a/source/s3_auto_ranged_get.c b/source/s3_auto_ranged_get.c index 10632104e..1a32cfb10 100644 --- a/source/s3_auto_ranged_get.c +++ b/source/s3_auto_ranged_get.c @@ -193,7 +193,7 @@ static bool s_s3_auto_ranged_get_update( AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY); request->part_range_start = 0; - request->part_range_end = meta_request->part_size - 1; + request->part_range_end = meta_request->part_size - 1; /* range-end is inclusive */ request->discovers_object_size = true; ++auto_ranged_get->synced_data.num_parts_requested; @@ -317,6 +317,10 @@ static bool s_s3_auto_ranged_get_update( } no_work_remaining: + /* If some events are still being delivered to caller, then wait for those to finish */ + if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) { + work_remaining = true; + } if (!work_remaining) { aws_s3_meta_request_set_success_synced(meta_request, s_s3_auto_ranged_get_success_status(meta_request)); @@ -494,7 +498,7 @@ static int s_discover_object_range_and_content_length( * object range and total object size. Otherwise, the size and range should be equal to the * total_content_length. */ if (!auto_ranged_get->initial_message_has_range_header) { - object_range_end = total_content_length - 1; + object_range_end = total_content_length - 1; /* range-end is inclusive */ } else if (aws_s3_parse_content_range_response_header( meta_request->allocator, request->send_data.response_headers, @@ -553,7 +557,7 @@ static int s_discover_object_range_and_content_length( /* When discovering the object size via first-part, the object range is the entire object. 
*/ object_range_start = 0; - object_range_end = total_content_length - 1; + object_range_end = total_content_length - 1; /* range-end is inclusive */ result = AWS_OP_SUCCESS; break; @@ -695,6 +699,20 @@ static void s_s3_auto_ranged_get_request_finished( } ++auto_ranged_get->synced_data.num_parts_successful; + /* Send progress_callback for delivery on io_event_loop thread */ + if (meta_request->progress_callback != NULL) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + event.u.progress.info.bytes_transferred = request->send_data.response_body.len; + if (auto_ranged_get->synced_data.object_range_empty) { + event.u.progress.info.content_length = 0; + } else { + /* Note that range-end is inclusive */ + event.u.progress.info.content_length = auto_ranged_get->synced_data.object_range_end + 1 - + auto_ranged_get->synced_data.object_range_start; + } + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); + } + aws_s3_meta_request_stream_response_body_synced(meta_request, request); AWS_LOGF_DEBUG( diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index 11e24e03a..e959c789e 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -693,6 +693,10 @@ static bool s_s3_auto_ranged_put_update( work_remaining = true; no_work_remaining: + /* If some events are still being delivered to caller, then wait for those to finish */ + if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) { + work_remaining = true; + } if (!work_remaining) { aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS); @@ -1598,6 +1602,8 @@ static void s_s3_auto_ranged_put_request_finished( AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request); error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED; } else if (!has_more_results) { + uint64_t bytes_previously_uploaded = 0; + for (size_t part_index = 0; 
part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list); part_index++) { @@ -1607,6 +1613,8 @@ static void s_s3_auto_ranged_put_request_finished( /* Update the number of parts sent/completed previously */ ++auto_ranged_put->synced_data.num_parts_started; ++auto_ranged_put->synced_data.num_parts_completed; + + bytes_previously_uploaded += part->size; } } @@ -1616,6 +1624,14 @@ static void s_s3_auto_ranged_put_request_finished( (void *)meta_request, auto_ranged_put->synced_data.num_parts_completed, auto_ranged_put->total_num_parts_from_content_length); + + /* Deliver an initial progress_callback to report all previously uploaded parts. */ + if (meta_request->progress_callback != NULL && bytes_previously_uploaded > 0) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + event.u.progress.info.bytes_transferred = bytes_previously_uploaded; + event.u.progress.info.content_length = auto_ranged_put->content_length; + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); + } } } @@ -1742,13 +1758,6 @@ static void s_s3_auto_ranged_put_request_finished( etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes); } } - if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) { - struct aws_s3_meta_request_progress progress = { - .bytes_transferred = request->request_body.len, - .content_length = auto_ranged_put->content_length, - }; - meta_request->progress_callback(meta_request, &progress, meta_request->user_data); - } } /* BEGIN CRITICAL SECTION */ @@ -1782,6 +1791,14 @@ static void s_s3_auto_ranged_put_request_finished( ++auto_ranged_put->synced_data.num_parts_successful; + /* Send progress_callback for delivery on io_event_loop thread */ + if (meta_request->progress_callback != NULL) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + event.u.progress.info.bytes_transferred = request->request_body.len; + 
event.u.progress.info.content_length = auto_ranged_put->content_length; + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); + } + /* Store part's ETag */ struct aws_s3_mpu_part_info *part = NULL; aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); diff --git a/source/s3_client.c b/source/s3_client.c index cbd98b8dd..abe9b6e44 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -1481,8 +1481,7 @@ static void s_s3_client_prepare_callback_queue_request( request_is_noop = request->is_noop != 0; s_s3_client_meta_request_finished_request(client, meta_request, request, error_code); - aws_s3_request_release(request); - request = NULL; + request = aws_s3_request_release(request); } /* BEGIN CRITICAL SECTION */ @@ -1522,8 +1521,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) { if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) { s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED); - aws_s3_request_release(request); - request = NULL; + request = aws_s3_request_release(request); } else if ( s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) { s_s3_client_create_connection_for_request(client, request); @@ -1856,8 +1854,8 @@ void aws_s3_client_notify_connection_finished( } if (connection->request != NULL) { - aws_s3_request_release(connection->request); - connection->request = NULL; + + connection->request = aws_s3_request_release(connection->request); } aws_retry_token_release(connection->retry_token); diff --git a/source/s3_copy_object.c b/source/s3_copy_object.c index cc19e3f49..2f70ac407 100644 --- a/source/s3_copy_object.c +++ b/source/s3_copy_object.c @@ -336,6 +336,10 @@ static bool s_s3_copy_object_update( work_remaining = true; no_work_remaining: + /* If some events are still being delivered to caller, then wait for those to finish 
*/ + if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) { + work_remaining = true; + } if (!work_remaining) { aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS); @@ -439,6 +443,7 @@ static struct aws_future_void *s_s3_copy_object_prepare_request(struct aws_s3_re case AWS_S3_COPY_OBJECT_REQUEST_TAG_MULTIPART_COPY: { /* Create a new uploadPartCopy message to upload a part. */ /* compute sub-request range */ + /* note that range-end is inclusive */ uint64_t range_start = (request->part_number - 1) * copy_object->synced_data.part_size; uint64_t range_end = range_start + copy_object->synced_data.part_size - 1; if (range_end >= copy_object->synced_data.content_length) { @@ -640,6 +645,15 @@ static void s_s3_copy_object_request_finished( /* Signals completion of the meta request */ if (error_code == AWS_ERROR_SUCCESS) { + + /* Send progress_callback for delivery on io_event_loop thread */ + if (meta_request->progress_callback != NULL) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + event.u.progress.info.bytes_transferred = copy_object->synced_data.content_length; + event.u.progress.info.content_length = copy_object->synced_data.content_length; + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); + } + copy_object->synced_data.copy_request_bypass_completed = true; } else { /* Bypassed CopyObject request failed */ @@ -720,11 +734,13 @@ static void s_s3_copy_object_request_finished( AWS_ASSERT(etag != NULL); ++copy_object->synced_data.num_parts_successful; + + /* Send progress_callback for delivery on io_event_loop thread. 
*/ if (meta_request->progress_callback != NULL) { - struct aws_s3_meta_request_progress progress = { - .bytes_transferred = copy_object->synced_data.part_size, - .content_length = copy_object->synced_data.content_length}; - meta_request->progress_callback(meta_request, &progress, meta_request->user_data); + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + event.u.progress.info.bytes_transferred = copy_object->synced_data.part_size; + event.u.progress.info.content_length = copy_object->synced_data.content_length; + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); } struct aws_s3_mpu_part_info *part = NULL; diff --git a/source/s3_default_meta_request.c b/source/s3_default_meta_request.c index a94c3c39e..e12a2f0e8 100644 --- a/source/s3_default_meta_request.c +++ b/source/s3_default_meta_request.c @@ -192,6 +192,10 @@ static bool s_s3_meta_request_default_update( work_remaining = true; no_work_remaining: + /* If some events are still being delivered to caller, then wait for those to finish */ + if (!work_remaining && aws_s3_meta_request_are_events_out_for_delivery_synced(meta_request)) { + work_remaining = true; + } if (!work_remaining) { aws_s3_meta_request_set_success_synced( @@ -366,6 +370,25 @@ static void s_s3_meta_request_default_request_finished( meta_request_default->synced_data.request_error_code = error_code; if (error_code == AWS_ERROR_SUCCESS) { + /* Send progress_callback for delivery on io_event_loop thread. + * For default meta-requests, we invoke the progress_callback once, after the sole HTTP request completes. + * This is simpler than reporting incremental progress as the response body is received, + * or the request body is streamed out, since then we'd also need to handle retries that reset + * progress back to 0% (our existing API only lets us report forward progress). 
*/ + if (meta_request->progress_callback != NULL) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_PROGRESS}; + if (meta_request->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) { + /* For uploads, report request body size */ + event.u.progress.info.bytes_transferred = request->request_body.len; + event.u.progress.info.content_length = request->request_body.len; + } else { + /* For anything else, report response body size */ + event.u.progress.info.bytes_transferred = request->send_data.response_body.len; + event.u.progress.info.content_length = request->send_data.response_body.len; + } + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); + } + aws_s3_meta_request_stream_response_body_synced(meta_request, request); } else { aws_s3_meta_request_set_fail_synced(meta_request, request, error_code); diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 23e426c0e..9b9761116 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -25,6 +25,7 @@ static const size_t s_dynamic_body_initial_buf_size = KB_TO_BYTES(1); static const size_t s_default_body_streaming_priority_queue_size = 16; +static const size_t s_default_event_delivery_array_size = 16; static int s_s3_request_priority_queue_pred(const void *a, const void *b); static void s_s3_meta_request_destroy(void *user_data); @@ -223,6 +224,18 @@ int aws_s3_meta_request_init_base( goto error; } + aws_array_list_init_dynamic( + &meta_request->synced_data.event_delivery_array, + meta_request->allocator, + s_default_event_delivery_array_size, + sizeof(struct aws_s3_meta_request_event)); + + aws_array_list_init_dynamic( + &meta_request->io_threaded_data.event_delivery_array, + meta_request->allocator, + s_default_event_delivery_array_size, + sizeof(struct aws_s3_meta_request_event)); + *((size_t *)&meta_request->part_size) = part_size; *((bool *)&meta_request->should_compute_content_md5) = should_compute_content_md5; 
checksum_config_init(&meta_request->checksum_config, options->checksum_config); @@ -353,6 +366,16 @@ void aws_s3_meta_request_set_fail_synced( AWS_PRECONDITION(meta_request); ASSERT_SYNCED_DATA_LOCK_HELD(meta_request); + /* Protect against bugs */ + if (error_code == AWS_ERROR_SUCCESS) { + AWS_ASSERT(false); + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p Meta request failed but error code not set, AWS_ERROR_UNKNOWN will be reported", + (void *)meta_request); + error_code = AWS_ERROR_UNKNOWN; + } + if (meta_request->synced_data.finish_result_set) { return; } @@ -447,7 +470,15 @@ static void s_s3_meta_request_destroy(void *user_data) { aws_s3_endpoint_release(meta_request->endpoint); meta_request->client = aws_s3_client_release(meta_request->client); + AWS_ASSERT(aws_priority_queue_size(&meta_request->synced_data.pending_body_streaming_requests) == 0); aws_priority_queue_clean_up(&meta_request->synced_data.pending_body_streaming_requests); + + AWS_ASSERT(aws_array_list_length(&meta_request->synced_data.event_delivery_array) == 0); + aws_array_list_clean_up(&meta_request->synced_data.event_delivery_array); + + AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0); + aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array); + aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result); if (meta_request->vtable != NULL) { @@ -1309,12 +1340,6 @@ void aws_s3_meta_request_finished_request( meta_request->vtable->finished_request(meta_request, request, error_code); } -struct s3_stream_response_body_payload { - struct aws_s3_meta_request *meta_request; - struct aws_linked_list requests; - struct aws_task task; -}; - /* Pushes a request into the body streaming priority queue. 
Derived meta request types should not call this--they * should instead call aws_s3_meta_request_stream_response_body_synced.*/ static void s_s3_meta_request_body_streaming_push_synced( @@ -1327,19 +1352,17 @@ static void s_s3_meta_request_body_streaming_push_synced( static struct aws_s3_request *s_s3_meta_request_body_streaming_pop_next_synced( struct aws_s3_meta_request *meta_request); -static void s_s3_meta_request_body_streaming_task(struct aws_task *task, void *arg, enum aws_task_status task_status); +static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status); void aws_s3_meta_request_stream_response_body_synced( struct aws_s3_meta_request *meta_request, struct aws_s3_request *request) { + ASSERT_SYNCED_DATA_LOCK_HELD(meta_request); AWS_PRECONDITION(meta_request); AWS_PRECONDITION(request); AWS_PRECONDITION(request->part_number > 0); - struct aws_linked_list streaming_requests; - aws_linked_list_init(&streaming_requests); - /* Push it into the priority queue. */ s_s3_meta_request_body_streaming_push_synced(meta_request, request); @@ -1347,49 +1370,62 @@ void aws_s3_meta_request_stream_response_body_synced( AWS_PRECONDITION(client); aws_atomic_fetch_add(&client->stats.num_requests_stream_queued_waiting, 1); - /* Grab the next request that can be streamed back to the caller. */ - struct aws_s3_request *next_streaming_request = s_s3_meta_request_body_streaming_pop_next_synced(meta_request); + /* Grab any requests that can be streamed back to the caller + * and send them for delivery on io_event_loop thread. 
*/ uint32_t num_streaming_requests = 0; + struct aws_s3_request *next_streaming_request; + while ((next_streaming_request = s_s3_meta_request_body_streaming_pop_next_synced(meta_request)) != NULL) { + struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY}; + event.u.response_body.completed_request = next_streaming_request; + aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event); - /* Grab any additional requests that could be streamed to the caller. */ - while (next_streaming_request != NULL) { - aws_atomic_fetch_sub(&client->stats.num_requests_stream_queued_waiting, 1); - - aws_linked_list_push_back(&streaming_requests, &next_streaming_request->node); ++num_streaming_requests; - next_streaming_request = s_s3_meta_request_body_streaming_pop_next_synced(meta_request); } - if (aws_linked_list_empty(&streaming_requests)) { + if (num_streaming_requests == 0) { return; } aws_atomic_fetch_add(&client->stats.num_requests_streaming, num_streaming_requests); + aws_atomic_fetch_sub(&client->stats.num_requests_stream_queued_waiting, num_streaming_requests); meta_request->synced_data.num_parts_delivery_sent += num_streaming_requests; +} - struct s3_stream_response_body_payload *payload = - aws_mem_calloc(client->allocator, 1, sizeof(struct s3_stream_response_body_payload)); +void aws_s3_meta_request_add_event_for_delivery_synced( + struct aws_s3_meta_request *meta_request, + const struct aws_s3_meta_request_event *event) { - aws_s3_meta_request_acquire(meta_request); - payload->meta_request = meta_request; + ASSERT_SYNCED_DATA_LOCK_HELD(meta_request); - aws_linked_list_init(&payload->requests); - aws_linked_list_swap_contents(&payload->requests, &streaming_requests); + aws_array_list_push_back(&meta_request->synced_data.event_delivery_array, event); - aws_task_init( - &payload->task, s_s3_meta_request_body_streaming_task, payload, "s_s3_meta_request_body_streaming_task"); - 
aws_event_loop_schedule_task_now(meta_request->io_event_loop, &payload->task); + /* If the array was empty before, schedule task to deliver all events in the array. + * If the array already had things in it, then the task is already scheduled and will run soon. */ + if (aws_array_list_length(&meta_request->synced_data.event_delivery_array) == 1) { + aws_s3_meta_request_acquire(meta_request); + + aws_task_init( + &meta_request->synced_data.event_delivery_task, + s_s3_meta_request_event_delivery_task, + meta_request, + "s3_meta_request_event_delivery"); + aws_event_loop_schedule_task_now(meta_request->io_event_loop, &meta_request->synced_data.event_delivery_task); + } +} + +bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request) { + ASSERT_SYNCED_DATA_LOCK_HELD(meta_request); + return aws_array_list_length(&meta_request->synced_data.event_delivery_array) > 0 || + meta_request->synced_data.event_delivery_active; } -static void s_s3_meta_request_body_streaming_task(struct aws_task *task, void *arg, enum aws_task_status task_status) { +/* Deliver events in event_delivery_array. + * This task runs on the meta-request's io_event_loop thread. */ +static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status) { (void)task; (void)task_status; - - struct s3_stream_response_body_payload *payload = arg; - AWS_PRECONDITION(payload); - - struct aws_s3_meta_request *meta_request = payload->meta_request; + struct aws_s3_meta_request *meta_request = arg; AWS_PRECONDITION(meta_request); AWS_PRECONDITION(meta_request->vtable); @@ -1399,40 +1435,98 @@ static void s_s3_meta_request_body_streaming_task(struct aws_task *task, void *a /* Client owns this event loop group. A cancel should not be possible. 
*/ AWS_ASSERT(task_status == AWS_TASK_STATUS_RUN_READY); - struct aws_linked_list completed_requests; - aws_linked_list_init(&completed_requests); + /* Swap contents of synced_data.event_delivery_array into this pre-allocated array-list, then process events */ + struct aws_array_list *event_delivery_array = &meta_request->io_threaded_data.event_delivery_array; + AWS_FATAL_ASSERT(aws_array_list_length(event_delivery_array) == 0); + /* If an error occurs, don't fire callbacks anymore. */ int error_code = AWS_ERROR_SUCCESS; - uint32_t num_successful = 0; - uint32_t num_failed = 0; + uint32_t num_parts_delivered = 0; - while (!aws_linked_list_empty(&payload->requests)) { - struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&payload->requests); - struct aws_s3_request *request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node); - AWS_ASSERT(meta_request == request->meta_request); - struct aws_byte_cursor body_buffer_byte_cursor = aws_byte_cursor_from_buf(&request->send_data.response_body); + /* BEGIN CRITICAL SECTION */ + { + aws_s3_meta_request_lock_synced_data(meta_request); - AWS_ASSERT(request->part_number >= 1); + aws_array_list_swap_contents(event_delivery_array, &meta_request->synced_data.event_delivery_array); + meta_request->synced_data.event_delivery_active = true; - if (aws_s3_meta_request_has_finish_result(meta_request)) { - ++num_failed; - } else { - if (body_buffer_byte_cursor.len > 0 && error_code == AWS_ERROR_SUCCESS && meta_request->body_callback && - meta_request->body_callback( - meta_request, &body_buffer_byte_cursor, request->part_range_start, meta_request->user_data)) { - error_code = aws_last_error_or_unknown(); - } - - if (error_code == AWS_ERROR_SUCCESS) { - ++num_successful; - } else { - ++num_failed; - } + if (aws_s3_meta_request_has_finish_result_synced(meta_request)) { + error_code = AWS_ERROR_S3_CANCELED; } - aws_atomic_fetch_sub(&client->stats.num_requests_streaming, 1); - aws_s3_request_release(request); + 
aws_s3_meta_request_unlock_synced_data(meta_request); + } + /* END CRITICAL SECTION */ + + /* Deliver all events */ + for (size_t event_i = 0; event_i < aws_array_list_length(event_delivery_array); ++event_i) { + struct aws_s3_meta_request_event event; + aws_array_list_get_at(event_delivery_array, &event, event_i); + switch (event.type) { + + case AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY: { + struct aws_s3_request *request = event.u.response_body.completed_request; + AWS_ASSERT(meta_request == request->meta_request); + struct aws_byte_cursor response_body = aws_byte_cursor_from_buf(&request->send_data.response_body); + + AWS_ASSERT(request->part_number >= 1); + + if (error_code == AWS_ERROR_SUCCESS && response_body.len > 0 && meta_request->body_callback != NULL) { + if (meta_request->body_callback( + meta_request, &response_body, request->part_range_start, meta_request->user_data)) { + + error_code = aws_last_error_or_unknown(); + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p Response body callback raised error %d (%s).", + (void *)meta_request, + error_code, + aws_error_str(error_code)); + } + } + + ++num_parts_delivered; + aws_s3_request_release(request); + } break; + + case AWS_S3_META_REQUEST_EVENT_PROGRESS: { + if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) { + /* Don't report 0 byte progress events. + * The reasoning behind this is: + * + * In some code paths, when no data is transferred, there are no progress events, + * but in other code paths there might be one progress event of 0 bytes. + * We want to be consistent, either: + * - REPORT AT LEAST ONCE: even if no data is being transferred. + * This would require finding every code path where no progress events are sent, + * and sending an appropriate progress event, even if it's for 0 bytes. 
+ * One example of ending early is: when resuming a paused upload, + * we do ListParts on the UploadID, and if that 404s we assume the + * previous "paused" meta-request actually completed, + * and so we immediately end the "resuming" meta-request + * as successful without sending any further HTTP requests. + * It would be tough to accurately report progress here because + * we don't know the total size, since we never read the request body, + * and didn't get any info about the previous upload. + * OR + * - NEVER REPORT ZERO BYTES: even if that means no progress events at all. + * This is easy to do. We'd only send progress events when data is transferred, + * and if a 0 byte event slips through somehow, just check before firing the callback. + * Since the NEVER REPORT ZERO BYTES path is simpler to implement, we went with that. */ + if (event.u.progress.info.bytes_transferred > 0) { + meta_request->progress_callback(meta_request, &event.u.progress.info, meta_request->user_data); + } + } + } break; + + default: + AWS_FATAL_ASSERT(false); + } } + + /* Done delivering events */ + aws_array_list_clear(event_delivery_array); + /* BEGIN CRITICAL SECTION */ { aws_s3_meta_request_lock_synced_data(meta_request); @@ -1440,14 +1534,11 @@ static void s_s3_meta_request_body_streaming_task(struct aws_task *task, void *a aws_s3_meta_request_set_fail_synced(meta_request, NULL, error_code); } - meta_request->synced_data.num_parts_delivery_completed += (num_failed + num_successful); - meta_request->synced_data.num_parts_delivery_failed += num_failed; - meta_request->synced_data.num_parts_delivery_succeeded += num_successful; + meta_request->synced_data.num_parts_delivery_completed += num_parts_delivered; + meta_request->synced_data.event_delivery_active = false; aws_s3_meta_request_unlock_synced_data(meta_request); } /* END CRITICAL SECTION */ - aws_mem_release(client->allocator, payload); - payload = NULL; aws_s3_client_schedule_process_work(client); 
aws_s3_meta_request_release(meta_request); diff --git a/source/s3_request.c b/source/s3_request.c index e91229dbf..38e184d5a 100644 --- a/source/s3_request.c +++ b/source/s3_request.c @@ -102,18 +102,18 @@ void aws_s3_request_clean_up_send_data(struct aws_s3_request *request) { AWS_ZERO_STRUCT(request->send_data); } -void aws_s3_request_acquire(struct aws_s3_request *request) { - AWS_PRECONDITION(request); - - aws_ref_count_acquire(&request->ref_count); +struct aws_s3_request *aws_s3_request_acquire(struct aws_s3_request *request) { + if (request != NULL) { + aws_ref_count_acquire(&request->ref_count); + } + return request; } -void aws_s3_request_release(struct aws_s3_request *request) { - if (request == NULL) { - return; +struct aws_s3_request *aws_s3_request_release(struct aws_s3_request *request) { + if (request != NULL) { + aws_ref_count_release(&request->ref_count); } - - aws_ref_count_release(&request->ref_count); + return NULL; } static void s_s3_request_destroy(void *user_data) { diff --git a/source/s3_util.c b/source/s3_util.c index dec258a29..9621ab1f6 100644 --- a/source/s3_util.c +++ b/source/s3_util.c @@ -470,7 +470,7 @@ uint32_t aws_s3_get_num_parts(size_t part_size, uint64_t object_range_start, uin /* If the range has room for a second part, calculate the additional amount of parts. 
*/ if (second_part_start <= object_range_end) { - uint64_t aligned_range_remainder = object_range_end + 1 - second_part_start; + uint64_t aligned_range_remainder = object_range_end + 1 - second_part_start; /* range-end is inclusive */ num_parts += (uint32_t)(aligned_range_remainder / (uint64_t)part_size); if ((aligned_range_remainder % part_size) > 0) { @@ -515,7 +515,7 @@ void aws_s3_get_part_range( /* Else, find the next part by adding the object range + total number of whole parts before this one + initial * part size*/ *out_part_range_start = object_range_start + ((uint64_t)(part_index - 1)) * part_size_uint64 + first_part_size; - *out_part_range_end = *out_part_range_start + part_size_uint64 - 1; + *out_part_range_end = *out_part_range_start + part_size_uint64 - 1; /* range-end is inclusive */ } /* Cap the part's range end using the object's range end. */ diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 2cb2d2686..2d168ed72 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -2207,8 +2207,6 @@ static int s_test_s3_put_object_async_singlepart(struct aws_allocator *allocator }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(4), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2232,8 +2230,6 @@ static int s_test_s3_put_object_async_multipart(struct aws_allocator *allocator, }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(16), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2260,8 +2256,6 @@ static int s_test_s3_put_object_async_read_completes_synchronously(struct aws_al }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - 
ASSERT_UINT_EQUALS(MB_TO_BYTES(10), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2286,8 +2280,6 @@ static int s_test_s3_put_object_async_small_reads(struct aws_allocator *allocato }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(10), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2311,8 +2303,6 @@ static int s_test_s3_put_object_small_reads(struct aws_allocator *allocator, voi }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(10), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2339,8 +2329,6 @@ static int s_test_s3_put_object_async_no_content_length_partial_part(struct aws_ }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(3), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2365,8 +2353,6 @@ static int s_test_s3_put_object_async_no_content_length_1part(struct aws_allocat }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(8), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2395,8 +2381,6 @@ static int s_test_s3_put_object_async_no_content_length_empty_part2(struct aws_a }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(8), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - 
aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -2421,8 +2405,6 @@ static int s_test_s3_put_object_async_no_content_length_2parts(struct aws_alloca }; ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results)); - ASSERT_UINT_EQUALS(MB_TO_BYTES(16), aws_atomic_load_int(&test_results.total_bytes_uploaded)); - aws_s3_meta_request_test_results_clean_up(&test_results); return 0; } @@ -5289,6 +5271,8 @@ struct copy_object_test_data { bool headers_callback_was_invoked; int meta_request_error_code; int response_status_code; + uint64_t progress_callback_content_length; + uint64_t progress_callback_total_bytes_transferred; }; static void s_copy_object_meta_request_finish( @@ -5336,6 +5320,20 @@ static int s_copy_object_meta_request_headers_callback( return AWS_OP_SUCCESS; } +static void s_copy_object_meta_request_progress_callback( + struct aws_s3_meta_request *meta_request, + const struct aws_s3_meta_request_progress *progress, + void *user_data) { + + (void)meta_request; + struct copy_object_test_data *test_data = user_data; + + aws_mutex_lock(&test_data->mutex); + test_data->progress_callback_content_length = progress->content_length; + test_data->progress_callback_total_bytes_transferred += progress->bytes_transferred; + aws_mutex_unlock(&test_data->mutex); +} + static bool s_copy_test_completion_predicate(void *arg) { struct copy_object_test_data *test_data = arg; return test_data->execution_completed; @@ -5346,7 +5344,8 @@ static int s_test_s3_copy_object_from_x_amz_copy_source( struct aws_byte_cursor x_amz_copy_source, struct aws_byte_cursor destination_key, int expected_error_code, - int expected_response_status) { + int expected_response_status, + uint64_t expected_size) { struct aws_s3_tester tester; AWS_ZERO_STRUCT(tester); @@ -5387,6 +5386,7 @@ static int s_test_s3_copy_object_from_x_amz_copy_source( .signing_config = client_config.signing_config, .finish_callback = s_copy_object_meta_request_finish, 
.headers_callback = s_copy_object_meta_request_headers_callback, + .progress_callback = s_copy_object_meta_request_progress_callback, .message = message, .shutdown_callback = NULL, .type = AWS_S3_META_REQUEST_TYPE_COPY_OBJECT, @@ -5404,6 +5404,12 @@ ASSERT_INT_EQUALS(expected_error_code, test_data.meta_request_error_code); ASSERT_INT_EQUALS(expected_response_status, test_data.response_status_code); + /* assert that progress_callback matches the expected size */ + if (test_data.meta_request_error_code == AWS_ERROR_SUCCESS) { + ASSERT_UINT_EQUALS(expected_size, test_data.progress_callback_total_bytes_transferred); + ASSERT_UINT_EQUALS(expected_size, test_data.progress_callback_content_length); + } + /* assert headers callback was invoked */ ASSERT_TRUE(test_data.headers_callback_was_invoked); @@ -5422,7 +5428,8 @@ static int s_test_s3_copy_object_helper( struct aws_byte_cursor source_key, struct aws_byte_cursor destination_key, int expected_error_code, - int expected_response_status) { + int expected_response_status, + uint64_t expected_size) { struct aws_byte_cursor source_bucket = g_test_bucket_name; @@ -5439,7 +5446,7 @@ static int s_test_s3_copy_object_helper( struct aws_byte_cursor x_amz_copy_source = aws_byte_cursor_from_c_str(copy_source_value); return s_test_s3_copy_object_from_x_amz_copy_source( - allocator, x_amz_copy_source, destination_key, expected_error_code, expected_response_status); + allocator, x_amz_copy_source, destination_key, expected_error_code, expected_response_status, expected_size); } AWS_TEST_CASE(test_s3_copy_small_object, s_test_s3_copy_small_object) @@ -5449,7 +5456,7 @@ static int s_test_s3_copy_small_object(struct aws_allocator *allocator, void *ct struct aws_byte_cursor source_key = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("pre-existing-1MB"); struct aws_byte_cursor destination_key = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("copies/destination_1MB"); return 
s_test_s3_copy_object_helper( - allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK); + allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK, MB_TO_BYTES(1)); } AWS_TEST_CASE(test_s3_copy_small_object_special_char, s_test_s3_copy_small_object_special_char) @@ -5460,7 +5467,7 @@ static int s_test_s3_copy_small_object_special_char(struct aws_allocator *alloca struct aws_byte_cursor destination_key = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("copies/destination_1MB_@"); return s_test_s3_copy_object_helper( - allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK); + allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK, MB_TO_BYTES(1)); } AWS_TEST_CASE(test_s3_multipart_copy_large_object_special_char, s_test_s3_multipart_copy_large_object_special_char) @@ -5471,7 +5478,7 @@ static int s_test_s3_multipart_copy_large_object_special_char(struct aws_allocat struct aws_byte_cursor destination_key = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("copies/destination_2GB-@"); return s_test_s3_copy_object_helper( - allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK); + allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK, GB_TO_BYTES(2)); } AWS_TEST_CASE(test_s3_multipart_copy_large_object, s_test_s3_multipart_copy_large_object) @@ -5481,7 +5488,7 @@ static int s_test_s3_multipart_copy_large_object(struct aws_allocator *allocator struct aws_byte_cursor source_key = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("pre-existing-2GB"); struct aws_byte_cursor destination_key = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("copies/destination_2GB"); return s_test_s3_copy_object_helper( - allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK); + allocator, source_key, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK, GB_TO_BYTES(2)); } 
AWS_TEST_CASE(test_s3_copy_object_invalid_source_key, s_test_s3_copy_object_invalid_source_key) @@ -5495,7 +5502,8 @@ static int s_test_s3_copy_object_invalid_source_key(struct aws_allocator *alloca source_key, destination_key, AWS_ERROR_S3_INVALID_RESPONSE_STATUS, - AWS_HTTP_STATUS_CODE_404_NOT_FOUND); + AWS_HTTP_STATUS_CODE_404_NOT_FOUND, + 0 /* expected_size is ignored */); } /** @@ -5525,7 +5533,7 @@ static int s_test_s3_copy_source_prefixed_by_slash(struct aws_allocator *allocat struct aws_byte_cursor x_amz_copy_source = aws_byte_cursor_from_c_str(copy_source_value); return s_test_s3_copy_object_from_x_amz_copy_source( - allocator, x_amz_copy_source, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK); + allocator, x_amz_copy_source, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK, MB_TO_BYTES(1)); } /** @@ -5555,7 +5563,12 @@ static int s_test_s3_copy_source_prefixed_by_slash_multipart(struct aws_allocato struct aws_byte_cursor x_amz_copy_source = aws_byte_cursor_from_c_str(copy_source_value); return s_test_s3_copy_object_from_x_amz_copy_source( - allocator, x_amz_copy_source, destination_key, AWS_ERROR_SUCCESS, AWS_HTTP_STATUS_CODE_200_OK); + allocator, + x_amz_copy_source, + destination_key, + AWS_ERROR_SUCCESS, + AWS_HTTP_STATUS_CODE_200_OK, + MB_TO_BYTES(256)); } static int s_s3_get_object_mrap_helper(struct aws_allocator *allocator, bool multipart) { @@ -5767,8 +5780,8 @@ static void s_put_pause_resume_meta_request_finish( void *user_data) { (void)meta_request; - - struct put_object_pause_resume_test_data *test_data = user_data; + struct aws_s3_tester *tester = user_data; + struct put_object_pause_resume_test_data *test_data = tester->user_data; /* if error response body is available, dump it to test result to help investigation of failed tests */ if (meta_request_result->error_response_body != NULL && meta_request_result->error_response_body->len > 0) { @@ -5792,40 +5805,71 @@ static bool 
s_put_pause_resume_test_completion_predicate(void *arg) { return test_data->execution_completed; } -static void s_pause_meta_request_progress( +/* Patched version of aws_s3_meta_request_vtable->finished_request() for pause/resume tests. + * It can pause the meta-request immediately after a part completes. + * We use a patched vtable, instead of the progress_callback, because + * the progress_callback fires on another thread, which might be too late to + * prevent more parts from being sent. */ +static void s_meta_request_finished_request_patched_for_pause_resume_tests( struct aws_s3_meta_request *meta_request, - const struct aws_s3_meta_request_progress *progress, - void *user_data) { + struct aws_s3_request *request, + int error_code) { AWS_ASSERT(meta_request); - AWS_ASSERT(progress); - AWS_ASSERT(user_data); - - struct put_object_pause_resume_test_data *test_data = user_data; + struct aws_s3_tester *tester = meta_request->user_data; + struct put_object_pause_resume_test_data *test_data = tester->user_data; + AWS_ASSERT(test_data); - aws_atomic_fetch_add(&test_data->total_bytes_uploaded, (size_t)progress->bytes_transferred); + if ((error_code == AWS_ERROR_SUCCESS) && (meta_request->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) && + (request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART)) { - size_t total_bytes_uploaded = aws_atomic_load_int(&test_data->total_bytes_uploaded); - size_t offset_to_pause = aws_atomic_load_int(&test_data->request_pause_offset); + aws_atomic_fetch_add(&test_data->total_bytes_uploaded, request->request_body.len); - if (total_bytes_uploaded >= offset_to_pause) { - /* offset of the upload at which we should pause was reached. 
let's pause the upload */ + size_t total_bytes_uploaded = aws_atomic_load_int(&test_data->total_bytes_uploaded); + size_t offset_to_pause = aws_atomic_load_int(&test_data->request_pause_offset); - size_t expected = false; - bool request_pause = aws_atomic_compare_exchange_int(&test_data->pause_requested, &expected, true); - if (!request_pause) { + if (total_bytes_uploaded >= offset_to_pause) { + /* offset of the upload at which we should pause was reached. let's pause the upload */ /* if the meta request has already been paused previously, do nothing. */ - return; + size_t expected = false; + bool request_pause = aws_atomic_compare_exchange_int(&test_data->pause_requested, &expected, true); + if (request_pause) { + struct aws_s3_meta_request_resume_token *resume_token = NULL; + int pause_result = aws_s3_meta_request_pause(meta_request, &resume_token); + struct aws_byte_cursor upload_id = aws_s3_meta_request_resume_token_upload_id(resume_token); + /* Make Sure we have upload ID */ + AWS_FATAL_ASSERT(aws_byte_cursor_eq_c_str(&upload_id, "") == false); + aws_atomic_store_int(&test_data->pause_result, pause_result); + aws_atomic_store_ptr(&test_data->persistable_state_ptr, resume_token); + } } - - struct aws_s3_meta_request_resume_token *resume_token = NULL; - int pause_result = aws_s3_meta_request_pause(meta_request, &resume_token); - struct aws_byte_cursor upload_id = aws_s3_meta_request_resume_token_upload_id(resume_token); - /* Make Sure we have upload ID */ - AWS_FATAL_ASSERT(aws_byte_cursor_eq_c_str(&upload_id, "") == false); - aws_atomic_store_int(&test_data->pause_result, pause_result); - aws_atomic_store_ptr(&test_data->persistable_state_ptr, resume_token); } + + /* Continue with original vtable function... 
*/ + struct aws_s3_meta_request_vtable *original_meta_request_vtable = + aws_s3_tester_get_meta_request_vtable_patch(tester, 0)->original_vtable; + + original_meta_request_vtable->finished_request(meta_request, request, error_code); +} + +static struct aws_s3_meta_request *s_meta_request_factory_patch_for_pause_resume_tests( + struct aws_s3_client *client, + const struct aws_s3_meta_request_options *options) { + + AWS_ASSERT(client != NULL); + struct aws_s3_tester *tester = client->shutdown_callback_user_data; + AWS_ASSERT(tester != NULL); + + struct aws_s3_client_vtable *original_client_vtable = + aws_s3_tester_get_client_vtable_patch(tester, 0)->original_vtable; + + struct aws_s3_meta_request *meta_request = original_client_vtable->meta_request_factory(client, options); + + struct aws_s3_meta_request_vtable *patched_meta_request_vtable = + aws_s3_tester_patch_meta_request_vtable(tester, meta_request, NULL); + patched_meta_request_vtable->finished_request = s_meta_request_finished_request_patched_for_pause_resume_tests; + + return meta_request; } /* total length of the object to simulate for upload */ @@ -5953,6 +5997,9 @@ static int s_test_s3_put_pause_resume_helper( struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + struct aws_s3_client_vtable *patched_client_vtable = aws_s3_tester_patch_client_vtable(tester, client, NULL); + patched_client_vtable->meta_request_factory = s_meta_request_factory_patch_for_pause_resume_tests; + struct aws_byte_cursor destination_bucket = g_test_bucket_name; char endpoint[1024]; @@ -5977,18 +6024,19 @@ static int s_test_s3_put_pause_resume_helper( aws_mutex_init(&test_data->mutex); test_data->execution_completed = false; + tester->user_data = test_data; + struct aws_s3_checksum_config checksum_config = { .checksum_algorithm = checksum_algorithm, .location = checksum_algorithm == AWS_SCA_NONE ? 
AWS_SCL_NONE : AWS_SCL_TRAILER, }; struct aws_s3_meta_request_options meta_request_options = { - .user_data = test_data, + .user_data = tester, .body_callback = NULL, .signing_config = client_config.signing_config, .finish_callback = s_put_pause_resume_meta_request_finish, .headers_callback = NULL, - .progress_callback = s_pause_meta_request_progress, .upload_review_callback = s_pause_resume_upload_review_callback, .message = message, .shutdown_callback = NULL, @@ -6018,6 +6066,11 @@ static int s_test_s3_put_pause_resume_helper( aws_mutex_clean_up(&test_data->mutex); aws_http_message_destroy(message); + /* release this client with its crazy patched vtables */ + client = aws_s3_client_release(client); + aws_s3_tester_wait_for_client_shutdown(tester); + tester->bound_to_client = false; + if (expected_error_code == AWS_ERROR_SUCCESS) { /* get the file and verify it matches what we uploaded */ struct aws_s3_tester_meta_request_options options = { @@ -6025,7 +6078,6 @@ static int s_test_s3_put_pause_resume_helper( .meta_request_type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT, .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_SUCCESS, .body_callback = s_pause_resume_receive_body_callback, - .client = client, .get_options = { .object_path = destination_key, @@ -6039,8 +6091,6 @@ static int s_test_s3_put_pause_resume_helper( aws_s3_meta_request_test_results_clean_up(&results); } - client = aws_s3_client_release(client); - return 0; } @@ -6069,7 +6119,7 @@ static int s_test_s3_put_pause_resume_happy_path(struct aws_allocator *allocator struct aws_input_stream *initial_upload_stream = aws_s3_test_input_stream_new(allocator, s_pause_resume_object_length_128MB); - /* starts the upload request that will be paused by s_pause_meta_request_progress() */ + /* starts the upload request that will be paused */ ASSERT_SUCCESS(s_test_s3_put_pause_resume_helper( &tester, allocator, @@ -6082,11 +6132,6 @@ static int s_test_s3_put_pause_resume_happy_path(struct aws_allocator *allocator 
AWS_ERROR_S3_PAUSED, 0)); - if (tester.bound_to_client) { - aws_s3_tester_wait_for_client_shutdown(&tester); - tester.bound_to_client = false; - } - aws_input_stream_destroy(initial_upload_stream); /* new stream used to resume upload. it begins at the offset specified in the persistable state */ @@ -6151,7 +6196,7 @@ static int s_test_s3_put_pause_resume_all_parts_done(struct aws_allocator *alloc struct aws_input_stream *initial_upload_stream = aws_s3_test_input_stream_new(allocator, s_pause_resume_object_length_128MB); - /* starts the upload request that will be paused by s_pause_meta_request_progress() */ + /* starts the upload request that will be paused */ ASSERT_SUCCESS(s_test_s3_put_pause_resume_helper( &tester, allocator, @@ -6163,10 +6208,6 @@ static int s_test_s3_put_pause_resume_all_parts_done(struct aws_allocator *alloc AWS_SCA_NONE, AWS_ERROR_S3_PAUSED, 0)); - if (tester.bound_to_client) { - aws_s3_tester_wait_for_client_shutdown(&tester); - tester.bound_to_client = false; - } aws_input_stream_destroy(initial_upload_stream); @@ -6234,7 +6275,7 @@ static int s_test_s3_put_pause_resume_invalid_resume_data(struct aws_allocator * struct aws_input_stream *initial_upload_stream = aws_s3_test_input_stream_new(allocator, s_pause_resume_object_length_128MB); - /* starts the upload request that will be paused by s_pause_meta_request_progress() */ + /* starts the upload request that will be paused */ ASSERT_SUCCESS(s_test_s3_put_pause_resume_helper( &tester, allocator, @@ -6246,10 +6287,6 @@ static int s_test_s3_put_pause_resume_invalid_resume_data(struct aws_allocator * AWS_SCA_CRC32, AWS_ERROR_S3_PAUSED, 0)); - if (tester.bound_to_client) { - aws_s3_tester_wait_for_client_shutdown(&tester); - tester.bound_to_client = false; - } aws_input_stream_destroy(initial_upload_stream); @@ -6316,7 +6353,7 @@ static int s_test_s3_put_pause_resume_invalid_resume_stream(struct aws_allocator struct aws_input_stream *initial_upload_stream = 
aws_s3_test_input_stream_new(allocator, s_pause_resume_object_length_128MB); - /* starts the upload request that will be paused by s_pause_meta_request_progress() */ + /* starts the upload request that will be paused */ ASSERT_SUCCESS(s_test_s3_put_pause_resume_helper( &tester, allocator, @@ -6328,10 +6365,6 @@ static int s_test_s3_put_pause_resume_invalid_resume_stream(struct aws_allocator AWS_SCA_CRC32, AWS_ERROR_S3_PAUSED, 0)); - if (tester.bound_to_client) { - aws_s3_tester_wait_for_client_shutdown(&tester); - tester.bound_to_client = false; - } aws_input_stream_release(initial_upload_stream); @@ -6404,7 +6437,7 @@ static int s_test_s3_put_pause_resume_invalid_content_length(struct aws_allocato struct aws_input_stream *initial_upload_stream = aws_s3_test_input_stream_new(allocator, s_pause_resume_object_length_128MB); - /* starts the upload request that will be paused by s_pause_meta_request_progress() */ + /* starts the upload request that will be paused */ ASSERT_SUCCESS(s_test_s3_put_pause_resume_helper( &tester, allocator, @@ -6416,10 +6449,6 @@ static int s_test_s3_put_pause_resume_invalid_content_length(struct aws_allocato AWS_SCA_CRC32, AWS_ERROR_S3_PAUSED, 0)); - if (tester.bound_to_client) { - aws_s3_tester_wait_for_client_shutdown(&tester); - tester.bound_to_client = false; - } aws_input_stream_release(initial_upload_stream); diff --git a/tests/s3_mock_server_tests.c b/tests/s3_mock_server_tests.c index 5737b3f2d..bda02a354 100644 --- a/tests/s3_mock_server_tests.c +++ b/tests/s3_mock_server_tests.c @@ -589,8 +589,7 @@ TEST_CASE(resume_first_part_not_completed_mock_server) { ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results)); /* Make Sure we only uploaded 2 parts. 
*/ - size_t total_bytes_uploaded = aws_atomic_load_int(&out_results.total_bytes_uploaded); - ASSERT_UINT_EQUALS(2 * MB_TO_BYTES(8), total_bytes_uploaded); + /* TODO: monitor telemetry to ensure this happened */ aws_s3_meta_request_test_results_clean_up(&out_results); aws_s3_meta_request_resume_token_release(token); @@ -646,8 +645,7 @@ TEST_CASE(resume_multi_page_list_parts_mock_server) { ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results)); /* Make Sure we only uploaded 2 parts. */ - size_t total_bytes_uploaded = aws_atomic_load_int(&out_results.total_bytes_uploaded); - ASSERT_UINT_EQUALS(2 * MB_TO_BYTES(8), total_bytes_uploaded); + /* TODO: monitor telemetry to ensure this happened */ aws_s3_meta_request_test_results_clean_up(&out_results); aws_s3_meta_request_resume_token_release(token); @@ -755,8 +753,7 @@ TEST_CASE(resume_after_finished_mock_server) { ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results)); /* The error code should be success, but there are no headers and stuff as no request was made. */ ASSERT_UINT_EQUALS(AWS_ERROR_SUCCESS, out_results.finished_error_code); - size_t total_bytes_uploaded = aws_atomic_load_int(&out_results.total_bytes_uploaded); - ASSERT_UINT_EQUALS(0, total_bytes_uploaded); + /* TODO: monitor telemetry to ensure no actual data was sent */ aws_s3_meta_request_test_results_clean_up(&out_results); aws_s3_meta_request_resume_token_release(token); @@ -834,9 +831,6 @@ TEST_CASE(endpoint_override_mock_server) { .mock_server = true, }; - struct aws_s3_meta_request_test_results out_results; - aws_s3_meta_request_test_results_init(&out_results, allocator); - /* Put together a simple S3 Put Object request. */ struct aws_input_stream *input_stream = aws_s3_test_input_stream_new(allocator, put_options.put_options.object_size_mb); @@ -846,7 +840,7 @@ TEST_CASE(endpoint_override_mock_server) { /* 1. 
Create request without host and use endpoint override for the host info */ put_options.message = message; - ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results)); + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL)); /* 2. Create request with host info missmatch endpoint override */ struct aws_http_header host_header = { @@ -856,12 +850,11 @@ TEST_CASE(endpoint_override_mock_server) { ASSERT_SUCCESS(aws_http_message_add_header(message, host_header)); put_options.message = message; put_options.validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE; - ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results)); + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL)); /* Clean up */ aws_http_message_destroy(message); aws_input_stream_release(input_stream); - aws_s3_meta_request_test_results_clean_up(&out_results); aws_s3_client_release(client); aws_s3_tester_clean_up(&tester); diff --git a/tests/s3_tester.c b/tests/s3_tester.c index 9c235a1c9..2ccf3d70e 100644 --- a/tests/s3_tester.c +++ b/tests/s3_tester.c @@ -245,7 +245,19 @@ static void s_s3_test_meta_request_progress( struct aws_s3_meta_request_test_results *meta_request_test_results = user_data; - aws_atomic_fetch_add(&meta_request_test_results->total_bytes_uploaded, (size_t)progress->bytes_transferred); + meta_request_test_results->progress.total_bytes_transferred += progress->bytes_transferred; + + /* Once content_length is reported, it shouldn't change */ + if (meta_request_test_results->progress.content_length == 0) { + meta_request_test_results->progress.content_length = progress->content_length; + } else { + AWS_FATAL_ASSERT(meta_request_test_results->progress.content_length == progress->content_length); + } + + /* If content_length is known, we shouldn't go over it */ + if (progress->content_length != 0) { + 
AWS_FATAL_ASSERT(meta_request_test_results->progress.total_bytes_transferred <= progress->content_length); + } if (meta_request_test_results->progress_callback != NULL) { meta_request_test_results->progress_callback(meta_request, progress, user_data); @@ -474,7 +486,6 @@ void aws_s3_meta_request_test_results_init( AWS_ZERO_STRUCT(*test_meta_request); test_meta_request->allocator = allocator; aws_atomic_init_int(&test_meta_request->received_body_size_delta, 0); - aws_atomic_init_int(&test_meta_request->total_bytes_uploaded, 0); aws_array_list_init_dynamic( &test_meta_request->synced_data.metrics, allocator, 4, sizeof(struct aws_s3_request_metrics *)); } @@ -1392,6 +1403,7 @@ int aws_s3_tester_send_meta_request_with_options( struct aws_input_stream *input_stream = NULL; struct aws_async_input_stream *async_stream = NULL; + size_t upload_size_bytes = 0; if (meta_request_options.message == NULL) { const struct aws_byte_cursor *bucket_name = options->bucket_name; @@ -1433,17 +1445,17 @@ int aws_s3_tester_send_meta_request_with_options( options->default_type_options.mode == AWS_S3_TESTER_DEFAULT_TYPE_MODE_PUT)) { uint32_t object_size_mb = options->put_options.object_size_mb; - size_t object_size_bytes = (size_t)object_size_mb * 1024ULL * 1024ULL; + upload_size_bytes = (size_t)object_size_mb * 1024ULL * 1024ULL; /* This doesn't do what we think it should because * g_min_upload_part_size overrides client->part_size */ if (options->put_options.ensure_multipart) { - if (object_size_bytes == 0) { - object_size_bytes = client->part_size * 2; - object_size_mb = (uint32_t)(object_size_bytes / 1024 / 1024); + if (upload_size_bytes == 0) { + upload_size_bytes = client->part_size * 2; + object_size_mb = (uint32_t)(upload_size_bytes / 1024 / 1024); } - ASSERT_TRUE(object_size_bytes > client->part_size); + ASSERT_TRUE(upload_size_bytes > client->part_size); } struct aws_byte_buf object_path_buffer; @@ -1503,7 +1515,7 @@ int aws_s3_tester_send_meta_request_with_options( struct 
aws_async_input_stream_tester_options stream_options = { .base = { - .autogen_length = object_size_bytes, + .autogen_length = upload_size_bytes, .eof_requires_extra_read = options->put_options.eof_requires_extra_read, .max_bytes_per_read = options->put_options.max_bytes_per_read, }, @@ -1572,7 +1584,7 @@ int aws_s3_tester_send_meta_request_with_options( &host_cur, g_test_body_content_type, test_object_path, - object_size_bytes, + upload_size_bytes, options->sse_type); } @@ -1614,6 +1626,14 @@ int aws_s3_tester_send_meta_request_with_options( aws_string_destroy(host_name); } else { aws_http_message_acquire(meta_request_options.message); + + if (options->meta_request_type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) { + /* Figure out how much is being uploaded from pre-existing message */ + struct aws_input_stream *mystery_stream = aws_http_message_get_body_stream(meta_request_options.message); + if (mystery_stream != NULL) { + ASSERT_SUCCESS(aws_input_stream_get_length(mystery_stream, (int64_t *)&upload_size_bytes)); + } + } } struct aws_s3_meta_request_test_results meta_request_test_results; @@ -1648,21 +1668,6 @@ int aws_s3_tester_send_meta_request_with_options( ASSERT_TRUE(aws_s3_meta_request_is_finished(meta_request)); } - /* If total_bytes_uploaded wasn't set by the progress callback, - * then set it based on number of bytes read from input-stream. 
- * (progress callback isn't currently hooked up for single part upload) */ - if ((out_results->finished_error_code == AWS_ERROR_SUCCESS) && - (aws_atomic_load_int(&out_results->total_bytes_uploaded) == 0)) { - - uint64_t bytes_read_from_stream = 0; - if (input_stream != NULL) { - bytes_read_from_stream = aws_input_stream_tester_total_bytes_read(input_stream); - } else if (async_stream != NULL) { - bytes_read_from_stream = aws_async_input_stream_tester_total_bytes_read(async_stream); - } - aws_atomic_store_int(&out_results->total_bytes_uploaded, (size_t)bytes_read_from_stream); - } - switch (options->validate_type) { case AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_SUCCESS: ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, out_results->finished_error_code); @@ -1671,6 +1676,18 @@ int aws_s3_tester_send_meta_request_with_options( ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(out_results, options->sse_type)); } else if (meta_request_options.type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) { ASSERT_SUCCESS(aws_s3_tester_validate_put_object_results(out_results, options->sse_type)); + + /* Expected number of bytes should have been read from stream, and reported via progress callbacks */ + if (input_stream != NULL) { + ASSERT_UINT_EQUALS(upload_size_bytes, aws_input_stream_tester_total_bytes_read(input_stream)); + } else if (async_stream != NULL) { + ASSERT_UINT_EQUALS(upload_size_bytes, aws_async_input_stream_tester_total_bytes_read(async_stream)); + } + + ASSERT_UINT_EQUALS(upload_size_bytes, out_results->progress.total_bytes_transferred); + if (!options->put_options.skip_content_length) { + ASSERT_UINT_EQUALS(upload_size_bytes, out_results->progress.content_length); + } } break; case AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE: @@ -1857,6 +1874,8 @@ int aws_s3_tester_validate_get_object_results( meta_request_test_results->received_body_size); ASSERT_TRUE(content_length == meta_request_test_results->received_body_size); + ASSERT_UINT_EQUALS(content_length, 
meta_request_test_results->progress.total_bytes_transferred); + ASSERT_UINT_EQUALS(content_length, meta_request_test_results->progress.content_length); return AWS_OP_SUCCESS; } diff --git a/tests/s3_tester.h b/tests/s3_tester.h index f6c6872c6..b70bdeece 100644 --- a/tests/s3_tester.h +++ b/tests/s3_tester.h @@ -233,10 +233,11 @@ struct aws_s3_meta_request_test_results { int finished_error_code; enum aws_s3_checksum_algorithm algorithm; - /* accumulator of amount of bytes uploaded. - * Currently, this only works for MPU and Copy meta-requests. - * It's powered by the progress_callback which isn't invoked for all types */ - struct aws_atomic_var total_bytes_uploaded; + /* Record data from progress_callback() */ + struct { + uint64_t content_length; /* Remember progress->content_length */ + uint64_t total_bytes_transferred; /* Accumulator for progress->bytes_transferred */ + } progress; /* Protected the tester->synced_data.lock */ struct {