From d819840c2794546ae9256b7d70c387f617d67d6c Mon Sep 17 00:00:00 2001 From: Bradley White <14679271+devbww@users.noreply.github.com> Date: Mon, 24 Jan 2022 21:53:13 -0500 Subject: [PATCH] feat(spanner): merge connection options into client options Store the `Connection` options so that they may be made available to merge into the `Client` options. Use those stored options to implement four `Connection` data elements that were previously extracted during construction. (When all `Client` operations have been taught how to set the "current options" we will be able to use those instead.) Part of #7690. --- google/cloud/spanner/client.h | 3 +- google/cloud/spanner/connection.h | 3 + .../cloud/spanner/internal/connection_impl.cc | 67 ++++++++++++------- .../cloud/spanner/internal/connection_impl.h | 15 +++-- 4 files changed, 56 insertions(+), 32 deletions(-) diff --git a/google/cloud/spanner/client.h b/google/cloud/spanner/client.h index 200a770489d59..0e4099042c526 100644 --- a/google/cloud/spanner/client.h +++ b/google/cloud/spanner/client.h @@ -133,7 +133,8 @@ class Client { */ explicit Client(std::shared_ptr conn, Options opts = {}) : conn_(std::move(conn)), - opts_(spanner_internal::DefaultOptions(std::move(opts))) {} + opts_(spanner_internal::DefaultOptions( + internal::MergeOptions(std::move(opts), conn_->options()))) {} //@{ /// Backwards compatibility for `ClientOptions`. diff --git a/google/cloud/spanner/connection.h b/google/cloud/spanner/connection.h index dfe3eed7922ca..9420b902ab4ad 100644 --- a/google/cloud/spanner/connection.h +++ b/google/cloud/spanner/connection.h @@ -128,6 +128,9 @@ class Connection { }; //@} + /// Returns any options use by the concrete Connection. + virtual Options options() { return Options{}; } + /// Defines the interface for `Client::Read()` virtual RowStream Read(ReadParams) = 0; diff --git a/google/cloud/spanner/internal/connection_impl.cc b/google/cloud/spanner/internal/connection_impl.cc index b238ba0d43666..863e268c4da6e 100644 --- a/google/cloud/spanner/internal/connection_impl.cc +++ b/google/cloud/spanner/internal/connection_impl.cc @@ -103,18 +103,12 @@ Status MissingTransactionStatus(std::string const& operation) { ConnectionImpl::ConnectionImpl( spanner::Database db, std::unique_ptr background_threads, - std::vector> stubs, Options const& opts) + std::vector> stubs, Options opts) : db_(std::move(db)), - retry_policy_prototype_( - opts.get()->clone()), - backoff_policy_prototype_( - opts.get()->clone()), background_threads_(std::move(background_threads)), + opts_(spanner_internal::DefaultOptions(std::move(opts))), session_pool_(MakeSessionPool(db_, std::move(stubs), - background_threads_->cq(), opts)), - rpc_stream_tracing_enabled_(internal::Contains( - opts.get(), "rpc-streams")), - tracing_options_(opts.get()) {} + background_threads_->cq(), opts_)) {} spanner::RowStream ConnectionImpl::Read(ReadParams params) { return Visit(std::move(params.transaction), @@ -327,6 +321,29 @@ class StreamingPartitionedDmlResult { std::unique_ptr source_; }; +std::shared_ptr const& ConnectionImpl::RetryPolicy() + const { + // TODO(#7690): Base this on internal::CurrentOptions(). + return opts_.get(); +} + +std::shared_ptr const& ConnectionImpl::BackoffPolicy() + const { + // TODO(#7690): Base this on internal::CurrentOptions(). + return opts_.get(); +} + +bool ConnectionImpl::RpcStreamTracingEnabled() const { + // TODO(#7690): Base this on internal::CurrentOptions(). + return internal::Contains(opts_.get(), + "rpc-streams"); +} + +TracingOptions const& ConnectionImpl::RpcTracingOptions() const { + // TODO(#7690): Base this on internal::CurrentOptions(). + return opts_.get(); +} + /** * Helper function that ensures `session` holds a valid `Session`, or returns * an error if `session` is empty and no `Session` can be allocated. @@ -366,7 +383,7 @@ StatusOr ConnectionImpl::BeginTransaction( auto stub = session_pool_->GetStub(*session); auto response = RetryLoop( - retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(), + RetryPolicy()->clone(), BackoffPolicy()->clone(), Idempotency::kIdempotent, [&stub](grpc::ClientContext& context, spanner_proto::BeginTransactionRequest const& request) { @@ -417,8 +434,8 @@ spanner::RowStream ConnectionImpl::ReadImpl( // Capture a copy of `stub` to ensure the `shared_ptr<>` remains valid through // the lifetime of the lambda. auto stub = session_pool_->GetStub(*session); - auto const tracing_enabled = rpc_stream_tracing_enabled_; - auto const tracing_options = tracing_options_; + auto const tracing_enabled = RpcStreamTracingEnabled(); + auto const& tracing_options = RpcTracingOptions(); auto factory = [stub, &request, tracing_enabled, tracing_options](std::string const& resume_token) mutable { request.set_resume_token(resume_token); @@ -434,8 +451,8 @@ spanner::RowStream ConnectionImpl::ReadImpl( }; for (;;) { auto rpc = absl::make_unique( - factory, Idempotency::kIdempotent, retry_policy_prototype_->clone(), - backoff_policy_prototype_->clone()); + factory, Idempotency::kIdempotent, RetryPolicy()->clone(), + BackoffPolicy()->clone()); auto reader = PartialResultSetSource::Create(std::move(rpc)); if (s->has_begin()) { if (reader.ok()) { @@ -496,7 +513,7 @@ StatusOr> ConnectionImpl::PartitionReadImpl( auto stub = session_pool_->GetStub(*session); for (;;) { auto response = RetryLoop( - retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(), + RetryPolicy()->clone(), BackoffPolicy()->clone(), Idempotency::kIdempotent, [&stub](grpc::ClientContext& context, spanner_proto::PartitionReadRequest const& request) { @@ -627,10 +644,10 @@ ResultType ConnectionImpl::CommonQueryImpl( // through the lifetime of the lambda. Note that the local variables are a // reference to avoid increasing refcounts twice, but the capture is by value. auto stub = session_pool_->GetStub(*session); - auto const& retry_policy = retry_policy_prototype_; - auto const& backoff_policy = backoff_policy_prototype_; - auto const tracing_enabled = rpc_stream_tracing_enabled_; - auto const tracing_options = tracing_options_; + auto const& retry_policy = RetryPolicy(); + auto const& backoff_policy = BackoffPolicy(); + auto const tracing_enabled = RpcStreamTracingEnabled(); + auto const& tracing_options = RpcTracingOptions(); auto retry_resume_fn = [stub, retry_policy, backoff_policy, tracing_enabled, tracing_options](spanner_proto::ExecuteSqlRequest& request) mutable @@ -699,8 +716,8 @@ StatusOr ConnectionImpl::CommonDmlImpl( // through the lifetime of the lambda. Note that the local variables are a // reference to avoid increasing refcounts twice, but the capture is by value. auto stub = session_pool_->GetStub(*session); - auto const& retry_policy = retry_policy_prototype_; - auto const& backoff_policy = backoff_policy_prototype_; + auto const& retry_policy = RetryPolicy(); + auto const& backoff_policy = BackoffPolicy(); auto retry_resume_fn = [function_name, stub, retry_policy, backoff_policy, @@ -783,7 +800,7 @@ ConnectionImpl::PartitionQueryImpl( auto stub = session_pool_->GetStub(*session); for (;;) { auto response = RetryLoop( - retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(), + RetryPolicy()->clone(), BackoffPolicy()->clone(), Idempotency::kIdempotent, [&stub](grpc::ClientContext& context, spanner_proto::PartitionQueryRequest const& request) { @@ -857,7 +874,7 @@ StatusOr ConnectionImpl::ExecuteBatchDmlImpl( auto stub = session_pool_->GetStub(*session); for (;;) { auto response = RetryLoop( - retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(), + RetryPolicy()->clone(), BackoffPolicy()->clone(), Idempotency::kIdempotent, [&stub](grpc::ClientContext& context, spanner_proto::ExecuteBatchDmlRequest const& request) { @@ -979,7 +996,7 @@ StatusOr ConnectionImpl::CommitImpl( auto stub = session_pool_->GetStub(*session); auto response = RetryLoop( - retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(), + RetryPolicy()->clone(), BackoffPolicy()->clone(), Idempotency::kIdempotent, [&stub](grpc::ClientContext& context, spanner_proto::CommitRequest const& request) { @@ -1041,7 +1058,7 @@ Status ConnectionImpl::RollbackImpl( request.set_transaction_id(s->id()); auto stub = session_pool_->GetStub(*session); auto status = RetryLoop( - retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(), + RetryPolicy()->clone(), BackoffPolicy()->clone(), Idempotency::kIdempotent, [&stub](grpc::ClientContext& context, spanner_proto::RollbackRequest const& request) { diff --git a/google/cloud/spanner/internal/connection_impl.h b/google/cloud/spanner/internal/connection_impl.h index 252c10cba5e6f..6c562c6f4be57 100644 --- a/google/cloud/spanner/internal/connection_impl.h +++ b/google/cloud/spanner/internal/connection_impl.h @@ -48,8 +48,9 @@ class ConnectionImpl : public spanner::Connection { public: ConnectionImpl(spanner::Database db, std::unique_ptr background_threads, - std::vector> stubs, - Options const& opts); + std::vector> stubs, Options opts); + + Options options() override { return opts_; } spanner::RowStream Read(ReadParams) override; StatusOr> PartitionRead( @@ -69,6 +70,11 @@ class ConnectionImpl : public spanner::Connection { Status Rollback(RollbackParams) override; private: + std::shared_ptr const& RetryPolicy() const; + std::shared_ptr const& BackoffPolicy() const; + bool RpcStreamTracingEnabled() const; + TracingOptions const& RpcTracingOptions() const; + Status PrepareSession(SessionHolder& session, bool dissociate_from_pool = false); @@ -163,12 +169,9 @@ class ConnectionImpl : public spanner::Connection { google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode); spanner::Database db_; - std::shared_ptr retry_policy_prototype_; - std::shared_ptr backoff_policy_prototype_; std::unique_ptr background_threads_; + Options opts_; std::shared_ptr session_pool_; - bool rpc_stream_tracing_enabled_ = false; - TracingOptions tracing_options_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END