Skip to content

Commit

Permalink
impl(GCS+gRPC): add AsyncReadObjectRange (#9305)
Browse files Browse the repository at this point in the history
Implement a function to asynchronously read a portion of the object
contents.  The callers must set the size of this portion, which gives
them control over how much memory this call uses. Use this new function
from one of the integration tests.
  • Loading branch information
coryan authored Jun 19, 2022
1 parent b39b353 commit d12b9c4
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 8 deletions.
58 changes: 58 additions & 0 deletions google/cloud/storage/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,64 @@ class AsyncClient {
public:
~AsyncClient() = default;

/**
* Reads the contents of an object.
*
* When satisfied, the returned future has the contents of the given object
* between @p offset and @p offset + @p limit (exclusive).
*
* Be aware that this will accumulate all the bytes in memory, you need to
* consider whether @p limit is too large for your deployment environment.
*
* @param bucket_name the name of the bucket that contains the object.
* @param object_name the name of the object to be read.
* @param offset where to begin reading from the object, results in an error
* if the offset is larger than the object
* @param limit how much data to read starting at @p offset
* @param options a list of optional query parameters and/or request headers.
* Valid types for this operation include `DisableCrc32cChecksum`,
* `DisableMD5Hash`, `EncryptionKey`, `Generation`, `IfGenerationMatch`,
* `IfGenerationNotMatch`, `IfMetagenerationMatch`,
* `IfMetagenerationNotMatch`, `UserProject`, and `AcceptEncoding`.
*
* @par Idempotency
* This is a read-only operation and is always idempotent.
*/
template <typename... RequestOptions>
future<AsyncReadObjectRangeResponse> ReadObject(
std::string const& bucket_name, std::string const& object_name,
std::int64_t offset, std::int64_t limit, RequestOptions&&... options) {
struct HasReadRange
: public absl::disjunction<
std::is_same<storage::ReadRange, RequestOptions>...> {};
struct HasReadFromOffset
: public absl::disjunction<
std::is_same<storage::ReadFromOffset, RequestOptions>...> {};
struct HasReadLast
: public absl::disjunction<
std::is_same<storage::ReadLast, RequestOptions>...> {};

static_assert(!HasReadRange::value,
"Cannot use `ReadRange()` as a request option in "
"`AsyncClient::ReadObject()`, use the `offset` and `limit` "
"parameters instead.");
static_assert(!HasReadFromOffset::value,
"Cannot use `ReadFromOffset()` as a request option in "
"`AsyncClient::ReadObject()`, use the `offset` and `limit` "
"parameters instead.");
static_assert(!HasReadLast::value,
"Cannot use `ReadLast()` as a request option in "
"`AsyncClient::ReadObject()`, use the `offset` and `limit` "
"parameters instead.");

google::cloud::internal::OptionsSpan const span(
SpanOptions(std::forward<Options>(options)...));
storage::internal::ReadObjectRangeRequest request(bucket_name, object_name);
request.set_multiple_options(std::forward<Options>(options)...,
storage::ReadRange(offset, offset + limit));
return connection_->AsyncReadObjectRange(request);
}

/**
* Deletes an object.
*
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/storage/internal/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_CONNECTION_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_CONNECTION_H

#include "google/cloud/storage/async_object_responses.h"
#include "google/cloud/storage/internal/object_requests.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/future.h"
Expand Down Expand Up @@ -46,6 +47,10 @@ class AsyncConnection {

virtual Options options() const = 0;

virtual future<storage_experimental::AsyncReadObjectRangeResponse>
AsyncReadObjectRange(
storage::internal::ReadObjectRangeRequest const& request) = 0;

virtual future<Status> AsyncDeleteObject(
storage::internal::DeleteObjectRequest const& request) = 0;
};
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/storage/internal/async_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

#include "google/cloud/storage/internal/async_connection_impl.h"
#include "google/cloud/storage/internal/async_accumulate_read_object.h"
#include "google/cloud/storage/internal/grpc_client.h"
#include "google/cloud/storage/internal/grpc_configure_client_context.h"
#include "google/cloud/storage/internal/grpc_object_request_parser.h"
#include "google/cloud/storage/internal/storage_stub_factory.h"
#include "google/cloud/storage/options.h"
#include "google/cloud/internal/async_retry_loop.h"

namespace google {
Expand All @@ -31,6 +33,30 @@ AsyncConnectionImpl::AsyncConnectionImpl(CompletionQueue cq,
stub_(std::move(stub)),
options_(std::move(options)) {}

future<storage_experimental::AsyncReadObjectRangeResponse>
AsyncConnectionImpl::AsyncReadObjectRange(
storage::internal::ReadObjectRangeRequest const& request) {
auto proto = storage::internal::GrpcObjectRequestParser::ToProto(request);
if (!proto) {
auto response = storage_experimental::AsyncReadObjectRangeResponse{};
response.status = std::move(proto).status();
return make_ready_future(std::move(response));
}

auto context_factory = [request]() {
auto context = absl::make_unique<grpc::ClientContext>();
ApplyQueryParameters(*context, request);
return context;
};
auto const& current = internal::CurrentOptions();
return storage_internal::AsyncAccumulateReadObjectFull(
cq_, stub_, std::move(context_factory), *std::move(proto), current)
.then([current](
future<storage_internal::AsyncAccumulateReadObjectResult> f) {
return ToResponse(f.get(), current);
});
}

future<Status> AsyncConnectionImpl::AsyncDeleteObject(
storage::internal::DeleteObjectRequest const& request) {
auto proto = storage::internal::GrpcObjectRequestParser::ToProto(request);
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/storage/internal/async_connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class AsyncConnectionImpl : public AsyncConnection {

Options options() const override { return options_; }

future<storage_experimental::AsyncReadObjectRangeResponse>
AsyncReadObjectRange(
storage::internal::ReadObjectRangeRequest const& request) override;

future<Status> AsyncDeleteObject(
storage::internal::DeleteObjectRequest const& request) override;

Expand Down
21 changes: 13 additions & 8 deletions google/cloud/storage/tests/async_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,16 @@ class AsyncClientIntegrationTest
: public google::cloud::storage::testing::StorageIntegrationTest {
protected:
void SetUp() override {
// TODO(#5673) - enable against production.
if (!UsingEmulator()) GTEST_SKIP();
project_id_ = GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
ASSERT_THAT(project_id_, Not(IsEmpty()))
<< "GOOGLE_CLOUD_PROJECT is not set";

bucket_name_ =
GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or("");
ASSERT_THAT(bucket_name_, Not(IsEmpty()))
<< "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set";
}

std::string project_id() const { return project_id_; }
std::string bucket_name() const { return bucket_name_; }
std::string const& bucket_name() const { return bucket_name_; }

private:
std::string project_id_;
std::string bucket_name_;
};

Expand All @@ -68,6 +61,18 @@ TEST_F(AsyncClientIntegrationTest, ObjectCRUD) {
ScheduleForDelete(*insert);

auto async = MakeAsyncClient();
auto pending0 =
async.ReadObject(bucket_name(), object_name, 0, LoremIpsum().size());
auto pending1 =
async.ReadObject(bucket_name(), object_name, 0, LoremIpsum().size());

for (auto* p : {&pending1, &pending0}) {
auto response = p->get();
EXPECT_STATUS_OK(response.status);
auto const full = std::accumulate(response.contents.begin(),
response.contents.end(), std::string{});
EXPECT_EQ(full, LoremIpsum());
}
auto status = async
.DeleteObject(bucket_name(), object_name,
gcs::Generation(insert->generation()))
Expand Down

0 comments on commit d12b9c4

Please sign in to comment.