Skip to content

Commit

Permalink
[#12003] xCluster: Allow dropping of YSQL tables under replication
Browse files Browse the repository at this point in the history
Summary:
Currently dropping YSQL tables under replication leads to a message saying that the table cannot be
dropped followed by the table being dropped only at the YSQL layer. This leads to some confusing
behaviour where the table can't be accessed via ysqlsh, but can be seen in admin UIs.

The plan to properly fix this is being worked on in gh issue #753, but for now, re-enabling the
deletion of YSQL tables under replication, while still blocking YCQL table deletion.

Test Plan:
```
ybd --cxx-test integration-tests_twodc_ysql-test --gtest_filter "*DeleteTableChecks*"
ybd --cxx-test integration-tests_twodc-test --gtest_filter "*DeleteTableChecks*"
```

Reviewers: rahuldesirazu, slingam, nicolas

Reviewed By: nicolas

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D18225
  • Loading branch information
hulien22 committed Jul 20, 2022
1 parent c3c2231 commit 48e8e32
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 29 deletions.
111 changes: 111 additions & 0 deletions ent/src/yb/integration-tests/twodc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2514,6 +2514,117 @@ TEST_P(TwoDCTest, TestNonZeroLagMetricsWithoutGetChange) {
ASSERT_OK(DeleteUniverseReplication(kUniverseId));
}

TEST_P(TwoDCTest, DeleteTableChecksCQL) {
YB_SKIP_TEST_IN_TSAN();
// Create 3 tables with 1 tablet each.
constexpr int kNT = 1;
std::vector<uint32_t> tables_vector = {kNT, kNT, kNT};
auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1));
std::vector<std::shared_ptr<client::YBTable>> producer_tables;
std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
producer_tables.reserve(tables.size() / 2);
consumer_tables.reserve(tables.size() / 2);
for (size_t i = 0; i < tables.size(); ++i) {
if (i % 2 == 0) {
producer_tables.push_back(tables[i]);
} else {
consumer_tables.push_back(tables[i]);
}
}

// 1. Write some data.
for (const auto& producer_table : producer_tables) {
LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
WriteWorkload(0, 100, producer_client(), producer_table->name());
}

// Verify data is written on the producer.
for (const auto& producer_table : producer_tables) {
auto producer_results = ScanToStrings(producer_table->name(), producer_client());
ASSERT_EQ(100, producer_results.size());
}

// Set aside one table for AlterUniverseReplication.
std::shared_ptr<client::YBTable> producer_alter_table, consumer_alter_table;
producer_alter_table = producer_tables.back();
producer_tables.pop_back();
consumer_alter_table = consumer_tables.back();
consumer_tables.pop_back();

// 2a. Setup replication.
ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
kUniverseId, producer_tables));

// Verify everything is setup correctly.
master::GetUniverseReplicationResponsePB get_universe_replication_resp;
ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
&get_universe_replication_resp));
ASSERT_OK(CorrectlyPollingAllTablets(
consumer_cluster(), narrow_cast<uint32_t>(producer_tables.size() * kNT)));

// 2b. Alter Replication
{
auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client()->proxy_cache(),
consumer_leader_mini_master->bound_rpc_addr());
master::AlterUniverseReplicationRequestPB alter_req;
master::AlterUniverseReplicationResponsePB alter_resp;
alter_req.set_producer_id(kUniverseId);
alter_req.add_producer_table_ids_to_add(producer_alter_table->id());
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
ASSERT_FALSE(alter_resp.has_error());
// Wait until we have the new table listed in the existing universe config.
ASSERT_OK(LoggedWaitFor(
[&]() -> Result<bool> {
master::GetUniverseReplicationResponsePB tmp_resp;
RETURN_NOT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(),
kUniverseId, &tmp_resp));
return tmp_resp.entry().tables_size() == static_cast<int64>(producer_tables.size() + 1);
},
MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));

ASSERT_OK(CorrectlyPollingAllTablets(
consumer_cluster(), narrow_cast<uint32_t>((producer_tables.size() + 1) * kNT)));
}
producer_tables.push_back(producer_alter_table);
consumer_tables.push_back(consumer_alter_table);

auto data_replicated_correctly = [&](size_t num_results) -> Result<bool> {
for (const auto& consumer_table : consumer_tables) {
LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
auto consumer_results = ScanToStrings(consumer_table->name(), consumer_client());

if (num_results != consumer_results.size()) {
return false;
}
}
return true;
};
ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));

// Attempt to destroy the producer and consumer tables.
for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_NOK(producer_client()->DeleteTable(producer_table_id));
ASSERT_NOK(consumer_client()->DeleteTable(consumer_table_id));
}

// Delete replication, afterwards table deletions are no longer blocked.
ASSERT_OK(DeleteUniverseReplication(kUniverseId));

for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
}
}

class TwoDCTestWaitForReplicationDrain : public TwoDCTest {
public:
void SetUpTablesAndReplication(
Expand Down
26 changes: 13 additions & 13 deletions ent/src/yb/integration-tests/twodc_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,20 @@ Status TwoDCTestBase::CorrectlyPollingAllTablets(
}

Status TwoDCTestBase::WaitForSetupUniverseReplicationCleanUp(string producer_uuid) {
auto proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client()->proxy_cache(),
VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

master::GetUniverseReplicationRequestPB req;
master::GetUniverseReplicationResponsePB resp;
return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
req.set_producer_id(producer_uuid);
rpc::RpcController rpc;
Status s = proxy->GetUniverseReplication(req, &resp, &rpc);
auto proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client()->proxy_cache(),
VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());

master::GetUniverseReplicationRequestPB req;
master::GetUniverseReplicationResponsePB resp;
return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
req.set_producer_id(producer_uuid);
rpc::RpcController rpc;
Status s = proxy->GetUniverseReplication(req, &resp, &rpc);

return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
}, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
}
return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
}, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
}

} // namespace enterprise
} // namespace yb
27 changes: 12 additions & 15 deletions ent/src/yb/integration-tests/twodc_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,6 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<T
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records");
}

Status DeleteTable(Cluster* cluster,
TableId* table_id /* = nullptr */) {
RETURN_NOT_OK(cluster->client_->DeleteTable(*table_id));

return Status::OK();
}

Status TruncateTable(Cluster* cluster,
std::vector<string> table_ids) {
RETURN_NOT_OK(cluster->client_->TruncateTables(table_ids));
Expand Down Expand Up @@ -1551,18 +1544,22 @@ TEST_P(TwoDCYsqlTest, DeleteTableChecks) {
for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id));
ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id));
// GH issue #12003, allow deletion of YSQL tables under replication for now.
ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
}

ASSERT_OK(DeleteUniverseReplication(kUniverseId));

for (size_t i = 0; i < producer_tables.size(); ++i) {
string producer_table_id = producer_tables[i]->id();
string consumer_table_id = consumer_tables[i]->id();
ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id));
ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id));
}
// TODO(jhe) re-enable these checks after we disallow deletion of YSQL xCluster tables, part
// of gh issue #753.

// for (size_t i = 0; i < producer_tables.size(); ++i) {
// string producer_table_id = producer_tables[i]->id();
// string consumer_table_id = consumer_tables[i]->id();
// ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
// ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
// }
}

TEST_P(TwoDCYsqlTest, TruncateTableChecks) {
Expand Down
4 changes: 3 additions & 1 deletion src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5228,7 +5228,9 @@ Status CatalogManager::DeleteTable(
<< req->ShortDebugString();

scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTable(req->table()));
bool result = IsCdcEnabled(*table);

// For now, only disable dropping YCQL tables under xCluster replication.
bool result = table->GetTableType() == YQL_TABLE_TYPE && IsCdcEnabled(*table);
if (!FLAGS_enable_delete_truncate_xcluster_replicated_table && result) {
return STATUS(NotSupported,
"Cannot delete a table in replication.",
Expand Down

0 comments on commit 48e8e32

Please sign in to comment.