Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(spanner): add Options save/restore to PartialResultSetSource #8355

Merged
merged 2 commits into from
Feb 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/spanner/internal/partial_result_set_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ PartialResultSetSource::~PartialResultSetSource() {
// If there is actual data in the streaming RPC Finish() can deadlock, so
// before trying to read the final status we need to cancel the streaming
// RPC.
internal::OptionsSpan span(options_);
reader_->TryCancel();
// The user didn't iterate over all the data; finish the stream on their
// behalf, but we have no way to communicate error status.
Expand All @@ -95,6 +96,7 @@ PartialResultSetSource::~PartialResultSetSource() {
}

Status PartialResultSetSource::ReadFromStream() {
internal::OptionsSpan span(options_);
auto result_set = reader_->Read();
if (!result_set) {
// Read() returns false for end of stream, whether we read all the data or
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/spanner/internal/partial_result_set_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "google/cloud/spanner/results.h"
#include "google/cloud/spanner/value.h"
#include "google/cloud/spanner/version.h"
#include "google/cloud/options.h"
#include "google/cloud/status.h"
#include "google/cloud/status_or.h"
#include "absl/types/optional.h"
Expand Down Expand Up @@ -60,10 +61,11 @@ class PartialResultSetSource : public ResultSourceInterface {
private:
explicit PartialResultSetSource(
std::unique_ptr<PartialResultSetReader> reader)
: reader_(std::move(reader)) {}
: options_(internal::CurrentOptions()), reader_(std::move(reader)) {}

Status ReadFromStream();

Options options_;
std::unique_ptr<PartialResultSetReader> reader_;
absl::optional<google::spanner::v1::ResultSetMetadata> metadata_;
absl::optional<google::spanner::v1::ResultSetStats> stats_;
Expand Down
236 changes: 150 additions & 86 deletions google/cloud/spanner/internal/partial_result_set_source_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "google/cloud/spanner/row.h"
#include "google/cloud/spanner/testing/mock_partial_result_set_reader.h"
#include "google/cloud/spanner/value.h"
#include "google/cloud/options.h"
#include "google/cloud/testing_util/is_proto_equal.h"
#include "google/cloud/testing_util/status_matchers.h"
#include "absl/memory/memory.h"
Expand All @@ -40,7 +41,46 @@ using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::StatusIs;
using ::google::protobuf::TextFormat;
using ::testing::HasSubstr;
using ::testing::Return;
using ::testing::UnitTest;

std::string CurrentTestName() {
return UnitTest::GetInstance()->current_test_info()->name();
}

struct StringOption {
using Type = std::string;
};

// Create the `PartialResultSetSource` within an `OptionsSpan` that has its
// `StringOption` set to the current test name, so that we might check that
// all `PartialResultSetReader` calls happen within a matching span.
StatusOr<std::unique_ptr<ResultSourceInterface>> CreatePartialResultSetSource(
std::unique_ptr<PartialResultSetReader> reader) {
internal::OptionsSpan span(Options{}.set<StringOption>(CurrentTestName()));
return PartialResultSetSource::Create(std::move(reader));
}

// Returns a functor that expects the current `StringOption` to match the test
// name.
std::function<void()> VoidMock() {
return [] {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
};
}

// Returns a functor that will return the argument after expecting that the
// current `StringOption` matches the test name.
template <typename T>
std::function<T()> ResultMock(T const& result) {
return [result]() {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
return result;
};
}

using ReadResult = absl::optional<spanner_proto::PartialResultSet>;

MATCHER_P(IsValidAndEquals, expected,
"Verifies that a StatusOr<Row> contains the given Row") {
Expand All @@ -51,12 +91,14 @@ MATCHER_P(IsValidAndEquals, expected,
TEST(PartialResultSetSourceTest, InitialReadFailure) {
auto grpc_reader = absl::make_unique<MockPartialResultSetReader>();
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
Status finish_status(StatusCode::kInvalidArgument, "invalid");
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(finish_status));

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
EXPECT_THAT(reader, StatusIs(StatusCode::kInvalidArgument));
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish())
.WillOnce(ResultMock(Status(StatusCode::kInvalidArgument, "invalid")));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_THAT(reader, StatusIs(StatusCode::kInvalidArgument, "invalid"));
}

/**
Expand All @@ -79,29 +121,33 @@ TEST(PartialResultSetSourceTest, ReadSuccessThenFailure) {
spanner_proto::PartialResultSet response;
ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
Status finish_status(StatusCode::kCancelled, "cancelled");
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(finish_status));
.WillOnce(ResultMock<ReadResult>(response))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish())
.WillOnce(ResultMock(Status(StatusCode::kCancelled, "cancelled")));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

// The first call to NextRow() yields a row but the second gives an error.
auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());
EXPECT_THAT((*reader)->NextRow(),
IsValidAndEquals(MakeTestRow({{"AnInt", spanner::Value(80)}})));
auto row = (*reader)->NextRow();
EXPECT_THAT(row, StatusIs(StatusCode::kCancelled));
EXPECT_THAT(row, StatusIs(StatusCode::kCancelled, "cancelled"));
}

/// @test Verify the behavior when the first response does not contain metadata.
TEST(PartialResultSetSourceTest, MissingMetadata) {
auto grpc_reader = absl::make_unique<MockPartialResultSetReader>();
spanner_proto::PartialResultSet response;
EXPECT_CALL(*grpc_reader, Read()).WillOnce(Return(response));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(ResultMock<ReadResult>(spanner_proto::PartialResultSet()));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
// The destructor should try to cancel the RPC to avoid deadlocks.
EXPECT_CALL(*grpc_reader, TryCancel()).Times(1);
EXPECT_CALL(*grpc_reader, TryCancel()).WillOnce(VoidMock());

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_THAT(reader, StatusIs(StatusCode::kInternal,
"response contained no metadata"));
}
Expand All @@ -116,11 +162,13 @@ TEST(PartialResultSetSourceTest, MissingRowTypeNoData) {
spanner_proto::PartialResultSet response;
ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));
.WillOnce(ResultMock<ReadResult>(response))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
ASSERT_STATUS_OK(reader);
EXPECT_THAT((*reader)->NextRow(), IsValidAndEquals(spanner::Row{}));
}
Expand All @@ -136,12 +184,13 @@ TEST(PartialResultSetSourceTest, MissingRowTypeWithData) {
values: { string_value: "10" })pb";
spanner_proto::PartialResultSet response;
ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
EXPECT_CALL(*grpc_reader, Read()).WillOnce(Return(response));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));
EXPECT_CALL(*grpc_reader, Read()).WillOnce(ResultMock<ReadResult>(response));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
// The destructor should try to cancel the RPC to avoid deadlocks.
EXPECT_CALL(*grpc_reader, TryCancel()).Times(1);
EXPECT_CALL(*grpc_reader, TryCancel()).WillOnce(VoidMock());

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
ASSERT_STATUS_OK(reader);
StatusOr<spanner::Row> row = (*reader)->NextRow();
EXPECT_THAT(row, StatusIs(StatusCode::kInternal,
Expand Down Expand Up @@ -190,11 +239,13 @@ TEST(PartialResultSetSourceTest, SingleResponse) {
spanner_proto::PartialResultSet response;
ASSERT_TRUE(TextFormat::ParseFromString(kText, &response));
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));
.WillOnce(ResultMock<ReadResult>(response))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Verify the returned metadata is correct.
Expand Down Expand Up @@ -308,15 +359,17 @@ TEST(PartialResultSetSourceTest, MultipleResponses) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]))
.WillOnce(Return(response[2]))
.WillOnce(Return(response[3]))
.WillOnce(Return(response[4]))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]))
.WillOnce(ResultMock<ReadResult>(response[2]))
.WillOnce(ResultMock<ReadResult>(response[3]))
.WillOnce(ResultMock<ReadResult>(response[4]))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Verify the returned rows are correct.
Expand Down Expand Up @@ -365,13 +418,15 @@ TEST(PartialResultSetSourceTest, ResponseWithNoValues) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]))
.WillOnce(Return(response[2]))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]))
.WillOnce(ResultMock<ReadResult>(response[2]))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Verify the returned row is correct.
Expand Down Expand Up @@ -426,15 +481,17 @@ TEST(PartialResultSetSourceTest, ChunkedStringValueWellFormed) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]))
.WillOnce(Return(response[2]))
.WillOnce(Return(response[3]))
.WillOnce(Return(response[4]))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]))
.WillOnce(ResultMock<ReadResult>(response[2]))
.WillOnce(ResultMock<ReadResult>(response[3]))
.WillOnce(ResultMock<ReadResult>(response[4]))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Verify the returned values are correct.
Expand Down Expand Up @@ -476,13 +533,14 @@ TEST(PartialResultSetSourceTest, ChunkedValueSetNoValue) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
// The destructor should try to cancel the RPC to avoid deadlocks.
EXPECT_CALL(*grpc_reader, TryCancel()).Times(1);
EXPECT_CALL(*grpc_reader, TryCancel()).WillOnce(VoidMock());

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Trying to read the next row should fail.
Expand Down Expand Up @@ -519,13 +577,14 @@ TEST(PartialResultSetSourceTest, ChunkedValueSetNoFollowingValue) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
// The destructor should try to cancel the RPC to avoid deadlocks.
EXPECT_CALL(*grpc_reader, TryCancel()).Times(1);
EXPECT_CALL(*grpc_reader, TryCancel()).WillOnce(VoidMock());

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Trying to read the next row should fail.
Expand Down Expand Up @@ -562,12 +621,14 @@ TEST(PartialResultSetSourceTest, ChunkedValueSetAtEndOfStream) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Trying to read the next row should fail.
Expand Down Expand Up @@ -607,14 +668,15 @@ TEST(PartialResultSetSourceTest, ChunkedValueMergeFailure) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]))
.WillOnce(Return(response[2]));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]))
.WillOnce(ResultMock<ReadResult>(response[2]));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
// The destructor should try to cancel the RPC to avoid deadlocks.
EXPECT_CALL(*grpc_reader, TryCancel()).Times(1);
EXPECT_CALL(*grpc_reader, TryCancel()).WillOnce(VoidMock());

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Trying to read the next row should fail.
Expand Down Expand Up @@ -678,15 +740,17 @@ TEST(PartialResultSetSourceTest, ErrorOnIncompleteRow) {
ASSERT_TRUE(TextFormat::ParseFromString(text[i], &response[i]));
}
EXPECT_CALL(*grpc_reader, Read())
.WillOnce(Return(response[0]))
.WillOnce(Return(response[1]))
.WillOnce(Return(response[2]))
.WillOnce(Return(response[3]))
.WillOnce(Return(response[4]))
.WillOnce(Return(absl::optional<spanner_proto::PartialResultSet>{}));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(Return(Status()));

auto reader = PartialResultSetSource::Create(std::move(grpc_reader));
.WillOnce(ResultMock<ReadResult>(response[0]))
.WillOnce(ResultMock<ReadResult>(response[1]))
.WillOnce(ResultMock<ReadResult>(response[2]))
.WillOnce(ResultMock<ReadResult>(response[3]))
.WillOnce(ResultMock<ReadResult>(response[4]))
.WillOnce(ResultMock<ReadResult>(absl::nullopt));
EXPECT_CALL(*grpc_reader, Finish()).WillOnce(ResultMock(Status()));
EXPECT_CALL(*grpc_reader, TryCancel()).Times(0);

internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = CreatePartialResultSetSource(std::move(grpc_reader));
EXPECT_STATUS_OK(reader.status());

// Verify the first two rows are correct.
Expand Down
Loading