diff --git a/google/cloud/internal/async_connection_ready.cc b/google/cloud/internal/async_connection_ready.cc index cbe7c5c691e7e..06c7177e35b17 100644 --- a/google/cloud/internal/async_connection_ready.cc +++ b/google/cloud/internal/async_connection_ready.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/internal/async_connection_ready.h" +#include "google/cloud/options.h" namespace google { namespace cloud { @@ -60,6 +61,7 @@ void AsyncConnectionReadyFuture::RunIteration(ChannelStateType state) { explicit OnStateChange(std::shared_ptr s) : self_(std::move(s)) {} bool Notify(bool ok) override { + OptionsSpan span(options_); self_->Notify(ok); return true; } @@ -67,6 +69,7 @@ void AsyncConnectionReadyFuture::RunIteration(ChannelStateType state) { private: std::shared_ptr const self_; + Options options_ = CurrentOptions(); }; auto op = std::make_shared(shared_from_this()); diff --git a/google/cloud/internal/async_read_stream_impl.h b/google/cloud/internal/async_read_stream_impl.h index 7a72495af5c20..d77501737af29 100644 --- a/google/cloud/internal/async_read_stream_impl.h +++ b/google/cloud/internal/async_read_stream_impl.h @@ -17,6 +17,7 @@ #include "google/cloud/grpc_error_delegate.h" #include "google/cloud/internal/completion_queue_impl.h" +#include "google/cloud/options.h" #include "google/cloud/version.h" #include #include @@ -150,10 +151,12 @@ class AsyncReadStreamImpl private: void Cancel() override {} // LCOV_EXCL_LINE bool Notify(bool ok) override { + OptionsSpan span(options_); control_->OnStart(ok); return true; } std::shared_ptr control_; + Options options_ = CurrentOptions(); }; context_ = std::move(context); @@ -195,10 +198,12 @@ class AsyncReadStreamImpl private: void Cancel() override {} // LCOV_EXCL_LINE bool Notify(bool ok) override { + OptionsSpan span(options_); control_->OnRead(ok, std::move(response)); return true; } std::shared_ptr control_; + Options options_ = CurrentOptions(); }; auto callback = std::make_shared(this->shared_from_this()); @@ -243,10 +248,12 @@ class AsyncReadStreamImpl private: void Cancel() override {} // LCOV_EXCL_LINE bool Notify(bool ok) override { + OptionsSpan span(options_); control_->OnFinish(ok, MakeStatusFromRpcError(status)); return true; } std::shared_ptr control_; + Options options_ = CurrentOptions(); }; auto callback = std::make_shared(this->shared_from_this()); @@ -281,10 +288,12 @@ class AsyncReadStreamImpl private: void Cancel() override {} // LCOV_EXCL_LINE bool Notify(bool ok) override { + OptionsSpan span(options_); control_->OnDiscard(ok, std::move(response)); return true; } std::shared_ptr control_; + Options options_ = CurrentOptions(); }; auto callback = std::make_shared(this->shared_from_this()); diff --git a/google/cloud/internal/async_read_write_stream_impl.h b/google/cloud/internal/async_read_write_stream_impl.h index b9841065344f7..9fb07e6061943 100644 --- a/google/cloud/internal/async_read_write_stream_impl.h +++ b/google/cloud/internal/async_read_write_stream_impl.h @@ -18,6 +18,7 @@ #include "google/cloud/completion_queue.h" #include "google/cloud/grpc_error_delegate.h" #include "google/cloud/internal/completion_queue_impl.h" +#include "google/cloud/options.h" #include "google/cloud/version.h" #include "absl/functional/function_ref.h" #include "absl/types/optional.h" @@ -67,7 +68,9 @@ class AsyncStreamingReadWriteRpcImpl future Start() override { struct OnStart : public AsyncGrpcOperation { promise p; + Options options_ = CurrentOptions(); bool Notify(bool ok) override { + OptionsSpan span(options_); p.set_value(ok); return true; } @@ -82,7 +85,9 @@ class AsyncStreamingReadWriteRpcImpl struct OnRead : public AsyncGrpcOperation { promise> p; Response response; + Options options_ = CurrentOptions(); bool Notify(bool ok) override { + OptionsSpan span(options_); if (!ok) { p.set_value({}); return true; @@ -102,7 +107,9 @@ class AsyncStreamingReadWriteRpcImpl grpc::WriteOptions options) override { struct OnWrite : public AsyncGrpcOperation { promise p; + Options options_ = CurrentOptions(); bool Notify(bool ok) override { + OptionsSpan span(options_); p.set_value(ok); return true; } @@ -118,7 +125,9 @@ class AsyncStreamingReadWriteRpcImpl future WritesDone() override { struct OnWritesDone : public AsyncGrpcOperation { promise p; + Options options_ = CurrentOptions(); bool Notify(bool ok) override { + OptionsSpan span(options_); p.set_value(ok); return true; } @@ -132,8 +141,10 @@ class AsyncStreamingReadWriteRpcImpl future Finish() override { struct OnFinish : public AsyncGrpcOperation { promise p; + Options options_ = CurrentOptions(); grpc::Status status; bool Notify(bool /*ok*/) override { + OptionsSpan span(options_); p.set_value(MakeStatusFromRpcError(std::move(status))); return true; } diff --git a/google/cloud/internal/async_retry_loop_test.cc b/google/cloud/internal/async_retry_loop_test.cc index 27450878254f0..2162cce2445f5 100644 --- a/google/cloud/internal/async_retry_loop_test.cc +++ b/google/cloud/internal/async_retry_loop_test.cc @@ -14,10 +14,12 @@ #include "google/cloud/internal/async_retry_loop.h" #include "google/cloud/internal/background_threads_impl.h" +#include "google/cloud/options.h" #include "google/cloud/testing_util/fake_completion_queue_impl.h" #include "google/cloud/testing_util/status_matchers.h" #include #include +#include namespace google { namespace cloud { @@ -31,6 +33,10 @@ using ::testing::AllOf; using ::testing::HasSubstr; using ::testing::Return; +struct TestOption { + using Type = std::string; +}; + struct TestRetryablePolicy { static bool IsPermanentFailure(google::cloud::Status const& s) { return !s.ok() && @@ -92,6 +98,9 @@ class AsyncRetryLoopCancelTest : public ::testing::Test { }; TEST(AsyncRetryLoopTest, Success) { + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span(Options{}.set("Success")); + EXPECT_EQ(internal::CurrentOptions().get(), "Success"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = AsyncRetryLoop( @@ -100,6 +109,7 @@ TEST(AsyncRetryLoopTest, Success) { [](google::cloud::CompletionQueue&, std::unique_ptr, int request) -> future> { + EXPECT_EQ(internal::CurrentOptions().get(), "Success"); return make_ready_future(StatusOr(2 * request)); }, 42, "error message") @@ -110,6 +120,10 @@ TEST(AsyncRetryLoopTest, Success) { TEST(AsyncRetryLoopTest, TransientThenSuccess) { int counter = 0; + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span(Options{}.set("TransientThenSuccess")); + EXPECT_EQ(internal::CurrentOptions().get(), + "TransientThenSuccess"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = AsyncRetryLoop( @@ -117,6 +131,8 @@ TEST(AsyncRetryLoopTest, TransientThenSuccess) { background.cq(), [&](google::cloud::CompletionQueue&, std::unique_ptr, int request) { + EXPECT_EQ(internal::CurrentOptions().get(), + "TransientThenSuccess"); if (++counter < 3) { return make_ready_future( StatusOr(Status(StatusCode::kUnavailable, "try again"))); @@ -131,12 +147,17 @@ TEST(AsyncRetryLoopTest, TransientThenSuccess) { TEST(AsyncRetryLoopTest, ReturnJustStatus) { int counter = 0; + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span(Options{}.set("ReturnJustStatus")); + EXPECT_EQ(internal::CurrentOptions().get(), "ReturnJustStatus"); AutomaticallyCreatedBackgroundThreads background; Status actual = AsyncRetryLoop( TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent, background.cq(), [&](google::cloud::CompletionQueue&, std::unique_ptr, int) { + EXPECT_EQ(internal::CurrentOptions().get(), + "ReturnJustStatus"); if (++counter <= 3) { return make_ready_future(Status( StatusCode::kResourceExhausted, "slow-down")); @@ -162,6 +183,9 @@ TEST(AsyncRetryLoopTest, UsesBackoffPolicy) { EXPECT_CALL(*mock, OnCompletion()).Times(3).WillRepeatedly(Return(ms(1))); int counter = 0; + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span(Options{}.set("UsesBackoffPolicy")); + EXPECT_EQ(internal::CurrentOptions().get(), "UsesBackoffPolicy"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = AsyncRetryLoop( @@ -169,6 +193,8 @@ TEST(AsyncRetryLoopTest, UsesBackoffPolicy) { background.cq(), [&](google::cloud::CompletionQueue&, std::unique_ptr, int request) { + EXPECT_EQ(internal::CurrentOptions().get(), + "UsesBackoffPolicy"); if (++counter <= 3) { return make_ready_future( StatusOr(Status(StatusCode::kUnavailable, "try again"))); @@ -182,6 +208,11 @@ TEST(AsyncRetryLoopTest, UsesBackoffPolicy) { } TEST(AsyncRetryLoopTest, TransientFailureNonIdempotent) { + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span( + Options{}.set("TransientFailureNonIdempotent")); + EXPECT_EQ(internal::CurrentOptions().get(), + "TransientFailureNonIdempotent"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = AsyncRetryLoop( @@ -189,6 +220,8 @@ TEST(AsyncRetryLoopTest, TransientFailureNonIdempotent) { background.cq(), [](google::cloud::CompletionQueue&, std::unique_ptr, int) { + EXPECT_EQ(internal::CurrentOptions().get(), + "TransientFailureNonIdempotent"); return make_ready_future(StatusOr( Status(StatusCode::kUnavailable, "test-message-try-again"))); }, @@ -201,6 +234,11 @@ TEST(AsyncRetryLoopTest, TransientFailureNonIdempotent) { } TEST(AsyncRetryLoopTest, PermanentFailureIdempotent) { + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span( + Options{}.set("PermanentFailureIdempotent")); + EXPECT_EQ(internal::CurrentOptions().get(), + "PermanentFailureIdempotent"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = AsyncRetryLoop( @@ -208,6 +246,8 @@ TEST(AsyncRetryLoopTest, PermanentFailureIdempotent) { background.cq(), [](google::cloud::CompletionQueue&, std::unique_ptr, int) { + EXPECT_EQ(internal::CurrentOptions().get(), + "PermanentFailureIdempotent"); return make_ready_future(StatusOr( Status(StatusCode::kPermissionDenied, "test-message-uh-oh"))); }, @@ -220,6 +260,11 @@ TEST(AsyncRetryLoopTest, PermanentFailureIdempotent) { } TEST(AsyncRetryLoopTest, TooManyTransientFailuresIdempotent) { + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span( + Options{}.set("TooManyTransientFailuresIdempotent")); + EXPECT_EQ(internal::CurrentOptions().get(), + "TooManyTransientFailuresIdempotent"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = AsyncRetryLoop( @@ -227,6 +272,8 @@ TEST(AsyncRetryLoopTest, TooManyTransientFailuresIdempotent) { background.cq(), [](google::cloud::CompletionQueue&, std::unique_ptr, int) { + EXPECT_EQ(internal::CurrentOptions().get(), + "TooManyTransientFailuresIdempotent"); return make_ready_future(StatusOr( Status(StatusCode::kUnavailable, "test-message-try-again"))); }, @@ -240,6 +287,11 @@ TEST(AsyncRetryLoopTest, TooManyTransientFailuresIdempotent) { TEST(AsyncRetryLoopTest, ExhaustedDuringBackoff) { using ms = std::chrono::milliseconds; + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span( + Options{}.set("ExhaustedDuringBackoff")); + EXPECT_EQ(internal::CurrentOptions().get(), + "ExhaustedDuringBackoff"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = AsyncRetryLoop( @@ -248,6 +300,8 @@ TEST(AsyncRetryLoopTest, ExhaustedDuringBackoff) { Idempotency::kIdempotent, background.cq(), [](google::cloud::CompletionQueue&, std::unique_ptr, int) { + EXPECT_EQ(internal::CurrentOptions().get(), + "ExhaustedDuringBackoff"); return make_ready_future(StatusOr( Status(StatusCode::kUnavailable, "test-message-try-again"))); }, @@ -290,6 +344,9 @@ TEST(AsyncRetryLoopTest, SetsTimeout) { EXPECT_CALL(*mock, IsPermanentFailure).WillRepeatedly(Return(false)); EXPECT_CALL(*mock, Setup).Times(3); + EXPECT_EQ(internal::CurrentOptions().get(), ""); + internal::OptionsSpan span(Options{}.set("SetsTimeout")); + EXPECT_EQ(internal::CurrentOptions().get(), "SetsTimeout"); AutomaticallyCreatedBackgroundThreads background; StatusOr actual = @@ -298,6 +355,8 @@ TEST(AsyncRetryLoopTest, SetsTimeout) { TestBackoffPolicy(), Idempotency::kIdempotent, background.cq(), [&](google::cloud::CompletionQueue&, std::unique_ptr, int /*request*/) { + EXPECT_EQ(internal::CurrentOptions().get(), + "SetsTimeout"); return make_ready_future( StatusOr(Status(StatusCode::kUnavailable, "try again"))); }, diff --git a/google/cloud/internal/async_rpc_details.h b/google/cloud/internal/async_rpc_details.h index fe20d254d01ca..cf79dd741eff2 100644 --- a/google/cloud/internal/async_rpc_details.h +++ b/google/cloud/internal/async_rpc_details.h @@ -18,6 +18,7 @@ #include "google/cloud/async_operation.h" #include "google/cloud/future.h" #include "google/cloud/grpc_error_delegate.h" +#include "google/cloud/options.h" #include "google/cloud/status_or.h" #include "google/cloud/version.h" #include @@ -68,6 +69,7 @@ class AsyncUnaryRpcFuture : public AsyncGrpcOperation { void Cancel() override {} bool Notify(bool ok) override { + OptionsSpan span(options_); if (!ok) { // `Finish()` always returns `true` for unary RPCs, so the only time we // get `!ok` is after `Shutdown()` was called; treat that as "cancelled". @@ -95,6 +97,7 @@ class AsyncUnaryRpcFuture : public AsyncGrpcOperation { Response response_; promise> promise_; + Options options_ = CurrentOptions(); }; /// Verify that @p Functor meets the requirements for an AsyncUnaryRpc callback. diff --git a/google/cloud/internal/default_completion_queue_impl.cc b/google/cloud/internal/default_completion_queue_impl.cc index bf5cf150190cf..cd947c58f8ea4 100644 --- a/google/cloud/internal/default_completion_queue_impl.cc +++ b/google/cloud/internal/default_completion_queue_impl.cc @@ -14,6 +14,7 @@ #include "google/cloud/internal/default_completion_queue_impl.h" #include "google/cloud/internal/throw_delegate.h" +#include "google/cloud/options.h" #include "absl/memory/memory.h" #include #include @@ -68,12 +69,16 @@ class AsyncTimerFuture : public internal::AsyncGrpcOperation { alarm_.Set(&cq, deadline, tag); } - void Cancel() override { alarm_.Cancel(); } + void Cancel() override { + OptionsSpan span(options_); + alarm_.Cancel(); + } private: explicit AsyncTimerFuture() : promise_(null_promise_t{}) {} bool Notify(bool ok) override { + OptionsSpan span(options_); promise_.set_value(ok ? ValueType(deadline_) : Canceled()); return true; } @@ -85,6 +90,7 @@ class AsyncTimerFuture : public internal::AsyncGrpcOperation { promise promise_; std::chrono::system_clock::time_point deadline_; grpc::Alarm alarm_; + Options options_ = CurrentOptions(); }; } // namespace @@ -105,6 +111,7 @@ class DefaultCompletionQueueImpl::WakeUpRunAsyncLoop private: bool Notify(bool ok) override { + OptionsSpan span(options_); if (!ok) return true; // do not run async operations on shutdown CQs if (auto self = weak_.lock()) self->DrainRunAsyncLoop(); return true; @@ -112,6 +119,7 @@ class DefaultCompletionQueueImpl::WakeUpRunAsyncLoop std::weak_ptr weak_; grpc::Alarm alarm_; + Options options_ = CurrentOptions(); }; // A helper class to wake up the asynchronous thread and drain the RunAsync() @@ -130,6 +138,7 @@ class DefaultCompletionQueueImpl::WakeUpRunAsyncOnIdle private: bool Notify(bool ok) override { + OptionsSpan span(options_); if (!ok) return true; // do not run async operations on shutdown CQs if (auto self = weak_.lock()) self->DrainRunAsyncOnIdle(); return true; @@ -137,6 +146,7 @@ class DefaultCompletionQueueImpl::WakeUpRunAsyncOnIdle std::weak_ptr weak_; grpc::Alarm alarm_; + Options options_ = CurrentOptions(); }; DefaultCompletionQueueImpl::DefaultCompletionQueueImpl() diff --git a/google/cloud/options.cc b/google/cloud/options.cc index e30966b5d4815..a853380b2add9 100644 --- a/google/cloud/options.cc +++ b/google/cloud/options.cc @@ -15,6 +15,7 @@ #include "google/cloud/options.h" #include "google/cloud/internal/algorithm.h" #include "google/cloud/log.h" +#include #include #include #include @@ -40,6 +41,27 @@ Options MergeOptions(Options a, Options b) { return a; } +namespace { + +// The prevailing options for the current operation. Thread local, so +// additional propagation must be done whenever work for the operation +// is done in another thread. +Options& ThreadLocalOptions() { + thread_local Options current_options; + return current_options; +} + +} // namespace + +Options const& CurrentOptions() { return ThreadLocalOptions(); } + +OptionsSpan::OptionsSpan(Options opts) : opts_(std::move(opts)) { + using std::swap; + swap(opts_, ThreadLocalOptions()); +} + +OptionsSpan::~OptionsSpan() { ThreadLocalOptions() = std::move(opts_); } + } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud diff --git a/google/cloud/options.h b/google/cloud/options.h index a9d4856f2a78f..6de7fe918eeef 100644 --- a/google/cloud/options.h +++ b/google/cloud/options.h @@ -17,6 +17,7 @@ #include "google/cloud/internal/type_list.h" #include "google/cloud/version.h" +#include "absl/base/attributes.h" #include "absl/memory/memory.h" #include #include @@ -277,11 +278,11 @@ void CheckExpectedOptionsImpl(OptionList const&, Options const& opts, * For example, * * @code - * struct FooOption { int value; }; - * struct BarOption { int value; }; + * struct FooOption { using Type = int; }; + * struct BarOption { using Type = int; }; * using MyOptions = OptionList; * - * struct BazOption { int value; }; + * struct BazOption { using Type = int; }; * * // All valid ways to call this with varying expectations. * CheckExpectedOptions(opts, "test caller"); @@ -306,6 +307,40 @@ void CheckExpectedOptions(Options const& opts, char const* caller) { */ Options MergeOptions(Options a, Options b); +/** + * The prevailing options for the current operation. + */ +Options const& CurrentOptions(); + +/** + * RAII object to set/restore the prevailing options for the enclosing scope. + * + * @code + * struct IntOption { using Type = int; }; + * assert(!internal::CurrentOptions().has()); + * { + * internal::OptionsSpan span(Options{}.set(1)); + * assert(internal::CurrentOptions().get() == 1); + * { + * internal::OptionsSpan span(Options{}.set(2)); + * assert(internal::CurrentOptions().get() == 2); + * } + * assert(internal::CurrentOptions().get() == 1); + * } + * assert(!internal::CurrentOptions().has()); + * @endcode + * + * @param opts the `Options` to install. + */ +class ABSL_MUST_USE_RESULT OptionsSpan { + public: + explicit OptionsSpan(Options opts); + ~OptionsSpan(); + + private: + Options opts_; +}; + } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/options_test.cc b/google/cloud/options_test.cc index e715ef2e46b6f..455127fdaa142 100644 --- a/google/cloud/options_test.cc +++ b/google/cloud/options_test.cc @@ -260,6 +260,20 @@ TEST(MergeOptions, Basics) { EXPECT_EQ(a.get(), 42); // From a } +TEST(OptionsSpan, Basics) { + EXPECT_FALSE(internal::CurrentOptions().has()); + { + internal::OptionsSpan span(Options{}.set(1)); + EXPECT_EQ(internal::CurrentOptions().get(), 1); + { + internal::OptionsSpan span(Options{}.set(2)); + EXPECT_EQ(internal::CurrentOptions().get(), 2); + } + EXPECT_EQ(internal::CurrentOptions().get(), 1); + } + EXPECT_FALSE(internal::CurrentOptions().has()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud diff --git a/google/cloud/testing_util/fake_completion_queue_impl.cc b/google/cloud/testing_util/fake_completion_queue_impl.cc index 975fabde85852..99a5596debb27 100644 --- a/google/cloud/testing_util/fake_completion_queue_impl.cc +++ b/google/cloud/testing_util/fake_completion_queue_impl.cc @@ -13,12 +13,14 @@ // limitations under the License. #include "google/cloud/testing_util/fake_completion_queue_impl.h" +#include "google/cloud/options.h" namespace google { namespace cloud { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace testing_util { namespace { + class FakeAsyncTimer : public internal::AsyncGrpcOperation { public: explicit FakeAsyncTimer(std::chrono::system_clock::time_point deadline) @@ -31,6 +33,7 @@ class FakeAsyncTimer : public internal::AsyncGrpcOperation { void Cancel() override {} bool Notify(bool ok) override { + internal::OptionsSpan span(options_); if (!ok) { promise_.set_value(Status(StatusCode::kCancelled, "timer canceled")); } else { @@ -42,6 +45,7 @@ class FakeAsyncTimer : public internal::AsyncGrpcOperation { private: std::chrono::system_clock::time_point const deadline_; promise> promise_; + Options options_ = internal::CurrentOptions(); }; class FakeAsyncFunction : public internal::AsyncGrpcOperation { @@ -53,6 +57,7 @@ class FakeAsyncFunction : public internal::AsyncGrpcOperation { private: bool Notify(bool ok) override { + internal::OptionsSpan span(options_); auto f = std::move(function_); if (!ok) return true; f->exec(); @@ -60,6 +65,7 @@ class FakeAsyncFunction : public internal::AsyncGrpcOperation { } std::unique_ptr function_; + Options options_ = internal::CurrentOptions(); }; } // namespace