Skip to content

Commit

Permalink
feat(spanner): merge connection options into client options
Browse files Browse the repository at this point in the history
Store the `Connection` options so that they may be made available to
merge into the `Client` options.

Use those stored options to implement four `Connection` data elements
that were previously extracted during construction.  (When all `Client`
operations have been taught how to set the "current options" we will
be able to use those instead.)

Part of googleapis#7690.
  • Loading branch information
devbww committed Jan 25, 2022
1 parent 562cd84 commit d819840
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 32 deletions.
3 changes: 2 additions & 1 deletion google/cloud/spanner/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class Client {
*/
explicit Client(std::shared_ptr<Connection> conn, Options opts = {})
: conn_(std::move(conn)),
opts_(spanner_internal::DefaultOptions(std::move(opts))) {}
opts_(spanner_internal::DefaultOptions(
internal::MergeOptions(std::move(opts), conn_->options()))) {}

//@{
/// Backwards compatibility for `ClientOptions`.
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/spanner/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class Connection {
};
//@}

/// Returns any options use by the concrete Connection.
virtual Options options() { return Options{}; }

/// Defines the interface for `Client::Read()`
virtual RowStream Read(ReadParams) = 0;

Expand Down
67 changes: 42 additions & 25 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,12 @@ Status MissingTransactionStatus(std::string const& operation) {

ConnectionImpl::ConnectionImpl(
spanner::Database db, std::unique_ptr<BackgroundThreads> background_threads,
std::vector<std::shared_ptr<SpannerStub>> stubs, Options const& opts)
std::vector<std::shared_ptr<SpannerStub>> stubs, Options opts)
: db_(std::move(db)),
retry_policy_prototype_(
opts.get<spanner::SpannerRetryPolicyOption>()->clone()),
backoff_policy_prototype_(
opts.get<spanner::SpannerBackoffPolicyOption>()->clone()),
background_threads_(std::move(background_threads)),
opts_(spanner_internal::DefaultOptions(std::move(opts))),
session_pool_(MakeSessionPool(db_, std::move(stubs),
background_threads_->cq(), opts)),
rpc_stream_tracing_enabled_(internal::Contains(
opts.get<TracingComponentsOption>(), "rpc-streams")),
tracing_options_(opts.get<GrpcTracingOptionsOption>()) {}
background_threads_->cq(), opts_)) {}

spanner::RowStream ConnectionImpl::Read(ReadParams params) {
return Visit(std::move(params.transaction),
Expand Down Expand Up @@ -327,6 +321,29 @@ class StreamingPartitionedDmlResult {
std::unique_ptr<ResultSourceInterface> source_;
};

std::shared_ptr<spanner::RetryPolicy> const& ConnectionImpl::RetryPolicy()
const {
// TODO(#7690): Base this on internal::CurrentOptions().
return opts_.get<spanner::SpannerRetryPolicyOption>();
}

std::shared_ptr<spanner::BackoffPolicy> const& ConnectionImpl::BackoffPolicy()
const {
// TODO(#7690): Base this on internal::CurrentOptions().
return opts_.get<spanner::SpannerBackoffPolicyOption>();
}

bool ConnectionImpl::RpcStreamTracingEnabled() const {
// TODO(#7690): Base this on internal::CurrentOptions().
return internal::Contains(opts_.get<TracingComponentsOption>(),
"rpc-streams");
}

TracingOptions const& ConnectionImpl::RpcTracingOptions() const {
// TODO(#7690): Base this on internal::CurrentOptions().
return opts_.get<GrpcTracingOptionsOption>();
}

/**
* Helper function that ensures `session` holds a valid `Session`, or returns
* an error if `session` is empty and no `Session` can be allocated.
Expand Down Expand Up @@ -366,7 +383,7 @@ StatusOr<spanner_proto::Transaction> ConnectionImpl::BeginTransaction(

auto stub = session_pool_->GetStub(*session);
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicy()->clone(), BackoffPolicy()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::BeginTransactionRequest const& request) {
Expand Down Expand Up @@ -417,8 +434,8 @@ spanner::RowStream ConnectionImpl::ReadImpl(
// Capture a copy of `stub` to ensure the `shared_ptr<>` remains valid through
// the lifetime of the lambda.
auto stub = session_pool_->GetStub(*session);
auto const tracing_enabled = rpc_stream_tracing_enabled_;
auto const tracing_options = tracing_options_;
auto const tracing_enabled = RpcStreamTracingEnabled();
auto const& tracing_options = RpcTracingOptions();
auto factory = [stub, &request, tracing_enabled,
tracing_options](std::string const& resume_token) mutable {
request.set_resume_token(resume_token);
Expand All @@ -434,8 +451,8 @@ spanner::RowStream ConnectionImpl::ReadImpl(
};
for (;;) {
auto rpc = absl::make_unique<PartialResultSetResume>(
factory, Idempotency::kIdempotent, retry_policy_prototype_->clone(),
backoff_policy_prototype_->clone());
factory, Idempotency::kIdempotent, RetryPolicy()->clone(),
BackoffPolicy()->clone());
auto reader = PartialResultSetSource::Create(std::move(rpc));
if (s->has_begin()) {
if (reader.ok()) {
Expand Down Expand Up @@ -496,7 +513,7 @@ StatusOr<std::vector<spanner::ReadPartition>> ConnectionImpl::PartitionReadImpl(
auto stub = session_pool_->GetStub(*session);
for (;;) {
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicy()->clone(), BackoffPolicy()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionReadRequest const& request) {
Expand Down Expand Up @@ -627,10 +644,10 @@ ResultType ConnectionImpl::CommonQueryImpl(
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;
auto const tracing_enabled = rpc_stream_tracing_enabled_;
auto const tracing_options = tracing_options_;
auto const& retry_policy = RetryPolicy();
auto const& backoff_policy = BackoffPolicy();
auto const tracing_enabled = RpcStreamTracingEnabled();
auto const& tracing_options = RpcTracingOptions();
auto retry_resume_fn =
[stub, retry_policy, backoff_policy, tracing_enabled,
tracing_options](spanner_proto::ExecuteSqlRequest& request) mutable
Expand Down Expand Up @@ -699,8 +716,8 @@ StatusOr<ResultType> ConnectionImpl::CommonDmlImpl(
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;
auto const& retry_policy = RetryPolicy();
auto const& backoff_policy = BackoffPolicy();

auto retry_resume_fn =
[function_name, stub, retry_policy, backoff_policy,
Expand Down Expand Up @@ -783,7 +800,7 @@ ConnectionImpl::PartitionQueryImpl(
auto stub = session_pool_->GetStub(*session);
for (;;) {
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicy()->clone(), BackoffPolicy()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionQueryRequest const& request) {
Expand Down Expand Up @@ -857,7 +874,7 @@ StatusOr<spanner::BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
auto stub = session_pool_->GetStub(*session);
for (;;) {
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicy()->clone(), BackoffPolicy()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::ExecuteBatchDmlRequest const& request) {
Expand Down Expand Up @@ -979,7 +996,7 @@ StatusOr<spanner::CommitResult> ConnectionImpl::CommitImpl(

auto stub = session_pool_->GetStub(*session);
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicy()->clone(), BackoffPolicy()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::CommitRequest const& request) {
Expand Down Expand Up @@ -1041,7 +1058,7 @@ Status ConnectionImpl::RollbackImpl(
request.set_transaction_id(s->id());
auto stub = session_pool_->GetStub(*session);
auto status = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicy()->clone(), BackoffPolicy()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::RollbackRequest const& request) {
Expand Down
15 changes: 9 additions & 6 deletions google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class ConnectionImpl : public spanner::Connection {
public:
ConnectionImpl(spanner::Database db,
std::unique_ptr<BackgroundThreads> background_threads,
std::vector<std::shared_ptr<SpannerStub>> stubs,
Options const& opts);
std::vector<std::shared_ptr<SpannerStub>> stubs, Options opts);

Options options() override { return opts_; }

spanner::RowStream Read(ReadParams) override;
StatusOr<std::vector<spanner::ReadPartition>> PartitionRead(
Expand All @@ -69,6 +70,11 @@ class ConnectionImpl : public spanner::Connection {
Status Rollback(RollbackParams) override;

private:
std::shared_ptr<spanner::RetryPolicy> const& RetryPolicy() const;
std::shared_ptr<spanner::BackoffPolicy> const& BackoffPolicy() const;
bool RpcStreamTracingEnabled() const;
TracingOptions const& RpcTracingOptions() const;

Status PrepareSession(SessionHolder& session,
bool dissociate_from_pool = false);

Expand Down Expand Up @@ -163,12 +169,9 @@ class ConnectionImpl : public spanner::Connection {
google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode);

spanner::Database db_;
std::shared_ptr<spanner::RetryPolicy const> retry_policy_prototype_;
std::shared_ptr<spanner::BackoffPolicy const> backoff_policy_prototype_;
std::unique_ptr<BackgroundThreads> background_threads_;
Options opts_;
std::shared_ptr<SessionPool> session_pool_;
bool rpc_stream_tracing_enabled_ = false;
TracingOptions tracing_options_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down

0 comments on commit d819840

Please sign in to comment.