Skip to content

Commit

Permalink
fix: restore Options over deletion of StreamRange<T>::reader_ (#8403)
Browse files Browse the repository at this point in the history
Restore the creation-time `Options` in the `StreamRange<T>` destructor
while explicitly clearing the `reader_` functor.  If the reader has not
reached the end of the range, then deleting it may send a best-effort,
out-of-band cancel on the call, and we should restore the options when
re-entering the library.  Add a test for the destructor case.  This is
a followup to #8256.

Also take this opportunity to delete some unused type aliases I stumbled
upon during this work.
  • Loading branch information
devbww authored Feb 17, 2022
1 parent f20f642 commit 2fd4118
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 43 deletions.
3 changes: 0 additions & 3 deletions google/cloud/internal/resumable_streaming_read_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ struct FakeResponse {
std::string token;
};

using MockReturnType =
std::unique_ptr<grpc::ClientReaderInterface<FakeResponse>>;

using ReadReturn = absl::variant<Status, FakeResponse>;

class MockStreamingReadRpc : public StreamingReadRpc<FakeResponse> {
Expand Down
3 changes: 0 additions & 3 deletions google/cloud/internal/streaming_read_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ struct FakeResponse {
std::string value;
};

using MockReturnType =
std::unique_ptr<grpc::ClientReaderInterface<FakeResponse>>;

class MockReader : public grpc::ClientReaderInterface<FakeResponse> {
public:
MOCK_METHOD(bool, Read, (FakeResponse*), (override));
Expand Down
3 changes: 0 additions & 3 deletions google/cloud/internal/streaming_write_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ struct FakeResponse {
std::string value;
};

using MockReturnType =
std::unique_ptr<grpc::ClientReaderInterface<FakeResponse>>;

class MockWriter : public grpc::ClientWriterInterface<FakeRequest> {
public:
MOCK_METHOD(bool, Write, (FakeRequest const&, grpc::WriteOptions),
Expand Down
7 changes: 6 additions & 1 deletion google/cloud/stream_range.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ class StreamRange {
*/
StreamRange() = default;

~StreamRange() {
internal::OptionsSpan span(options_);
reader_ = nullptr;
}

//@{
// @name Move-only
StreamRange(StreamRange const&) = delete;
Expand Down Expand Up @@ -226,8 +231,8 @@ class StreamRange {
Next();
}

internal::StreamReader<T> reader_;
Options options_ = internal::CurrentOptions();
internal::StreamReader<T> reader_;
StatusOr<T> current_;
bool current_ok_ = false;
bool is_end_ = true;
Expand Down
111 changes: 78 additions & 33 deletions google/cloud/stream_range_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,22 @@ namespace {

using ::google::cloud::testing_util::StatusIs;
using ::testing::ElementsAre;
using ::testing::UnitTest;

struct StringOption {
using Type = std::string;
};

std::string CurrentTestName() {
return UnitTest::GetInstance()->current_test_info()->name();
}

template <typename T>
StreamRange<T> MakeStreamRange(internal::StreamReader<T> reader) {
internal::OptionsSpan span(Options{}.set<StringOption>(CurrentTestName()));
return internal::MakeStreamRange(std::move(reader));
}

TEST(StreamRange, DefaultConstructed) {
StreamRange<int> sr;
auto it = sr.begin();
Expand All @@ -43,14 +54,14 @@ TEST(StreamRange, DefaultConstructed) {

TEST(StreamRange, MoveOnly) {
auto const reader = [] { return Status{}; };
StreamRange<int> sr = internal::MakeStreamRange<int>(reader);
StreamRange<int> move_construct = std::move(sr);
StreamRange<int> move_assign = internal::MakeStreamRange<int>(reader);
auto sr = MakeStreamRange<int>(reader);
auto move_construct = std::move(sr);
auto move_assign = MakeStreamRange<int>(reader);
move_assign = std::move(move_construct);
}

TEST(StreamRange, EmptyRange) {
StreamRange<int> sr = internal::MakeStreamRange<int>([] { return Status{}; });
auto sr = MakeStreamRange<int>([] { return Status{}; });
auto it = sr.begin();
auto end = sr.end();
EXPECT_EQ(it, end);
Expand All @@ -59,16 +70,16 @@ TEST(StreamRange, EmptyRange) {
}

TEST(StreamRange, OneElement) {
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto counter = 0;
auto reader = [&counter]() -> internal::StreamReader<int>::result_type {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(), "OneElement");
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
if (counter++ < 1) return 42;
return Status{};
};

internal::OptionsSpan span(Options{}.set<StringOption>("OneElement"));
StreamRange<int> sr = internal::MakeStreamRange<int>(std::move(reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto sr = MakeStreamRange<int>(std::move(reader));
auto it = sr.begin();
EXPECT_NE(it, sr.end());
EXPECT_TRUE(*it);
Expand All @@ -78,14 +89,14 @@ TEST(StreamRange, OneElement) {
}

TEST(StreamRange, OneError) {
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = []() -> internal::StreamReader<int>::result_type {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(), "OneError");
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
return Status(StatusCode::kUnknown, "oops");
};

internal::OptionsSpan span(Options{}.set<StringOption>("OneError"));
StreamRange<int> sr = internal::MakeStreamRange<int>(std::move(reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto sr = MakeStreamRange<int>(std::move(reader));
auto it = sr.begin();
EXPECT_NE(it, sr.end());
EXPECT_FALSE(*it);
Expand All @@ -95,16 +106,16 @@ TEST(StreamRange, OneError) {
}

TEST(StreamRange, FiveElements) {
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto counter = 0;
auto reader = [&counter]() -> internal::StreamReader<int>::result_type {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(), "FiveElements");
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
if (counter++ < 5) return counter;
return Status{};
};

internal::OptionsSpan span(Options{}.set<StringOption>("FiveElements"));
StreamRange<int> sr = internal::MakeStreamRange<int>(std::move(reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto sr = MakeStreamRange<int>(std::move(reader));
std::vector<int> v;
for (StatusOr<int>& x : sr) {
EXPECT_TRUE(x);
Expand All @@ -114,17 +125,16 @@ TEST(StreamRange, FiveElements) {
}

TEST(StreamRange, PostFixIteration) {
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto counter = 0;
auto reader = [&counter]() -> internal::StreamReader<int>::result_type {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
"PostFixIteration");
CurrentTestName());
if (counter++ < 5) return counter;
return Status{};
};

internal::OptionsSpan span(Options{}.set<StringOption>("PostFixIteration"));
StreamRange<int> sr = internal::MakeStreamRange<int>(std::move(reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto sr = MakeStreamRange<int>(std::move(reader));
std::vector<int> v;
// NOLINTNEXTLINE(modernize-loop-convert)
for (auto it = sr.begin(); it != sr.end(); it++) {
Expand All @@ -135,46 +145,46 @@ TEST(StreamRange, PostFixIteration) {
}

TEST(StreamRange, Distance) {
internal::OptionsSpan overlay1(Options{}.set<StringOption>("uh-oh"));

// Empty range
StreamRange<int> sr = internal::MakeStreamRange<int>([] { return Status{}; });
auto sr = MakeStreamRange<int>([] { return Status{}; });
EXPECT_EQ(0, std::distance(sr.begin(), sr.end()));

// Range of one element
auto counter = 0;
internal::OptionsSpan span1(Options{}.set<StringOption>("Distance1"));
StreamRange<int> one = internal::MakeStreamRange<int>(
auto one = MakeStreamRange<int>(
[&counter]() -> internal::StreamReader<int>::result_type {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(), "Distance1");
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
if (counter++ < 1) return counter;
return Status{};
});
internal::OptionsSpan overlay1(Options{}.set<StringOption>("uh-oh"));
EXPECT_EQ(1, std::distance(one.begin(), one.end()));

// Range of five elements
counter = 0;
internal::OptionsSpan span5(Options{}.set<StringOption>("Distance5"));
StreamRange<int> five = internal::MakeStreamRange<int>(
auto five = MakeStreamRange<int>(
[&counter]() -> internal::StreamReader<int>::result_type {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(), "Distance5");
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
if (counter++ < 5) return counter;
return Status{};
});
internal::OptionsSpan overlay5(Options{}.set<StringOption>("uh-oh"));
EXPECT_EQ(5, std::distance(five.begin(), five.end()));
}

TEST(StreamRange, StreamError) {
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto counter = 0;
auto reader = [&counter]() -> internal::StreamReader<int>::result_type {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(), "StreamError");
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
if (counter++ < 2) return counter;
return Status(StatusCode::kUnknown, "oops");
};

internal::OptionsSpan span(Options{}.set<StringOption>("StreamError"));
StreamRange<int> sr = internal::MakeStreamRange<int>(std::move(reader));
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto sr = MakeStreamRange<int>(std::move(reader));

auto it = sr.begin();
EXPECT_NE(it, sr.end());
Expand All @@ -197,6 +207,41 @@ TEST(StreamRange, StreamError) {
EXPECT_EQ(it, sr.end());
}

template <typename ResponseType>
class FakeResumableStreamingReadRpc {
public:
~FakeResumableStreamingReadRpc() {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
EXPECT_TRUE(read_called_);
}

typename internal::StreamReader<ResponseType>::result_type Read() {
EXPECT_EQ(internal::CurrentOptions().get<StringOption>(),
CurrentTestName());
EXPECT_FALSE(read_called_);
read_called_ = true;
return ResponseType{};
}

private:
bool read_called_ = false;
};

TEST(StreamRange, ReaderDestructorOptionsSpan) {
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
auto reader = [] {
auto resumable = std::make_shared<FakeResumableStreamingReadRpc<int>>();
return [resumable]() -> internal::StreamReader<int>::result_type {
return resumable->Read();
};
}();
auto sr = MakeStreamRange<int>(std::move(reader));
// `~StreamRange()` will now delete the reader, which will drop the last
// reference to the `FakeResumableStreamingReadRpc`, and all of that should
// happen with `CurrentOptions()` matching those at `StreamRange` ctor time.
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace cloud
Expand Down

0 comments on commit 2fd4118

Please sign in to comment.