Skip to content

Commit

Permalink
fix(bigtable)!: pass app profile id to connection as options (#9388)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc authored Jun 30, 2022
1 parent ec741cb commit c290d9a
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 301 deletions.
30 changes: 14 additions & 16 deletions google/cloud/bigtable/data_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,66 +43,65 @@ std::vector<FailedMutation> MakeUnimplementedFailedMutations(std::size_t n) {

DataConnection::~DataConnection() = default;

Status DataConnection::Apply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, SingleRowMutation) {
// NOLINTNEXTLINE(performance-unnecessary-value-param)
Status DataConnection::Apply(std::string const&, SingleRowMutation) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

future<Status> DataConnection::AsyncApply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, SingleRowMutation) {
std::string const&, SingleRowMutation) {
return make_ready_future(
Status(StatusCode::kUnimplemented, "not implemented"));
}

std::vector<FailedMutation> DataConnection::BulkApply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, BulkMutation mut) {
std::string const&, BulkMutation mut) {
return MakeUnimplementedFailedMutations(mut.size());
}

future<std::vector<FailedMutation>> DataConnection::AsyncBulkApply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, BulkMutation mut) {
std::string const&, BulkMutation mut) {
return make_ready_future(MakeUnimplementedFailedMutations(mut.size()));
}

RowReader DataConnection::ReadRows(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, RowSet, std::int64_t, Filter) {
std::string const&, RowSet, std::int64_t, Filter) {
return MakeRowReader(std::make_shared<bigtable_internal::StatusOnlyRowReader>(
Status(StatusCode::kUnimplemented, "not implemented")));
}

StatusOr<std::pair<bool, Row>> DataConnection::ReadRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, std::string, Filter) {
std::string const&, std::string, Filter) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

StatusOr<MutationBranch> DataConnection::CheckAndMutateRow(
std::string const&, std::string const&,
std::string const&,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string, Filter, std::vector<Mutation>, std::vector<Mutation>) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

future<StatusOr<MutationBranch>> DataConnection::AsyncCheckAndMutateRow(
std::string const&, std::string const&,
std::string const&,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string, Filter, std::vector<Mutation>, std::vector<Mutation>) {
return make_ready_future<StatusOr<MutationBranch>>(
Status(StatusCode::kUnimplemented, "not implemented"));
}

StatusOr<std::vector<RowKeySample>> DataConnection::SampleRows(
std::string const&, std::string const&) {
std::string const&) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

future<StatusOr<std::vector<RowKeySample>>> DataConnection::AsyncSampleRows(
std::string const&, std::string const&) {
std::string const&) {
return make_ready_future<StatusOr<std::vector<RowKeySample>>>(
Status(StatusCode::kUnimplemented, "not implemented"));
}
Expand All @@ -121,17 +120,16 @@ future<StatusOr<Row>> DataConnection::AsyncReadModifyWriteRow(
}

void DataConnection::AsyncReadRows(
std::string const&, std::string const&,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::function<future<bool>(Row)>, std::function<void(Status)> on_finish,
std::string const&, std::function<future<bool>(Row)>,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
RowSet, std::int64_t, Filter) {
std::function<void(Status)> on_finish, RowSet, std::int64_t, Filter) {
on_finish(Status(StatusCode::kUnimplemented, "not implemented"));
}

future<StatusOr<std::pair<bool, Row>>> DataConnection::AsyncReadRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, std::string, Filter) {
std::string const&, std::string, Filter) {
return make_ready_future<StatusOr<std::pair<bool, Row>>>(
Status(StatusCode::kUnimplemented, "not implemented"));
}
Expand Down
41 changes: 17 additions & 24 deletions google/cloud/bigtable/data_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,61 +52,54 @@ class DataConnection {

virtual Options options() { return Options{}; }

virtual Status Apply(std::string const& app_profile_id,
std::string const& table_name, SingleRowMutation mut);
virtual Status Apply(std::string const& table_name, SingleRowMutation mut);

virtual future<Status> AsyncApply(std::string const& app_profile_id,
std::string const& table_name,
virtual future<Status> AsyncApply(std::string const& table_name,
SingleRowMutation mut);

virtual std::vector<FailedMutation> BulkApply(
std::string const& app_profile_id, std::string const& table_name,
BulkMutation mut);
virtual std::vector<FailedMutation> BulkApply(std::string const& table_name,
BulkMutation mut);

virtual future<std::vector<FailedMutation>> AsyncBulkApply(
std::string const& app_profile_id, std::string const& table_name,
BulkMutation mut);
std::string const& table_name, BulkMutation mut);

virtual RowReader ReadRows(std::string const& app_profile_id,
std::string const& table_name, RowSet row_set,
virtual RowReader ReadRows(std::string const& table_name, RowSet row_set,
std::int64_t rows_limit, Filter filter);

virtual StatusOr<std::pair<bool, Row>> ReadRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, Filter filter);
virtual StatusOr<std::pair<bool, Row>> ReadRow(std::string const& table_name,
std::string row_key,
Filter filter);

virtual StatusOr<MutationBranch> CheckAndMutateRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, Filter filter, std::vector<Mutation> true_mutations,
std::string const& table_name, std::string row_key, Filter filter,
std::vector<Mutation> true_mutations,
std::vector<Mutation> false_mutations);

virtual future<StatusOr<MutationBranch>> AsyncCheckAndMutateRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, Filter filter, std::vector<Mutation> true_mutations,
std::string const& table_name, std::string row_key, Filter filter,
std::vector<Mutation> true_mutations,
std::vector<Mutation> false_mutations);

virtual StatusOr<std::vector<RowKeySample>> SampleRows(
std::string const& app_profile_id, std::string const& table_name);
std::string const& table_name);

virtual future<StatusOr<std::vector<RowKeySample>>> AsyncSampleRows(
std::string const& app_profile_id, std::string const& table_name);
std::string const& table_name);

virtual StatusOr<Row> ReadModifyWriteRow(
google::bigtable::v2::ReadModifyWriteRowRequest request);

virtual future<StatusOr<Row>> AsyncReadModifyWriteRow(
google::bigtable::v2::ReadModifyWriteRowRequest request);

virtual void AsyncReadRows(std::string const& app_profile_id,
std::string const& table_name,
virtual void AsyncReadRows(std::string const& table_name,
std::function<future<bool>(Row)> on_row,
std::function<void(Status)> on_finish,
RowSet row_set, std::int64_t rows_limit,
Filter filter);

virtual future<StatusOr<std::pair<bool, Row>>> AsyncReadRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, Filter filter);
std::string const& table_name, std::string row_key, Filter filter);
};

/**
Expand Down
67 changes: 30 additions & 37 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@ DataConnectionImpl::DataConnectionImpl(
std::move(options),
bigtable::internal::DefaultDataOptions(DataConnection::options()))) {}

Status DataConnectionImpl::Apply(std::string const& app_profile_id,
std::string const& table_name,
Status DataConnectionImpl::Apply(std::string const& table_name,
bigtable::SingleRowMutation mut) {
google::bigtable::v2::MutateRowRequest request;
request.set_app_profile_id(app_profile_id);
request.set_app_profile_id(app_profile_id());
request.set_table_name(table_name);
mut.MoveTo(request);

Expand All @@ -86,11 +85,10 @@ Status DataConnectionImpl::Apply(std::string const& app_profile_id,
return Status{};
}

future<Status> DataConnectionImpl::AsyncApply(std::string const& app_profile_id,
std::string const& table_name,
future<Status> DataConnectionImpl::AsyncApply(std::string const& table_name,
bigtable::SingleRowMutation mut) {
google::bigtable::v2::MutateRowRequest request;
request.set_app_profile_id(app_profile_id);
request.set_app_profile_id(app_profile_id());
request.set_table_name(table_name);
mut.MoveTo(request);

Expand Down Expand Up @@ -121,11 +119,10 @@ future<Status> DataConnectionImpl::AsyncApply(std::string const& app_profile_id,
}

std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
std::string const& app_profile_id, std::string const& table_name,
bigtable::BulkMutation mut) {
std::string const& table_name, bigtable::BulkMutation mut) {
if (mut.empty()) return {};
bigtable::internal::BulkMutator mutator(
app_profile_id, table_name, *idempotency_policy(), std::move(mut));
app_profile_id(), table_name, *idempotency_policy(), std::move(mut));
// We wait to allocate the policies until they are needed as a
// micro-optimization.
std::unique_ptr<bigtable::DataRetryPolicy> retry;
Expand All @@ -143,31 +140,30 @@ std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
}

future<std::vector<bigtable::FailedMutation>>
DataConnectionImpl::AsyncBulkApply(std::string const& app_profile_id,
std::string const& table_name,
DataConnectionImpl::AsyncBulkApply(std::string const& table_name,
bigtable::BulkMutation mut) {
return AsyncBulkApplier::Create(background_->cq(), stub_, retry_policy(),
backoff_policy(), *idempotency_policy(),
app_profile_id, table_name, std::move(mut));
app_profile_id(), table_name, std::move(mut));
}

bigtable::RowReader DataConnectionImpl::ReadRows(
std::string const& app_profile_id, std::string const& table_name,
bigtable::RowSet row_set, std::int64_t rows_limit,
bigtable::Filter filter) {
bigtable::RowReader DataConnectionImpl::ReadRows(std::string const& table_name,
bigtable::RowSet row_set,
std::int64_t rows_limit,
bigtable::Filter filter) {
auto impl = std::make_shared<DefaultRowReader>(
stub_, app_profile_id, table_name, std::move(row_set), rows_limit,
stub_, app_profile_id(), table_name, std::move(row_set), rows_limit,
std::move(filter), retry_policy(), backoff_policy());
return MakeRowReader(std::move(impl));
}

StatusOr<std::pair<bool, bigtable::Row>> DataConnectionImpl::ReadRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, bigtable::Filter filter) {
std::string const& table_name, std::string row_key,
bigtable::Filter filter) {
bigtable::RowSet row_set(std::move(row_key));
std::int64_t const rows_limit = 1;
auto reader = ReadRows(app_profile_id, table_name, std::move(row_set),
rows_limit, std::move(filter));
auto reader =
ReadRows(table_name, std::move(row_set), rows_limit, std::move(filter));

auto it = reader.begin();
if (it == reader.end()) return std::make_pair(false, bigtable::Row("", {}));
Expand All @@ -182,12 +178,11 @@ StatusOr<std::pair<bool, bigtable::Row>> DataConnectionImpl::ReadRow(
}

StatusOr<bigtable::MutationBranch> DataConnectionImpl::CheckAndMutateRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, bigtable::Filter filter,
std::string const& table_name, std::string row_key, bigtable::Filter filter,
std::vector<bigtable::Mutation> true_mutations,
std::vector<bigtable::Mutation> false_mutations) {
google::bigtable::v2::CheckAndMutateRowRequest request;
request.set_app_profile_id(app_profile_id);
request.set_app_profile_id(app_profile_id());
request.set_table_name(table_name);
request.set_row_key(std::move(row_key));
*request.mutable_predicate_filter() = std::move(filter).as_proto();
Expand Down Expand Up @@ -216,12 +211,11 @@ StatusOr<bigtable::MutationBranch> DataConnectionImpl::CheckAndMutateRow(

future<StatusOr<bigtable::MutationBranch>>
DataConnectionImpl::AsyncCheckAndMutateRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, bigtable::Filter filter,
std::string const& table_name, std::string row_key, bigtable::Filter filter,
std::vector<bigtable::Mutation> true_mutations,
std::vector<bigtable::Mutation> false_mutations) {
google::bigtable::v2::CheckAndMutateRowRequest request;
request.set_app_profile_id(app_profile_id);
request.set_app_profile_id(app_profile_id());
request.set_table_name(table_name);
request.set_row_key(std::move(row_key));
*request.mutable_predicate_filter() = std::move(filter).as_proto();
Expand Down Expand Up @@ -258,9 +252,9 @@ DataConnectionImpl::AsyncCheckAndMutateRow(
}

StatusOr<std::vector<bigtable::RowKeySample>> DataConnectionImpl::SampleRows(
std::string const& app_profile_id, std::string const& table_name) {
std::string const& table_name) {
google::bigtable::v2::SampleRowKeysRequest request;
request.set_app_profile_id(app_profile_id);
request.set_app_profile_id(app_profile_id());
request.set_table_name(table_name);

Status status;
Expand Down Expand Up @@ -307,10 +301,10 @@ StatusOr<std::vector<bigtable::RowKeySample>> DataConnectionImpl::SampleRows(
}

future<StatusOr<std::vector<bigtable::RowKeySample>>>
DataConnectionImpl::AsyncSampleRows(std::string const& app_profile_id,
std::string const& table_name) {
DataConnectionImpl::AsyncSampleRows(std::string const& table_name) {
return AsyncRowSampler::Create(background_->cq(), stub_, retry_policy(),
backoff_policy(), app_profile_id, table_name);
backoff_policy(), app_profile_id(),
table_name);
}

StatusOr<bigtable::Row> DataConnectionImpl::ReadModifyWriteRow(
Expand Down Expand Up @@ -350,19 +344,18 @@ future<StatusOr<bigtable::Row>> DataConnectionImpl::AsyncReadModifyWriteRow(
}

void DataConnectionImpl::AsyncReadRows(
std::string const& app_profile_id, std::string const& table_name,
std::string const& table_name,
std::function<future<bool>(bigtable::Row)> on_row,
std::function<void(Status)> on_finish, bigtable::RowSet row_set,
std::int64_t rows_limit, bigtable::Filter filter) {
bigtable_internal::AsyncRowReader::Create(
background_->cq(), stub_, app_profile_id, table_name, std::move(on_row),
background_->cq(), stub_, app_profile_id(), table_name, std::move(on_row),
std::move(on_finish), std::move(row_set), rows_limit, std::move(filter),
retry_policy(), backoff_policy());
}

future<StatusOr<std::pair<bool, bigtable::Row>>>
DataConnectionImpl::AsyncReadRow(std::string const& app_profile_id,
std::string const& table_name,
DataConnectionImpl::AsyncReadRow(std::string const& table_name,
std::string row_key, bigtable::Filter filter) {
class AsyncReadRowHandler {
public:
Expand Down Expand Up @@ -409,7 +402,7 @@ DataConnectionImpl::AsyncReadRow(std::string const& app_profile_id,
std::int64_t const rows_limit = 1;
auto handler = std::make_shared<AsyncReadRowHandler>();
AsyncReadRows(
app_profile_id, table_name,
table_name,
[handler](bigtable::Row row) { return handler->OnRow(std::move(row)); },
[handler](Status status) {
handler->OnStreamFinished(std::move(status));
Expand Down
Loading

0 comments on commit c290d9a

Please sign in to comment.