diff --git a/google/cloud/internal/resumable_streaming_read_rpc.h b/google/cloud/internal/resumable_streaming_read_rpc.h index 5916f67ddd106..616d0a9703f7c 100644 --- a/google/cloud/internal/resumable_streaming_read_rpc.h +++ b/google/cloud/internal/resumable_streaming_read_rpc.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_RESUMABLE_STREAMING_READ_RPC_H #include "google/cloud/internal/streaming_read_rpc.h" +#include "google/cloud/options.h" #include "google/cloud/version.h" #include #include @@ -81,6 +82,7 @@ class ResumableStreamingReadRpc : public StreamingReadRpc { backoff_policy_prototype_(std::move(backoff_policy)), sleeper_(std::move(sleeper)), stream_factory_(std::move(stream_factory)), + options_(internal::CurrentOptions()), updater_(std::move(updater)), request_(std::move(request)), impl_(stream_factory_(request_)) {} @@ -88,9 +90,13 @@ class ResumableStreamingReadRpc : public StreamingReadRpc { ResumableStreamingReadRpc(ResumableStreamingReadRpc&&) = delete; ResumableStreamingReadRpc& operator=(ResumableStreamingReadRpc&&) = delete; - void Cancel() override { impl_->Cancel(); } + void Cancel() override { + internal::OptionsSpan span(options_); + impl_->Cancel(); + } absl::variant Read() override { + internal::OptionsSpan span(options_); auto response = impl_->Read(); if (absl::holds_alternative(response)) { updater_(absl::get(response), request_); @@ -128,6 +134,7 @@ class ResumableStreamingReadRpc : public StreamingReadRpc { } StreamingRpcMetadata GetRequestMetadata() const override { + internal::OptionsSpan span(options_); return impl_ ? impl_->GetRequestMetadata() : StreamingRpcMetadata{}; } @@ -136,6 +143,7 @@ class ResumableStreamingReadRpc : public StreamingReadRpc { std::unique_ptr const backoff_policy_prototype_; Sleeper sleeper_; StreamFactory const stream_factory_; + Options const options_; RequestUpdater const updater_; RequestType request_; std::unique_ptr> impl_; diff --git a/google/cloud/internal/resumable_streaming_read_rpc_test.cc b/google/cloud/internal/resumable_streaming_read_rpc_test.cc index 839b6f3591167..1316ff3df6a1a 100644 --- a/google/cloud/internal/resumable_streaming_read_rpc_test.cc +++ b/google/cloud/internal/resumable_streaming_read_rpc_test.cc @@ -42,6 +42,10 @@ struct FakeResponse { std::string token; }; +struct FakeOption { + using Type = std::string; +}; + using ReadReturn = absl::variant; class MockStreamingReadRpc : public StreamingReadRpc { @@ -105,6 +109,7 @@ TEST(ResumableStreamingReadRpc, ResumeWithPartials) { MockStub mock; EXPECT_CALL(mock, StreamingRead) .WillOnce([](FakeRequest const& request) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); EXPECT_EQ(request.key, "test-key"); EXPECT_THAT(request.token, IsEmpty()); auto stream = absl::make_unique(); @@ -115,6 +120,7 @@ TEST(ResumableStreamingReadRpc, ResumeWithPartials) { return stream; }) .WillOnce([](FakeRequest const& request) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); EXPECT_EQ(request.key, "test-key"); EXPECT_THAT(request.token, "token-2"); auto stream = absl::make_unique(); @@ -123,6 +129,7 @@ TEST(ResumableStreamingReadRpc, ResumeWithPartials) { .WillOnce(Return(StreamSuccess())); return stream; }); + internal::OptionsSpan create_span(Options{}.set("create-time")); auto reader = MakeResumableStreamingReadRpc( DefaultRetryPolicy(), DefaultBackoffPolicy(), [](std::chrono::milliseconds) {}, @@ -131,6 +138,7 @@ TEST(ResumableStreamingReadRpc, ResumeWithPartials) { }, DefaultUpdater, FakeRequest{"test-key", {}}); + internal::OptionsSpan use_span(Options{}.set("use-time")); std::vector values; for (;;) { auto v = reader->Read(); @@ -148,16 +156,19 @@ TEST(ResumableStreamingReadRpc, TooManyTransientFailures) { MockStub mock; EXPECT_CALL(mock, StreamingRead) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); auto stream = absl::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(TransientFailure())); return stream; }) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); auto stream = absl::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(TransientFailure())); return stream; }) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); // Event though this stream ends with a failure it will be resumed // because its successful Read() resets the retry policy. auto stream = absl::make_unique(); @@ -167,20 +178,25 @@ TEST(ResumableStreamingReadRpc, TooManyTransientFailures) { return stream; }) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); auto stream = absl::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(TransientFailure())); return stream; }) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); auto stream = absl::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(TransientFailure())); return stream; }) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); auto stream = absl::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(TransientFailure())); return stream; }); + + internal::OptionsSpan create_span(Options{}.set("create-time")); auto reader = MakeResumableStreamingReadRpc( DefaultRetryPolicy(), DefaultBackoffPolicy(), [](std::chrono::milliseconds) {}, @@ -189,6 +205,7 @@ TEST(ResumableStreamingReadRpc, TooManyTransientFailures) { }, DefaultUpdater, FakeRequest{"test-key", {}}); + internal::OptionsSpan use_span(Options{}.set("use-time")); std::vector values; for (;;) { auto v = reader->Read(); @@ -207,6 +224,7 @@ TEST(ResumableStreamingReadRpc, PermanentFailure) { MockStub mock; EXPECT_CALL(mock, StreamingRead) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); // Event though this stream ends with a failure it will be resumed // because its successful Read() resets the retry policy. auto stream = absl::make_unique(); @@ -216,10 +234,13 @@ TEST(ResumableStreamingReadRpc, PermanentFailure) { return stream; }) .WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); auto stream = absl::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(PermanentFailure())); return stream; }); + + internal::OptionsSpan create_span(Options{}.set("create-time")); auto reader = MakeResumableStreamingReadRpc( DefaultRetryPolicy(), DefaultBackoffPolicy(), [](std::chrono::milliseconds) {}, @@ -228,6 +249,7 @@ TEST(ResumableStreamingReadRpc, PermanentFailure) { }, DefaultUpdater, FakeRequest{"test-key", {}}); + internal::OptionsSpan use_span(Options{}.set("use-time")); std::vector values; for (;;) { auto v = reader->Read(); @@ -245,10 +267,13 @@ TEST(ResumableStreamingReadRpc, PermanentFailure) { TEST(ResumableStreamingReadRpc, PermanentFailureAtStart) { MockStub mock; EXPECT_CALL(mock, StreamingRead).WillOnce([](FakeRequest const&) { + EXPECT_EQ("create-time", internal::CurrentOptions().get()); auto stream = absl::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(PermanentFailure())); return stream; }); + + internal::OptionsSpan create_span(Options{}.set("create-time")); auto reader = MakeResumableStreamingReadRpc( DefaultRetryPolicy(), DefaultBackoffPolicy(), [](std::chrono::milliseconds) {}, @@ -257,6 +282,7 @@ TEST(ResumableStreamingReadRpc, PermanentFailureAtStart) { }, DefaultUpdater, FakeRequest{"test-key", {}}); + internal::OptionsSpan use_span(Options{}.set("use-time")); std::vector values; for (;;) { auto v = reader->Read();