Skip to content

Commit

Permalink
feat(common): add support for call-tree-specific options (#7669)
Browse files Browse the repository at this point in the history
Provide access to the prevailing `Options` for an operation without having
to plumb function parameters through the various and sundry layers, like
connection.

This also means that helper libraries can determine things like whether a
tracing component is enabled, without having to distort their APIs.

For example, if all `ServiceClient::Operation()` calls are implemented like
```
class ServiceClient {
 public
  ServiceClient(std::shared_ptr<ServiceConnection> connection,
                Options options = {})
      : connection_(std::move(connection)),
        options_(ServiceDefaultOptions(std::move(options))) {}

  T Operation(..., Options options = {}) {
    internal::OptionsSpan span(internal::MergeOptions(options, options_));
    ...
  }

 private:
  std::shared_ptr<ServiceConnection> connection_;
  Options options_;
};
```
then the connection layer, or any other internal library, can retrieve the
prevailing options using `internal::CurrentOptions()`.  If the operation is
asynchronous, the invocation-time `Options` are also installed during its
callbacks.
  • Loading branch information
devbww authored Dec 1, 2021
1 parent 6080a8b commit 823c121
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 4 deletions.
3 changes: 3 additions & 0 deletions google/cloud/internal/async_connection_ready.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/internal/async_connection_ready.h"
#include "google/cloud/options.h"

namespace google {
namespace cloud {
Expand Down Expand Up @@ -60,13 +61,15 @@ void AsyncConnectionReadyFuture::RunIteration(ChannelStateType state) {
explicit OnStateChange(std::shared_ptr<AsyncConnectionReadyFuture> s)
: self_(std::move(s)) {}
bool Notify(bool ok) override {
OptionsSpan span(options_);
self_->Notify(ok);
return true;
}
void Cancel() override {}

private:
std::shared_ptr<AsyncConnectionReadyFuture> const self_;
Options options_ = CurrentOptions();
};

auto op = std::make_shared<OnStateChange>(shared_from_this());
Expand Down
9 changes: 9 additions & 0 deletions google/cloud/internal/async_read_stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "google/cloud/grpc_error_delegate.h"
#include "google/cloud/internal/completion_queue_impl.h"
#include "google/cloud/options.h"
#include "google/cloud/version.h"
#include <grpcpp/support/async_stream.h>
#include <memory>
Expand Down Expand Up @@ -150,10 +151,12 @@ class AsyncReadStreamImpl
private:
void Cancel() override {} // LCOV_EXCL_LINE
bool Notify(bool ok) override {
OptionsSpan span(options_);
control_->OnStart(ok);
return true;
}
std::shared_ptr<AsyncReadStreamImpl> control_;
Options options_ = CurrentOptions();
};

context_ = std::move(context);
Expand Down Expand Up @@ -195,10 +198,12 @@ class AsyncReadStreamImpl
private:
void Cancel() override {} // LCOV_EXCL_LINE
bool Notify(bool ok) override {
OptionsSpan span(options_);
control_->OnRead(ok, std::move(response));
return true;
}
std::shared_ptr<AsyncReadStreamImpl> control_;
Options options_ = CurrentOptions();
};

auto callback = std::make_shared<NotifyRead>(this->shared_from_this());
Expand Down Expand Up @@ -243,10 +248,12 @@ class AsyncReadStreamImpl
private:
void Cancel() override {} // LCOV_EXCL_LINE
bool Notify(bool ok) override {
OptionsSpan span(options_);
control_->OnFinish(ok, MakeStatusFromRpcError(status));
return true;
}
std::shared_ptr<AsyncReadStreamImpl> control_;
Options options_ = CurrentOptions();
};

auto callback = std::make_shared<NotifyFinish>(this->shared_from_this());
Expand Down Expand Up @@ -281,10 +288,12 @@ class AsyncReadStreamImpl
private:
void Cancel() override {} // LCOV_EXCL_LINE
bool Notify(bool ok) override {
OptionsSpan span(options_);
control_->OnDiscard(ok, std::move(response));
return true;
}
std::shared_ptr<AsyncReadStreamImpl> control_;
Options options_ = CurrentOptions();
};

auto callback = std::make_shared<NotifyDiscard>(this->shared_from_this());
Expand Down
11 changes: 11 additions & 0 deletions google/cloud/internal/async_read_write_stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "google/cloud/completion_queue.h"
#include "google/cloud/grpc_error_delegate.h"
#include "google/cloud/internal/completion_queue_impl.h"
#include "google/cloud/options.h"
#include "google/cloud/version.h"
#include "absl/functional/function_ref.h"
#include "absl/types/optional.h"
Expand Down Expand Up @@ -67,7 +68,9 @@ class AsyncStreamingReadWriteRpcImpl
future<bool> Start() override {
struct OnStart : public AsyncGrpcOperation {
promise<bool> p;
Options options_ = CurrentOptions();
bool Notify(bool ok) override {
OptionsSpan span(options_);
p.set_value(ok);
return true;
}
Expand All @@ -82,7 +85,9 @@ class AsyncStreamingReadWriteRpcImpl
struct OnRead : public AsyncGrpcOperation {
promise<absl::optional<Response>> p;
Response response;
Options options_ = CurrentOptions();
bool Notify(bool ok) override {
OptionsSpan span(options_);
if (!ok) {
p.set_value({});
return true;
Expand All @@ -102,7 +107,9 @@ class AsyncStreamingReadWriteRpcImpl
grpc::WriteOptions options) override {
struct OnWrite : public AsyncGrpcOperation {
promise<bool> p;
Options options_ = CurrentOptions();
bool Notify(bool ok) override {
OptionsSpan span(options_);
p.set_value(ok);
return true;
}
Expand All @@ -118,7 +125,9 @@ class AsyncStreamingReadWriteRpcImpl
future<bool> WritesDone() override {
struct OnWritesDone : public AsyncGrpcOperation {
promise<bool> p;
Options options_ = CurrentOptions();
bool Notify(bool ok) override {
OptionsSpan span(options_);
p.set_value(ok);
return true;
}
Expand All @@ -132,8 +141,10 @@ class AsyncStreamingReadWriteRpcImpl
future<Status> Finish() override {
struct OnFinish : public AsyncGrpcOperation {
promise<Status> p;
Options options_ = CurrentOptions();
grpc::Status status;
bool Notify(bool /*ok*/) override {
OptionsSpan span(options_);
p.set_value(MakeStatusFromRpcError(std::move(status)));
return true;
}
Expand Down
59 changes: 59 additions & 0 deletions google/cloud/internal/async_retry_loop_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

#include "google/cloud/internal/async_retry_loop.h"
#include "google/cloud/internal/background_threads_impl.h"
#include "google/cloud/options.h"
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
#include "google/cloud/testing_util/status_matchers.h"
#include <gmock/gmock.h>
#include <deque>
#include <string>

namespace google {
namespace cloud {
Expand All @@ -31,6 +33,10 @@ using ::testing::AllOf;
using ::testing::HasSubstr;
using ::testing::Return;

struct TestOption {
using Type = std::string;
};

struct TestRetryablePolicy {
static bool IsPermanentFailure(google::cloud::Status const& s) {
return !s.ok() &&
Expand Down Expand Up @@ -92,6 +98,9 @@ class AsyncRetryLoopCancelTest : public ::testing::Test {
};

TEST(AsyncRetryLoopTest, Success) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(Options{}.set<TestOption>("Success"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "Success");
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
Expand All @@ -100,6 +109,7 @@ TEST(AsyncRetryLoopTest, Success) {
[](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>,
int request) -> future<StatusOr<int>> {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "Success");
return make_ready_future(StatusOr<int>(2 * request));
},
42, "error message")
Expand All @@ -110,13 +120,19 @@ TEST(AsyncRetryLoopTest, Success) {

TEST(AsyncRetryLoopTest, TransientThenSuccess) {
int counter = 0;
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(Options{}.set<TestOption>("TransientThenSuccess"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"TransientThenSuccess");
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent,
background.cq(),
[&](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int request) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"TransientThenSuccess");
if (++counter < 3) {
return make_ready_future(
StatusOr<int>(Status(StatusCode::kUnavailable, "try again")));
Expand All @@ -131,12 +147,17 @@ TEST(AsyncRetryLoopTest, TransientThenSuccess) {

TEST(AsyncRetryLoopTest, ReturnJustStatus) {
int counter = 0;
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(Options{}.set<TestOption>("ReturnJustStatus"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "ReturnJustStatus");
AutomaticallyCreatedBackgroundThreads background;
Status actual = AsyncRetryLoop(
TestRetryPolicy(), TestBackoffPolicy(),
Idempotency::kIdempotent, background.cq(),
[&](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"ReturnJustStatus");
if (++counter <= 3) {
return make_ready_future(Status(
StatusCode::kResourceExhausted, "slow-down"));
Expand All @@ -162,13 +183,18 @@ TEST(AsyncRetryLoopTest, UsesBackoffPolicy) {
EXPECT_CALL(*mock, OnCompletion()).Times(3).WillRepeatedly(Return(ms(1)));

int counter = 0;
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(Options{}.set<TestOption>("UsesBackoffPolicy"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "UsesBackoffPolicy");
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
TestRetryPolicy(), std::move(mock), Idempotency::kIdempotent,
background.cq(),
[&](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int request) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"UsesBackoffPolicy");
if (++counter <= 3) {
return make_ready_future(
StatusOr<int>(Status(StatusCode::kUnavailable, "try again")));
Expand All @@ -182,13 +208,20 @@ TEST(AsyncRetryLoopTest, UsesBackoffPolicy) {
}

TEST(AsyncRetryLoopTest, TransientFailureNonIdempotent) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(
Options{}.set<TestOption>("TransientFailureNonIdempotent"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"TransientFailureNonIdempotent");
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kNonIdempotent,
background.cq(),
[](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"TransientFailureNonIdempotent");
return make_ready_future(StatusOr<int>(
Status(StatusCode::kUnavailable, "test-message-try-again")));
},
Expand All @@ -201,13 +234,20 @@ TEST(AsyncRetryLoopTest, TransientFailureNonIdempotent) {
}

TEST(AsyncRetryLoopTest, PermanentFailureIdempotent) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(
Options{}.set<TestOption>("PermanentFailureIdempotent"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"PermanentFailureIdempotent");
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent,
background.cq(),
[](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"PermanentFailureIdempotent");
return make_ready_future(StatusOr<int>(
Status(StatusCode::kPermissionDenied, "test-message-uh-oh")));
},
Expand All @@ -220,13 +260,20 @@ TEST(AsyncRetryLoopTest, PermanentFailureIdempotent) {
}

TEST(AsyncRetryLoopTest, TooManyTransientFailuresIdempotent) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(
Options{}.set<TestOption>("TooManyTransientFailuresIdempotent"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"TooManyTransientFailuresIdempotent");
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent,
background.cq(),
[](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"TooManyTransientFailuresIdempotent");
return make_ready_future(StatusOr<int>(
Status(StatusCode::kUnavailable, "test-message-try-again")));
},
Expand All @@ -240,6 +287,11 @@ TEST(AsyncRetryLoopTest, TooManyTransientFailuresIdempotent) {

TEST(AsyncRetryLoopTest, ExhaustedDuringBackoff) {
using ms = std::chrono::milliseconds;
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(
Options{}.set<TestOption>("ExhaustedDuringBackoff"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"ExhaustedDuringBackoff");
AutomaticallyCreatedBackgroundThreads background;
StatusOr<int> actual =
AsyncRetryLoop(
Expand All @@ -248,6 +300,8 @@ TEST(AsyncRetryLoopTest, ExhaustedDuringBackoff) {
Idempotency::kIdempotent, background.cq(),
[](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"ExhaustedDuringBackoff");
return make_ready_future(StatusOr<int>(
Status(StatusCode::kUnavailable, "test-message-try-again")));
},
Expand Down Expand Up @@ -290,6 +344,9 @@ TEST(AsyncRetryLoopTest, SetsTimeout) {
EXPECT_CALL(*mock, IsPermanentFailure).WillRepeatedly(Return(false));
EXPECT_CALL(*mock, Setup).Times(3);

EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "");
internal::OptionsSpan span(Options{}.set<TestOption>("SetsTimeout"));
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(), "SetsTimeout");
AutomaticallyCreatedBackgroundThreads background;

StatusOr<int> actual =
Expand All @@ -298,6 +355,8 @@ TEST(AsyncRetryLoopTest, SetsTimeout) {
TestBackoffPolicy(), Idempotency::kIdempotent, background.cq(),
[&](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>, int /*request*/) {
EXPECT_EQ(internal::CurrentOptions().get<TestOption>(),
"SetsTimeout");
return make_ready_future(
StatusOr<int>(Status(StatusCode::kUnavailable, "try again")));
},
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/internal/async_rpc_details.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "google/cloud/async_operation.h"
#include "google/cloud/future.h"
#include "google/cloud/grpc_error_delegate.h"
#include "google/cloud/options.h"
#include "google/cloud/status_or.h"
#include "google/cloud/version.h"
#include <grpcpp/support/async_unary_call.h>
Expand Down Expand Up @@ -68,6 +69,7 @@ class AsyncUnaryRpcFuture : public AsyncGrpcOperation {
void Cancel() override {}

bool Notify(bool ok) override {
OptionsSpan span(options_);
if (!ok) {
// `Finish()` always returns `true` for unary RPCs, so the only time we
// get `!ok` is after `Shutdown()` was called; treat that as "cancelled".
Expand Down Expand Up @@ -95,6 +97,7 @@ class AsyncUnaryRpcFuture : public AsyncGrpcOperation {
Response response_;

promise<StatusOr<Response>> promise_;
Options options_ = CurrentOptions();
};

/// Verify that @p Functor meets the requirements for an AsyncUnaryRpc callback.
Expand Down
Loading

0 comments on commit 823c121

Please sign in to comment.