Skip to content

Commit

Permalink
feat(spanner): merge connection options into client options (#8090)
Browse files Browse the repository at this point in the history
Store the `Connection` options so that they may be made available to
merge into the `Client` options.

Use those stored options to implement four `Connection` data elements
that were previously extracted during construction.  (When all `Client`
operations have been taught how to set the "current options" we will
be able to use those instead.)

Part of #7690.
  • Loading branch information
devbww authored Jan 26, 2022
1 parent 8fd0421 commit 2e33f18
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 39 deletions.
2 changes: 1 addition & 1 deletion google/cloud/spanner/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ std::shared_ptr<spanner::Connection> MakeConnection(spanner::Database const& db,
return spanner_internal::CreateDefaultSpannerStub(db, auth, opts, id++);
});
return std::make_shared<spanner_internal::ConnectionImpl>(
std::move(db), std::move(background), std::move(stubs), opts);
std::move(db), std::move(background), std::move(stubs), std::move(opts));
}

std::shared_ptr<Connection> MakeConnection(
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/spanner/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Client {
*/
explicit Client(std::shared_ptr<Connection> conn, Options opts = {})
: conn_(std::move(conn)),
opts_(spanner_internal::DefaultOptions(std::move(opts))) {}
opts_(internal::MergeOptions(std::move(opts), conn_->options())) {}

//@{
/// Backwards compatibility for `ClientOptions`.
Expand Down
80 changes: 79 additions & 1 deletion google/cloud/spanner/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "google/cloud/spanner/client.h"
#include "google/cloud/spanner/connection.h"
#include "google/cloud/spanner/internal/defaults.h"
#include "google/cloud/spanner/mocks/mock_spanner_connection.h"
#include "google/cloud/spanner/mutations.h"
#include "google/cloud/spanner/results.h"
Expand All @@ -28,6 +29,7 @@
#include <array>
#include <chrono>
#include <cstdint>
#include <string>
#include <utility>

namespace google {
Expand All @@ -46,6 +48,7 @@ using ::google::protobuf::TextFormat;
using ::testing::ByMove;
using ::testing::DoAll;
using ::testing::ElementsAre;
using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::Return;
using ::testing::SaveArg;
Expand Down Expand Up @@ -389,7 +392,17 @@ TEST(ClientTest, MakeConnectionOptionalArguments) {
EXPECT_NE(conn, nullptr);

conn = MakeConnection(db, Options{});
EXPECT_NE(conn, nullptr);
ASSERT_NE(conn, nullptr);
ASSERT_TRUE(conn->options().has<EndpointOption>());
EXPECT_EQ(conn->options().get<EndpointOption>(),
spanner_internal::DefaultOptions().get<EndpointOption>());

conn = MakeConnection(db, Options{}.set<EndpointOption>("endpoint"));
ASSERT_NE(conn, nullptr);
ASSERT_TRUE(conn->options().has<EndpointOption>());
EXPECT_NE(conn->options().get<EndpointOption>(),
spanner_internal::DefaultOptions().get<EndpointOption>());
EXPECT_EQ(conn->options().get<EndpointOption>(), "endpoint");
}

TEST(ClientTest, CommitMutatorSuccess) {
Expand Down Expand Up @@ -1192,6 +1205,71 @@ TEST(ClientTest, QueryOptionsOverlayPrecedence) {
}
}

struct StringOption {
using Type = std::string;
};

TEST(ClientTest, UsesConnectionOptions) {
auto conn = std::make_shared<MockConnection>();
auto txn = MakeReadWriteTransaction();

EXPECT_CALL(*conn, options).WillOnce([] {
return Options{}.set<StringOption>("connection");
});
EXPECT_CALL(*conn, Rollback)
.WillOnce([txn](Connection::RollbackParams const& params) {
auto const& options = internal::CurrentOptions();
EXPECT_THAT(options.get<StringOption>(), Eq("connection"));
EXPECT_THAT(params.transaction, Eq(txn));
return Status();
});

Client client(conn, Options{});
auto rollback = client.Rollback(txn, Options{});
EXPECT_STATUS_OK(rollback);
}

TEST(ClientTest, UsesClientOptions) {
auto conn = std::make_shared<MockConnection>();
auto txn = MakeReadWriteTransaction();

EXPECT_CALL(*conn, options).WillOnce([] {
return Options{}.set<StringOption>("connection");
});
EXPECT_CALL(*conn, Rollback)
.WillOnce([txn](Connection::RollbackParams const& params) {
auto const& options = internal::CurrentOptions();
EXPECT_THAT(options.get<StringOption>(), Eq("client"));
EXPECT_THAT(params.transaction, Eq(txn));
return Status();
});

Client client(conn, Options{}.set<StringOption>("client"));
auto rollback = client.Rollback(txn, Options{});
EXPECT_STATUS_OK(rollback);
}

TEST(ClientTest, UsesOperationOptions) {
auto conn = std::make_shared<MockConnection>();
auto txn = MakeReadWriteTransaction();

EXPECT_CALL(*conn, options).WillOnce([] {
return Options{}.set<StringOption>("connection");
});
EXPECT_CALL(*conn, Rollback)
.WillOnce([txn](Connection::RollbackParams const& params) {
auto const& options = internal::CurrentOptions();
EXPECT_THAT(options.get<StringOption>(), Eq("operation"));
EXPECT_THAT(params.transaction, Eq(txn));
return Status();
});

Client client(conn, Options{}.set<StringOption>("client"));
auto rollback =
client.Rollback(txn, Options{}.set<StringOption>("operation"));
EXPECT_STATUS_OK(rollback);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace spanner
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/spanner/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class Connection {
};
//@}

/// Returns the options used by the Connection.
virtual Options options() { return Options{}; }

/// Defines the interface for `Client::Read()`
virtual RowStream Read(ReadParams) = 0;

Expand Down
79 changes: 49 additions & 30 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,12 @@ Status MissingTransactionStatus(std::string const& operation) {

ConnectionImpl::ConnectionImpl(
spanner::Database db, std::unique_ptr<BackgroundThreads> background_threads,
std::vector<std::shared_ptr<SpannerStub>> stubs, Options const& opts)
std::vector<std::shared_ptr<SpannerStub>> stubs, Options opts)
: db_(std::move(db)),
retry_policy_prototype_(
opts.get<spanner::SpannerRetryPolicyOption>()->clone()),
backoff_policy_prototype_(
opts.get<spanner::SpannerBackoffPolicyOption>()->clone()),
background_threads_(std::move(background_threads)),
opts_(internal::MergeOptions(std::move(opts), Connection::options())),
session_pool_(MakeSessionPool(db_, std::move(stubs),
background_threads_->cq(), opts)),
rpc_stream_tracing_enabled_(internal::Contains(
opts.get<TracingComponentsOption>(), "rpc-streams")),
tracing_options_(opts.get<GrpcTracingOptionsOption>()) {}
background_threads_->cq(), opts_)) {}

spanner::RowStream ConnectionImpl::Read(ReadParams params) {
return Visit(std::move(params.transaction),
Expand Down Expand Up @@ -327,6 +321,31 @@ class StreamingPartitionedDmlResult {
std::unique_ptr<ResultSourceInterface> source_;
};

std::shared_ptr<spanner::RetryPolicy> const&
ConnectionImpl::RetryPolicyPrototype() const {
// TODO(#7690): Base this on internal::CurrentOptions().
return opts_.get<spanner::SpannerRetryPolicyOption>();
}

std::shared_ptr<spanner::BackoffPolicy> const&
ConnectionImpl::BackoffPolicyPrototype() const {
// TODO(#7690): Base this on internal::CurrentOptions().
return opts_.get<spanner::SpannerBackoffPolicyOption>();
}

bool ConnectionImpl::RpcStreamTracingEnabled() const {
// TODO(#7690): Base this on internal::CurrentOptions() if we want to
// allow per-operation options to influence "rpc-streams" tracing.
return internal::Contains(opts_.get<TracingComponentsOption>(),
"rpc-streams");
}

TracingOptions const& ConnectionImpl::RpcTracingOptions() const {
// TODO(#7690): Base this on internal::CurrentOptions() if we want to
// allow per-operation options to influence "rpc-streams" tracing.
return opts_.get<GrpcTracingOptionsOption>();
}

/**
* Helper function that ensures `session` holds a valid `Session`, or returns
* an error if `session` is empty and no `Session` can be allocated.
Expand Down Expand Up @@ -366,7 +385,7 @@ StatusOr<spanner_proto::Transaction> ConnectionImpl::BeginTransaction(

auto stub = session_pool_->GetStub(*session);
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::BeginTransactionRequest const& request) {
Expand Down Expand Up @@ -417,8 +436,8 @@ spanner::RowStream ConnectionImpl::ReadImpl(
// Capture a copy of `stub` to ensure the `shared_ptr<>` remains valid through
// the lifetime of the lambda.
auto stub = session_pool_->GetStub(*session);
auto const tracing_enabled = rpc_stream_tracing_enabled_;
auto const tracing_options = tracing_options_;
auto const tracing_enabled = RpcStreamTracingEnabled();
auto const& tracing_options = RpcTracingOptions();
auto factory = [stub, &request, tracing_enabled,
tracing_options](std::string const& resume_token) mutable {
request.set_resume_token(resume_token);
Expand All @@ -434,8 +453,8 @@ spanner::RowStream ConnectionImpl::ReadImpl(
};
for (;;) {
auto rpc = absl::make_unique<PartialResultSetResume>(
factory, Idempotency::kIdempotent, retry_policy_prototype_->clone(),
backoff_policy_prototype_->clone());
factory, Idempotency::kIdempotent, RetryPolicyPrototype()->clone(),
BackoffPolicyPrototype()->clone());
auto reader = PartialResultSetSource::Create(std::move(rpc));
if (s->has_begin()) {
if (reader.ok()) {
Expand Down Expand Up @@ -496,7 +515,7 @@ StatusOr<std::vector<spanner::ReadPartition>> ConnectionImpl::PartitionReadImpl(
auto stub = session_pool_->GetStub(*session);
for (;;) {
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionReadRequest const& request) {
Expand Down Expand Up @@ -627,12 +646,12 @@ ResultType ConnectionImpl::CommonQueryImpl(
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;
auto const tracing_enabled = rpc_stream_tracing_enabled_;
auto const tracing_options = tracing_options_;
auto const& retry_policy_prototype = RetryPolicyPrototype();
auto const& backoff_policy_prototype = BackoffPolicyPrototype();
auto const tracing_enabled = RpcStreamTracingEnabled();
auto const& tracing_options = RpcTracingOptions();
auto retry_resume_fn =
[stub, retry_policy, backoff_policy, tracing_enabled,
[stub, retry_policy_prototype, backoff_policy_prototype, tracing_enabled,
tracing_options](spanner_proto::ExecuteSqlRequest& request) mutable
-> StatusOr<std::unique_ptr<ResultSourceInterface>> {
auto factory = [stub, request, tracing_enabled,
Expand All @@ -649,8 +668,8 @@ ResultType ConnectionImpl::CommonQueryImpl(
return reader;
};
auto rpc = absl::make_unique<PartialResultSetResume>(
std::move(factory), Idempotency::kIdempotent, retry_policy->clone(),
backoff_policy->clone());
std::move(factory), Idempotency::kIdempotent,
retry_policy_prototype->clone(), backoff_policy_prototype->clone());

return PartialResultSetSource::Create(std::move(rpc));
};
Expand Down Expand Up @@ -699,15 +718,15 @@ StatusOr<ResultType> ConnectionImpl::CommonDmlImpl(
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;
auto const& retry_policy_prototype = RetryPolicyPrototype();
auto const& backoff_policy_prototype = BackoffPolicyPrototype();

auto retry_resume_fn =
[function_name, stub, retry_policy, backoff_policy,
[function_name, stub, retry_policy_prototype, backoff_policy_prototype,
session](spanner_proto::ExecuteSqlRequest& request) mutable
-> StatusOr<std::unique_ptr<ResultSourceInterface>> {
StatusOr<spanner_proto::ResultSet> response = RetryLoop(
retry_policy->clone(), backoff_policy->clone(),
retry_policy_prototype->clone(), backoff_policy_prototype->clone(),
Idempotency::kIdempotent,
[stub](grpc::ClientContext& context,
spanner_proto::ExecuteSqlRequest const& request) {
Expand Down Expand Up @@ -783,7 +802,7 @@ ConnectionImpl::PartitionQueryImpl(
auto stub = session_pool_->GetStub(*session);
for (;;) {
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionQueryRequest const& request) {
Expand Down Expand Up @@ -857,7 +876,7 @@ StatusOr<spanner::BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
auto stub = session_pool_->GetStub(*session);
for (;;) {
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::ExecuteBatchDmlRequest const& request) {
Expand Down Expand Up @@ -979,7 +998,7 @@ StatusOr<spanner::CommitResult> ConnectionImpl::CommitImpl(

auto stub = session_pool_->GetStub(*session);
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::CommitRequest const& request) {
Expand Down Expand Up @@ -1041,7 +1060,7 @@ Status ConnectionImpl::RollbackImpl(
request.set_transaction_id(s->id());
auto stub = session_pool_->GetStub(*session);
auto status = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
spanner_proto::RollbackRequest const& request) {
Expand Down
15 changes: 9 additions & 6 deletions google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class ConnectionImpl : public spanner::Connection {
public:
ConnectionImpl(spanner::Database db,
std::unique_ptr<BackgroundThreads> background_threads,
std::vector<std::shared_ptr<SpannerStub>> stubs,
Options const& opts);
std::vector<std::shared_ptr<SpannerStub>> stubs, Options opts);

Options options() override { return opts_; }

spanner::RowStream Read(ReadParams) override;
StatusOr<std::vector<spanner::ReadPartition>> PartitionRead(
Expand All @@ -69,6 +70,11 @@ class ConnectionImpl : public spanner::Connection {
Status Rollback(RollbackParams) override;

private:
std::shared_ptr<spanner::RetryPolicy> const& RetryPolicyPrototype() const;
std::shared_ptr<spanner::BackoffPolicy> const& BackoffPolicyPrototype() const;
bool RpcStreamTracingEnabled() const;
TracingOptions const& RpcTracingOptions() const;

Status PrepareSession(SessionHolder& session,
bool dissociate_from_pool = false);

Expand Down Expand Up @@ -163,12 +169,9 @@ class ConnectionImpl : public spanner::Connection {
google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode);

spanner::Database db_;
std::shared_ptr<spanner::RetryPolicy const> retry_policy_prototype_;
std::shared_ptr<spanner::BackoffPolicy const> backoff_policy_prototype_;
std::unique_ptr<BackgroundThreads> background_threads_;
Options opts_;
std::shared_ptr<SessionPool> session_pool_;
bool rpc_stream_tracing_enabled_ = false;
TracingOptions tracing_options_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner/mocks/mock_spanner_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
*/
class MockConnection : public spanner::Connection {
public:
MOCK_METHOD(Options, options, (), (override));
MOCK_METHOD(spanner::RowStream, Read, (ReadParams), (override));
MOCK_METHOD(StatusOr<std::vector<spanner::ReadPartition>>, PartitionRead,
(PartitionReadParams), (override));
Expand Down

0 comments on commit 2e33f18

Please sign in to comment.