Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): per-operation Options (pt. 2) #9627

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions google/cloud/bigtable/legacy_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ TEST_F(TableTest, AsyncSampleRowsWithOptions) {
EXPECT_THAT(rows.get(), StatusIs(StatusCode::kInvalidArgument));
}

TEST_F(TableTest, ReadModifyWriteRowWithOptions) {
Table table(client_, kTableId);
auto r = ReadModifyWriteRule::AppendValue("cf", "cq", "v");
auto row = table.ReadModifyWriteRow("row", r, r, NonEmptyOptions(), r);
EXPECT_THAT(row, StatusIs(StatusCode::kInvalidArgument));
}

TEST_F(TableTest, AsyncReadModifyWriteRowWithOptions) {
Table table(client_, kTableId);
auto r = ReadModifyWriteRule::AppendValue("cf", "cq", "v");
auto row = table.AsyncReadModifyWriteRow("row", r, r, NonEmptyOptions(), r);
EXPECT_THAT(row.get(), StatusIs(StatusCode::kInvalidArgument));
}

TEST_F(TableTest, AsyncReadRowsWithOptions) {
MockFunction<future<bool>(bigtable::Row const&)> on_row;
EXPECT_CALL(on_row, Call).Times(0);
Expand Down
19 changes: 15 additions & 4 deletions google/cloud/bigtable/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,14 +450,19 @@ future<StatusOr<std::vector<bigtable::RowKeySample>>> Table::AsyncSampleRows(
}

StatusOr<Row> Table::ReadModifyWriteRowImpl(
btproto::ReadModifyWriteRowRequest request) {
btproto::ReadModifyWriteRowRequest request, Options opts) {
SetCommonTableOperationRequest<
::google::bigtable::v2::ReadModifyWriteRowRequest>(
request, app_profile_id(), table_name_);
if (connection_) {
google::cloud::internal::OptionsSpan span(options_);
OptionsSpan span(MergeOptions(std::move(opts), options_));
return connection_->ReadModifyWriteRow(std::move(request));
}
if (!google::cloud::internal::IsEmpty(opts)) {
return Status(StatusCode::kInvalidArgument,
"Per-operation options only apply to `Table`s constructed "
"with a `DataConnection`.");
}

grpc::Status status;
auto response = ClientUtils::MakeNonIdempotentCall(
Expand All @@ -472,14 +477,20 @@ StatusOr<Row> Table::ReadModifyWriteRowImpl(
}

future<StatusOr<Row>> Table::AsyncReadModifyWriteRowImpl(
::google::bigtable::v2::ReadModifyWriteRowRequest request) {
::google::bigtable::v2::ReadModifyWriteRowRequest request, Options opts) {
SetCommonTableOperationRequest<
::google::bigtable::v2::ReadModifyWriteRowRequest>(
request, app_profile_id(), table_name_);
if (connection_) {
google::cloud::internal::OptionsSpan span(options_);
OptionsSpan span(MergeOptions(std::move(opts), options_));
return connection_->AsyncReadModifyWriteRow(std::move(request));
}
if (!google::cloud::internal::IsEmpty(opts)) {
return make_ready_future<StatusOr<Row>>(
Status(StatusCode::kInvalidArgument,
"Per-operation options only apply to `Table`s constructed "
"with a `DataConnection`."));
}

auto cq = background_threads_->cq();
auto& client = client_;
Expand Down
60 changes: 42 additions & 18 deletions google/cloud/bigtable/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -743,15 +743,20 @@ class Table {
* Atomically read and modify the row in the server, returning the
* resulting row
*
* @tparam Args this is zero or more ReadModifyWriteRules to apply on a row
* @tparam Args this is zero or more ReadModifyWriteRules to apply on a row.
* Options to override the class-level options, such as retry, backoff,
* and idempotency policies are also be passed via this parameter pack.
* @param row_key the row to read
* @param rule to modify the row. Two types of rules are applied here
* AppendValue which will read the existing value and append the
* text provided to the value.
* IncrementAmount which will read the existing uint64 big-endian-int
* and add the value provided.
* Both rules accept the family and column identifier to modify.
* @param rules is the zero or more ReadModifyWriteRules to apply on a row.
* @param rules_and_options is the zero or more ReadModifyWriteRules to apply
* on a row. Options to override the class-level options, such as retry,
* backoff, and idempotency policies are also be passed via this parameter
* pack.
* @returns the new contents of all modified cells.
*
* @par Idempotency
Expand All @@ -768,21 +773,25 @@ class Table {
template <typename... Args>
StatusOr<Row> ReadModifyWriteRow(std::string row_key,
bigtable::ReadModifyWriteRule rule,
Args&&... rules) {
Args&&... rules_and_options) {
::google::bigtable::v2::ReadModifyWriteRowRequest request;
request.set_row_key(std::move(row_key));

// Generate a better compile time error message than the default one
// if the types do not match
static_assert(
absl::conjunction<
std::is_convertible<Args, bigtable::ReadModifyWriteRule>...>::value,
absl::conjunction<absl::disjunction<
std::is_convertible<Args, bigtable::ReadModifyWriteRule>,
std::is_same<typename std::decay<Args>::type, Options>>...>::value,
"The arguments passed to ReadModifyWriteRow(row_key,...) must be "
"convertible to bigtable::ReadModifyWriteRule");
"convertible to bigtable::ReadModifyWriteRule, or of type "
"google::cloud::Options");

*request.add_rules() = std::move(rule).as_proto();
AddRules(request, std::forward<Args>(rules)...);
return ReadModifyWriteRowImpl(std::move(request));
AddRules(request, std::forward<Args>(rules_and_options)...);
auto opts = google::cloud::internal::GroupOptions(
std::forward<Args>(rules_and_options)...);
return ReadModifyWriteRowImpl(std::move(request), std::move(opts));
}

/**
Expand All @@ -792,15 +801,20 @@ class Table {
* Bigtable. These APIs might be changed in backward-incompatible ways. It
* is not subject to any SLA or deprecation policy.
*
* @tparam Args this is zero or more ReadModifyWriteRules to apply on a row.
* Options to override the class-level options, such as retry, backoff,
* and idempotency policies are also be passed via this parameter pack.
* @param row_key the row key on which modification will be performed
*
* @param rule to modify the row. Two types of rules are applied here
* AppendValue which will read the existing value and append the
* text provided to the value.
* IncrementAmount which will read the existing uint64 big-endian-int
* and add the value provided.
* Both rules accept the family and column identifier to modify.
* @param rules is the zero or more ReadModifyWriteRules to apply on a row.
* @param rules_and_options is the zero or more ReadModifyWriteRules to apply
* on a row. Options to override the class-level options, such as retry,
* backoff, and idempotency policies are also be passed via this parameter
* pack.
* @returns a future, that becomes satisfied when the operation completes,
* at that point the future has the contents of all modified cells.
*
Expand All @@ -817,21 +831,25 @@ class Table {
template <typename... Args>
future<StatusOr<Row>> AsyncReadModifyWriteRow(
std::string row_key, bigtable::ReadModifyWriteRule rule,
Args&&... rules) {
Args&&... rules_and_options) {
::google::bigtable::v2::ReadModifyWriteRowRequest request;
request.set_row_key(std::move(row_key));

// Generate a better compile time error message than the default one
// if the types do not match
static_assert(
absl::conjunction<
std::is_convertible<Args, bigtable::ReadModifyWriteRule>...>::value,
absl::conjunction<absl::disjunction<
std::is_convertible<Args, bigtable::ReadModifyWriteRule>,
std::is_same<typename std::decay<Args>::type, Options>>...>::value,
"The arguments passed to AsyncReadModifyWriteRow(row_key,...) must be "
"convertible to bigtable::ReadModifyWriteRule");
"convertible to bigtable::ReadModifyWriteRule, or of type "
"google::cloud::Options");

*request.add_rules() = std::move(rule).as_proto();
AddRules(request, std::forward<Args>(rules)...);
return AsyncReadModifyWriteRowImpl(std::move(request));
AddRules(request, std::forward<Args>(rules_and_options)...);
auto opts = google::cloud::internal::GroupOptions(
std::forward<Args>(rules_and_options)...);
return AsyncReadModifyWriteRowImpl(std::move(request), std::move(opts));
}

/**
Expand Down Expand Up @@ -1003,10 +1021,10 @@ class Table {
* Send request ReadModifyWriteRowRequest to modify the row and get it back
*/
StatusOr<Row> ReadModifyWriteRowImpl(
::google::bigtable::v2::ReadModifyWriteRowRequest request);
::google::bigtable::v2::ReadModifyWriteRowRequest request, Options opts);

future<StatusOr<Row>> AsyncReadModifyWriteRowImpl(
::google::bigtable::v2::ReadModifyWriteRowRequest request);
::google::bigtable::v2::ReadModifyWriteRowRequest request, Options opts);

void AddRules(google::bigtable::v2::ReadModifyWriteRowRequest&) {
// no-op for empty list
Expand All @@ -1019,6 +1037,12 @@ class Table {
AddRules(request, std::forward<Args>(args)...);
}

template <typename... Args>
void AddRules(google::bigtable::v2::ReadModifyWriteRowRequest& request,
Options const&, Args&&... args) {
AddRules(request, std::forward<Args>(args)...);
}

std::unique_ptr<RPCRetryPolicy> clone_rpc_retry_policy() {
return rpc_retry_policy_prototype_->clone();
}
Expand Down
56 changes: 51 additions & 5 deletions google/cloud/bigtable/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ TEST(TableTest, ReadModifyWriteRow) {
auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, ReadModifyWriteRow)
.WillOnce([](v2::ReadModifyWriteRowRequest const& request) {
// TODO (#7688) - CheckCurrentOptions();
CheckCurrentOptions();
EXPECT_EQ(kTableName, request.table_name());
EXPECT_THAT(request.rules(),
ElementsAre(MatchRule(TestAppendRule()),
Expand All @@ -405,16 +405,38 @@ TEST(TableTest, ReadModifyWriteRow) {
});

auto table = TestTable(std::move(mock));
auto row =
table.ReadModifyWriteRow("row", TestAppendRule(), TestIncrementRule());
auto row = table.ReadModifyWriteRow("row", TestAppendRule(),
TestIncrementRule(), CallOptions());
EXPECT_THAT(row, StatusIs(StatusCode::kPermissionDenied));
}

TEST(TableTest, ReadModifyWriteRowOptionsMerge) {
auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, ReadModifyWriteRow)
.WillOnce([](v2::ReadModifyWriteRowRequest const& request) {
auto const& options = google::cloud::internal::CurrentOptions();
EXPECT_EQ("latter", options.get<TestOption1>());
EXPECT_EQ("former", options.get<TestOption2>());
EXPECT_EQ("latter", options.get<TestOption3>());
EXPECT_THAT(request.rules(),
ElementsAre(MatchRule(TestAppendRule()),
MatchRule(TestIncrementRule())));
return PermanentError();
});

auto former = Options{}.set<TestOption1>("former").set<TestOption2>("former");
auto latter = Options{}.set<TestOption1>("latter").set<TestOption3>("latter");

auto table = TestTable(std::move(mock));
(void)table.ReadModifyWriteRow("row", TestAppendRule(), former,
TestIncrementRule(), latter);
}

TEST(TableTest, AsyncReadModifyWriteRow) {
auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadModifyWriteRow)
.WillOnce([](v2::ReadModifyWriteRowRequest const& request) {
// TODO (#7688) - CheckCurrentOptions();
CheckCurrentOptions();
EXPECT_EQ(kTableName, request.table_name());
EXPECT_THAT(request.rules(),
ElementsAre(MatchRule(TestAppendRule()),
Expand All @@ -424,10 +446,34 @@ TEST(TableTest, AsyncReadModifyWriteRow) {

auto table = TestTable(std::move(mock));
auto row = table.AsyncReadModifyWriteRow("row", TestAppendRule(),
TestIncrementRule());
TestIncrementRule(), CallOptions());
EXPECT_THAT(row.get(), StatusIs(StatusCode::kPermissionDenied));
}

TEST(TableTest, AsyncReadModifyWriteRowOptionsMerge) {
auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadModifyWriteRow)
.WillOnce([](v2::ReadModifyWriteRowRequest const& request) {
auto const& options = google::cloud::internal::CurrentOptions();
EXPECT_EQ("latter", options.get<TestOption1>());
EXPECT_EQ("former", options.get<TestOption2>());
EXPECT_EQ("latter", options.get<TestOption3>());
EXPECT_THAT(request.rules(),
ElementsAre(MatchRule(TestAppendRule()),
MatchRule(TestIncrementRule())));
return make_ready_future<StatusOr<Row>>(PermanentError());
});

auto former = Options{}.set<TestOption1>("former").set<TestOption2>("former");
auto latter = Options{}.set<TestOption1>("latter").set<TestOption3>("latter");

auto table = TestTable(std::move(mock));
(void)table
.AsyncReadModifyWriteRow("row", TestAppendRule(), former,
TestIncrementRule(), latter)
.get();
}

TEST(TableTest, AsyncReadRows) {
auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadRows)
Expand Down