[BACKPORT 2.14][#12003] xCluster: Allow dropping of YSQL tables under replication

Summary:
Currently, dropping a YSQL table under replication produces a message saying that the table cannot be
dropped, yet the table is still dropped at the YSQL layer only. This leads to confusing behaviour where
the table can no longer be accessed via ysqlsh but still shows up in the admin UIs.

The proper fix is being worked on in GH issue #753; for now, this change re-enables deletion of YSQL
tables under replication while continuing to block deletion of YCQL tables.
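
For illustration, a minimal, self-contained sketch of the new gating rule (the struct, flag constant,
and helper below are illustrative stand-ins, not the actual catalog manager API; the real change is in
src/yb/master/catalog_manager.cc further down):

```
// Hedged model of the delete gate introduced by this commit: only YCQL tables
// that are under xCluster replication remain blocked; YSQL tables may be dropped.
#include <iostream>
#include <string>

enum class TableType { kYsql, kYcql };

struct TableInfoStub {
  std::string name;
  TableType type;
  bool has_xcluster_stream;  // stand-in for IsCdcEnabled(*table)
};

// Stand-in for FLAGS_enable_delete_truncate_xcluster_replicated_table.
constexpr bool kEnableDeleteReplicatedTable = false;

bool DeleteIsBlocked(const TableInfoStub& table) {
  const bool ycql_under_replication =
      table.type == TableType::kYcql && table.has_xcluster_stream;
  return !kEnableDeleteReplicatedTable && ycql_under_replication;
}

int main() {
  TableInfoStub ysql_table{"ysql_orders", TableType::kYsql, /*has_xcluster_stream=*/true};
  TableInfoStub ycql_table{"ycql_events", TableType::kYcql, /*has_xcluster_stream=*/true};
  std::cout << ysql_table.name << ": "
            << (DeleteIsBlocked(ysql_table) ? "drop blocked" : "drop allowed") << "\n";
  std::cout << ycql_table.name << ": "
            << (DeleteIsBlocked(ycql_table) ? "drop blocked" : "drop allowed") << "\n";
  return 0;
}
```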

Original commit: 48e8e32 / D18225

Test Plan:
```
ybd --cxx-test integration-tests_twodc_ysql-test --gtest_filter "*DeleteTableChecks*"
ybd --cxx-test integration-tests_twodc-test --gtest_filter "*DeleteTableChecks*"
```

Reviewers: rahuldesirazu, nicolas, slingam

Reviewed By: slingam

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D18472
hulien22 committed Aug 19, 2022
1 parent ab41147 commit 5404cf3
Showing 5 changed files with 141 additions and 30 deletions.
4 changes: 3 additions & 1 deletion ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
@@ -1494,7 +1494,9 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStream
delete_req.add_stream_id(create_resp.stream_id());
// The following line assumes that cdc_proxy_ has been initialized in the test already
ASSERT_OK(cdc_proxy_->DeleteCDCStream(delete_req, &delete_resp, &delete_rpc));
ASSERT_EQ(!delete_resp.has_error(), true);
// Currently drop table on YSQL tables doesn't delete xCluster streams, so this will fail.
ASSERT_EQ(delete_resp.has_error(), true);
ASSERT_TRUE(delete_resp.error().status().message().find(create_resp.stream_id()) != string::npos);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckPointPersistencyNodeRestart)) {
111 changes: 111 additions & 0 deletions ent/src/yb/integration-tests/twodc-test.cc
@@ -2127,5 +2127,116 @@ TEST_P(TwoDCTest, TestAlterUniverseRemoveTableAndDrop) {
ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

TEST_P(TwoDCTest, DeleteTableChecksCQL) {
YB_SKIP_TEST_IN_TSAN();
// Create 3 tables with 1 tablet each.
constexpr int kNT = 1;
std::vector<uint32_t> tables_vector = {kNT, kNT, kNT};
auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1));
std::vector<std::shared_ptr<client::YBTable>> producer_tables;
std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
producer_tables.reserve(tables.size() / 2);
consumer_tables.reserve(tables.size() / 2);
for (size_t i = 0; i < tables.size(); ++i) {
if (i % 2 == 0) {
producer_tables.push_back(tables[i]);
} else {
consumer_tables.push_back(tables[i]);
}
}

// 1. Write some data.
for (const auto& producer_table : producer_tables) {
LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
WriteWorkload(0, 100, producer_client(), producer_table->name());
}

// Verify data is written on the producer.
for (const auto& producer_table : producer_tables) {
auto producer_results = ScanToStrings(producer_table->name(), producer_client());
ASSERT_EQ(100, producer_results.size());
}

// Set aside one table for AlterUniverseReplication.
std::shared_ptr<client::YBTable> producer_alter_table, consumer_alter_table;
producer_alter_table = producer_tables.back();
producer_tables.pop_back();
consumer_alter_table = consumer_tables.back();
consumer_tables.pop_back();

// 2a. Setup replication.
ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
kUniverseId, producer_tables));

// Verify everything is setup correctly.
master::GetUniverseReplicationResponsePB get_universe_replication_resp;
ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
&get_universe_replication_resp));
ASSERT_OK(CorrectlyPollingAllTablets(
consumer_cluster(), narrow_cast<uint32_t>(producer_tables.size() * kNT)));

// 2b. Alter Replication
{
auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client()->proxy_cache(),
consumer_leader_mini_master->bound_rpc_addr());
master::AlterUniverseReplicationRequestPB alter_req;
master::AlterUniverseReplicationResponsePB alter_resp;
alter_req.set_producer_id(kUniverseId);
alter_req.add_producer_table_ids_to_add(producer_alter_table->id());
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
ASSERT_FALSE(alter_resp.has_error());
// Wait until we have the new table listed in the existing universe config.
ASSERT_OK(LoggedWaitFor(
[&]() -> Result<bool> {
master::GetUniverseReplicationResponsePB tmp_resp;
RETURN_NOT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(),
kUniverseId, &tmp_resp));
return tmp_resp.entry().tables_size() == static_cast<int64>(producer_tables.size() + 1);
},
MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));

ASSERT_OK(CorrectlyPollingAllTablets(
consumer_cluster(), narrow_cast<uint32_t>((producer_tables.size() + 1) * kNT)));
}
producer_tables.push_back(producer_alter_table);
consumer_tables.push_back(consumer_alter_table);

auto data_replicated_correctly = [&](size_t num_results) -> Result<bool> {
for (const auto& consumer_table : consumer_tables) {
LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
auto consumer_results = ScanToStrings(consumer_table->name(), consumer_client());

if (num_results != consumer_results.size()) {
return false;
}
}
return true;
};
ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

// Attempt to destroy the producer and consumer tables.
for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_NOK(producer_client()->DeleteTable(producer_table_id));
ASSERT_NOK(consumer_client()->DeleteTable(consumer_table_id));
}

// Delete replication, afterwards table deletions are no longer blocked.
ASSERT_OK(DeleteUniverseReplication(kUniverseId));

for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
}
}

} // namespace enterprise
} // namespace yb
26 changes: 13 additions & 13 deletions ent/src/yb/integration-tests/twodc_test_base.cc
@@ -246,20 +246,20 @@ Status TwoDCTestBase::CorrectlyPollingAllTablets(
}

Status TwoDCTestBase::WaitForSetupUniverseReplicationCleanUp(string producer_uuid) {
auto proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client()->proxy_cache(),
VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

master::GetUniverseReplicationRequestPB req;
master::GetUniverseReplicationResponsePB resp;
return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
req.set_producer_id(producer_uuid);
rpc::RpcController rpc;
Status s = proxy->GetUniverseReplication(req, &resp, &rpc);
auto proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client()->proxy_cache(),
VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

master::GetUniverseReplicationRequestPB req;
master::GetUniverseReplicationResponsePB resp;
return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
req.set_producer_id(producer_uuid);
rpc::RpcController rpc;
Status s = proxy->GetUniverseReplication(req, &resp, &rpc);

return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
}, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
}
return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
}, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
}

} // namespace enterprise
} // namespace yb
27 changes: 12 additions & 15 deletions ent/src/yb/integration-tests/twodc_ysql-test.cc
@@ -498,13 +498,6 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<T
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records");
}

Status DeleteTable(Cluster* cluster,
TableId* table_id /* = nullptr */) {
RETURN_NOT_OK(cluster->client_->DeleteTable(*table_id));

return Status::OK();
}

Status TruncateTable(Cluster* cluster,
std::vector<string> table_ids) {
RETURN_NOT_OK(cluster->client_->TruncateTables(table_ids));
@@ -1480,18 +1473,22 @@ TEST_P(TwoDCYsqlTest, DeleteTableChecks) {
for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id));
ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id));
// GH issue #12003, allow deletion of YSQL tables under replication for now.
ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
}

ASSERT_OK(DeleteUniverseReplication(kUniverseId));

for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id));
ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id));
}
// TODO(jhe) re-enable these checks after we disallow deletion of YSQL xCluster tables, part
// of gh issue #753.

// for (size_t i = 0; i < producer_tables.size(); ++i) {
// string producer_table_id = producer_tables[i]->id();
// string consumer_table_id = consumer_tables[i]->id();
// ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
// ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
// }
}

TEST_P(TwoDCYsqlTest, TruncateTableChecks) {
3 changes: 2 additions & 1 deletion src/yb/master/catalog_manager.cc
@@ -5267,7 +5267,8 @@ Status CatalogManager::DeleteTable(
return Status::OK();
}

bool result = IsCdcEnabled(*table);
// For now, only disable dropping YCQL tables under xCluster replication.
bool result = table->GetTableType() == YQL_TABLE_TYPE && IsCdcEnabled(*table);
if (!FLAGS_enable_delete_truncate_xcluster_replicated_table && result) {
return STATUS(NotSupported,
"Cannot delete a table in replication.",
