Skip to content

Commit

Permalink
impl(GCS+gRPC): convert AsyncReadObject() results (#9302)
Browse files Browse the repository at this point in the history
In the `storage` library we do not expose protos directly to the
application developer. This change introduces a function to convert the
protos accumulated by `AsyncAccumulateReadObjectFull()` into types that
are consistent with the public APIs.
  • Loading branch information
coryan authored Jun 18, 2022
1 parent 0f308e3 commit 40b66c4
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 0 deletions.
77 changes: 77 additions & 0 deletions google/cloud/storage/async_object_responses.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_RESPONSES_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_RESPONSES_H

#include "google/cloud/storage/object_metadata.h"
#include "google/cloud/storage/version.h"
#include "absl/types/optional.h"
#include <cstdint>
#include <map>
#include <string>
#include <vector>

namespace google {
namespace cloud {
namespace storage_experimental {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

/// Represents the response from reading a subset of an object.
struct AsyncReadObjectRangeResponse {
/**
* The final status of the download.
*
* Downloads can have partial failures, where only a subset of the data is
* successfully downloaded, and then the connection is interrupted. With the
* default configuration, the client library resumes the download. If,
* however, the `storage::RetryPolicy` is exhausted, only the partial results
* are returned, and the last error status is returned here.
*/
Status status;

/// If available, the full object metadata.
absl::optional<storage::ObjectMetadata> object_metadata;

/**
* The object contents.
*
* The library receives the object contents as a sequence of `std::string`.
* To avoid copies the library returns the sequence to the application. If
* you need to consolidate the contents use something like:
*
* @code
* AsyncReadObjectRangeResponse response = ...;
* auto all = std::accumulate(
* response.contents.begin(), response.contents.end(), std::string{},
* [](auto a, auto b) { a += b; return a; });
* @endcode
*/
std::vector<std::string> contents;

/**
* Per-request metadata and annotations.
*
* These are intended as debugging tools. They are subject to change without
* notice.
*/
std::multimap<std::string, std::string> request_metadata;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_experimental
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_RESPONSES_H
1 change: 1 addition & 0 deletions google/cloud/storage/google_cloud_cpp_storage_grpc.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

google_cloud_cpp_storage_grpc_hdrs = [
"async_client.h",
"async_object_responses.h",
"grpc_plugin.h",
"internal/async_accumulate_read_object.h",
"internal/async_connection.h",
Expand Down
1 change: 1 addition & 0 deletions google/cloud/storage/google_cloud_cpp_storage_grpc.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ else ()
google_cloud_cpp_storage_grpc
async_client.cc
async_client.h
async_object_responses.h
grpc_plugin.cc
grpc_plugin.h
internal/async_accumulate_read_object.cc
Expand Down
30 changes: 30 additions & 0 deletions google/cloud/storage/internal/async_accumulate_read_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include "google/cloud/storage/internal/async_accumulate_read_object.h"
#include "google/cloud/storage/internal/grpc_object_metadata_parser.h"
#include <crc32c/crc32c.h>
#include <iterator>
#include <numeric>
#include <sstream>
Expand Down Expand Up @@ -322,6 +324,34 @@ future<AsyncAccumulateReadObjectResult> AsyncAccumulateReadObjectFull(
return handle->Invoke();
}

storage_experimental::AsyncReadObjectRangeResponse ToResponse(
AsyncAccumulateReadObjectResult accumulated, Options const& options) {
storage_experimental::AsyncReadObjectRangeResponse response;
response.status = std::move(accumulated.status);
response.request_metadata = std::move(accumulated.metadata);
response.contents.reserve(accumulated.payload.size());
for (auto& r : accumulated.payload) {
if (!r.has_checksummed_data()) continue;
auto& data = *r.mutable_checksummed_data();
if (data.has_crc32c() && crc32c::Crc32c(data.content()) != data.crc32c()) {
response.status = Status(StatusCode::kDataLoss,
"Mismatched CRC32C checksum in downloaded data");
return response;
}
response.contents.push_back(std::move(*data.mutable_content()));
}
response.object_metadata = [&] {
for (auto& r : accumulated.payload) {
if (!r.has_metadata()) continue;
return absl::make_optional(
storage::internal::GrpcObjectMetadataParser::FromProto(
*r.mutable_metadata(), options));
}
return absl::optional<storage::ObjectMetadata>{};
}();
return response;
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
} // namespace cloud
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/storage/internal/async_accumulate_read_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_ACCUMULATE_READ_OBJECT_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_ACCUMULATE_READ_OBJECT_H

#include "google/cloud/storage/async_object_responses.h"
#include "google/cloud/storage/internal/storage_stub.h"
#include "google/cloud/storage/options.h"
#include "google/cloud/completion_queue.h"
Expand Down Expand Up @@ -169,6 +170,10 @@ future<AsyncAccumulateReadObjectResult> AsyncAccumulateReadObjectFull(
std::function<std::unique_ptr<grpc::ClientContext>()> context_factory,
google::storage::v2::ReadObjectRequest request, Options const& options);

/// Convert the proto into a representation more familiar to our customers.
storage_experimental::AsyncReadObjectRangeResponse ToResponse(
AsyncAccumulateReadObjectResult accumulated, Options const& options);

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
} // namespace cloud
Expand Down
98 changes: 98 additions & 0 deletions google/cloud/storage/internal/async_accumulate_read_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,104 @@ TEST(AsyncAccumulateReadObjectTest, PermanentFailure) {
runner.join();
}

TEST(AsyncAccumulateReadObjectTest, ToResponse) {
// To generate the CRC32C checksums use:
// /bin/echo -n $content > foo.txt && gsutil hash foo.txt
// and then pipe the base64 encoded output, for example, the "How vexingly..."
// text yields:
// echo 'StZ/gA==' | openssl base64 -d | xxd
// Output: 00000000: 4ad6 7f80
auto constexpr kText0 = R"pb(
checksummed_data {
content: "The quick brown fox jumps over the lazy dog"
crc32c: 0x22620404
}
object_checksums { crc32c: 2345 md5_hash: "test-only-invalid" }
content_range { start: 1024 end: 2048 complete_length: 8192 }
metadata { bucket: "projects/_/buckets/bucket-name" name: "object-name" }
)pb";
auto constexpr kText1 = R"pb(
checksummed_data {
content: "How vexingly quick daft zebras jump!"
crc32c: 0x4ad67f80
}
object_checksums { crc32c: 2345 md5_hash: "test-only-invalid" }
content_range { start: 1024 end: 2048 complete_length: 8192 }
metadata { bucket: "projects/_/buckets/bucket-name" name: "object-name" }
)pb";

ReadObjectResponse r0;
ASSERT_TRUE(TextFormat::ParseFromString(kText0, &r0));
ReadObjectResponse r1;
ASSERT_TRUE(TextFormat::ParseFromString(kText1, &r1));

AsyncAccumulateReadObjectResult accumulated;
accumulated.status = Status(StatusCode::kUnavailable, "try-again");
accumulated.payload.push_back(r0);
accumulated.payload.push_back(r1);
accumulated.metadata.emplace("key", "v0");
accumulated.metadata.emplace("key", "v1");

auto const actual =
ToResponse(accumulated, Options{}.set<storage::RestEndpointOption>(
"https://storage.googleapis.com"));
EXPECT_THAT(actual.status, StatusIs(StatusCode::kUnavailable, "try-again"));
EXPECT_THAT(actual.contents,
ElementsAre("The quick brown fox jumps over the lazy dog",
"How vexingly quick daft zebras jump!"));
EXPECT_THAT(actual.request_metadata,
UnorderedElementsAre(Pair("key", "v0"), Pair("key", "v1")));
ASSERT_TRUE(actual.object_metadata.has_value());
EXPECT_EQ(actual.object_metadata->bucket(), "bucket-name");
EXPECT_EQ(actual.object_metadata->name(), "object-name");
}

TEST(AsyncAccumulateReadObjectTest, ToResponseDataLoss) {
auto constexpr kText0 = R"pb(
checksummed_data {
content: "The quick brown fox jumps over the lazy dog"
crc32c: 0x00000000
}
object_checksums { crc32c: 2345 md5_hash: "test-only-invalid" }
content_range { start: 1024 end: 2048 complete_length: 8192 }
metadata { bucket: "projects/_/buckets/bucket-name" name: "object-name" }
)pb";

ReadObjectResponse r0;
ASSERT_TRUE(TextFormat::ParseFromString(kText0, &r0));

AsyncAccumulateReadObjectResult accumulated;
accumulated.status = Status{};
accumulated.payload.push_back(r0);
accumulated.metadata.emplace("key", "v0");
accumulated.metadata.emplace("key", "v1");

auto const actual =
ToResponse(accumulated, Options{}.set<storage::RestEndpointOption>(
"https://storage.googleapis.com"));
EXPECT_THAT(actual.status, StatusIs(StatusCode::kDataLoss));
EXPECT_THAT(actual.contents, IsEmpty());
EXPECT_FALSE(actual.object_metadata.has_value());
EXPECT_THAT(actual.request_metadata,
UnorderedElementsAre(Pair("key", "v0"), Pair("key", "v1")));
}

TEST(AsyncAccumulateReadObjectTest, ToResponseError) {
AsyncAccumulateReadObjectResult accumulated;
accumulated.status = Status(StatusCode::kNotFound, "not found");
accumulated.metadata.emplace("key", "v0");
accumulated.metadata.emplace("key", "v1");

auto const actual =
ToResponse(accumulated, Options{}.set<storage::RestEndpointOption>(
"https://storage.googleapis.com"));
EXPECT_THAT(actual.status, StatusIs(StatusCode::kNotFound, "not found"));
EXPECT_THAT(actual.contents, IsEmpty());
EXPECT_FALSE(actual.object_metadata.has_value());
EXPECT_THAT(actual.request_metadata,
UnorderedElementsAre(Pair("key", "v0"), Pair("key", "v1")));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down

0 comments on commit 40b66c4

Please sign in to comment.