From 48e8e32c39322a1163ff2f43c058b6d383784ece Mon Sep 17 00:00:00 2001
From: jhe
Date: Mon, 11 Jul 2022 14:54:37 -0700
Subject: [PATCH] [#12003] xCluster: Allow dropping of YSQL tables under replication

Summary:
Currently, dropping a YSQL table under replication produces a message saying that the
table cannot be dropped, after which the table is nevertheless dropped, but only at the
YSQL layer. This leads to confusing behaviour where the table can no longer be accessed
via ysqlsh but is still visible in the admin UIs. The proper fix is being worked on in
GH issue #753; for now, this diff re-enables deletion of YSQL tables under replication
while continuing to block deletion of YCQL tables under replication.

Test Plan:
```
ybd --cxx-test integration-tests_twodc_ysql-test --gtest_filter "*DeleteTableChecks*"
ybd --cxx-test integration-tests_twodc-test --gtest_filter "*DeleteTableChecks*"
```
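
In sketch form, the resulting gate in CatalogManager::DeleteTable (condensed from the
catalog_manager.cc hunk below, not the verbatim change; the name
`is_ycql_table_under_replication` is introduced here only for readability, and the
STATUS() call is abbreviated):
```
// Only a YCQL table under xCluster replication remains undeletable; a YSQL
// table now falls through to the normal drop path until the proper fix for
// GH issue #753 lands.
bool is_ycql_table_under_replication =
    table->GetTableType() == YQL_TABLE_TYPE && IsCdcEnabled(*table);
if (!FLAGS_enable_delete_truncate_xcluster_replicated_table &&
    is_ycql_table_under_replication) {
  return STATUS(NotSupported, "Cannot delete a table in replication.");
}
```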
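
For orientation, the asymmetry the updated tests assert, condensed from the
twodc-test.cc and twodc_ysql-test.cc hunks below (not verbatim test code):
```
// YCQL (twodc-test.cc, DeleteTableChecksCQL): while replication is active,
// dropping a replicated table must still fail.
ASSERT_NOK(producer_client()->DeleteTable(producer_table_id));

// YSQL (twodc_ysql-test.cc, DeleteTableChecks): the same drop is now
// expected to succeed, per this diff.
ASSERT_OK(producer_client()->DeleteTable(producer_table_id));

// After DeleteUniverseReplication(kUniverseId), drops succeed for both APIs.
```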

Reviewers: rahuldesirazu, slingam, nicolas

Reviewed By: nicolas

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D18225
---
 ent/src/yb/integration-tests/twodc-test.cc    | 111 ++++++++++++++++++
 .../yb/integration-tests/twodc_test_base.cc   |  26 ++--
 .../yb/integration-tests/twodc_ysql-test.cc   |  27 ++---
 src/yb/master/catalog_manager.cc              |   4 +-
 4 files changed, 139 insertions(+), 29 deletions(-)

diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc
index 6b68dbedcf4a..0cbcb7ea69aa 100644
--- a/ent/src/yb/integration-tests/twodc-test.cc
+++ b/ent/src/yb/integration-tests/twodc-test.cc
@@ -2514,6 +2514,117 @@ TEST_P(TwoDCTest, TestNonZeroLagMetricsWithoutGetChange) {
   ASSERT_OK(DeleteUniverseReplication(kUniverseId));
 }
 
+TEST_P(TwoDCTest, DeleteTableChecksCQL) {
+  YB_SKIP_TEST_IN_TSAN();
+  // Create 3 tables with 1 tablet each.
+  constexpr int kNT = 1;
+  std::vector<uint32_t> tables_vector = {kNT, kNT, kNT};
+  auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1));
+  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
+  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
+  producer_tables.reserve(tables.size() / 2);
+  consumer_tables.reserve(tables.size() / 2);
+  for (size_t i = 0; i < tables.size(); ++i) {
+    if (i % 2 == 0) {
+      producer_tables.push_back(tables[i]);
+    } else {
+      consumer_tables.push_back(tables[i]);
+    }
+  }
+
+  // 1. Write some data.
+  for (const auto& producer_table : producer_tables) {
+    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
+    WriteWorkload(0, 100, producer_client(), producer_table->name());
+  }
+
+  // Verify data is written on the producer.
+  for (const auto& producer_table : producer_tables) {
+    auto producer_results = ScanToStrings(producer_table->name(), producer_client());
+    ASSERT_EQ(100, producer_results.size());
+  }
+
+  // Set aside one table for AlterUniverseReplication.
+  std::shared_ptr<client::YBTable> producer_alter_table, consumer_alter_table;
+  producer_alter_table = producer_tables.back();
+  producer_tables.pop_back();
+  consumer_alter_table = consumer_tables.back();
+  consumer_tables.pop_back();
+
+  // 2a. Setup replication.
+  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
+                                     kUniverseId, producer_tables));
+
+  // Verify everything is setup correctly.
+  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
+  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
+                                      &get_universe_replication_resp));
+  ASSERT_OK(CorrectlyPollingAllTablets(
+      consumer_cluster(), narrow_cast<int32_t>(producer_tables.size() * kNT)));
+
+  // 2b. Alter Replication
+  {
+    auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
+    auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
+        &consumer_client()->proxy_cache(),
+        consumer_leader_mini_master->bound_rpc_addr());
+    master::AlterUniverseReplicationRequestPB alter_req;
+    master::AlterUniverseReplicationResponsePB alter_resp;
+    alter_req.set_producer_id(kUniverseId);
+    alter_req.add_producer_table_ids_to_add(producer_alter_table->id());
+    rpc::RpcController rpc;
+    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
+    ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
+    ASSERT_FALSE(alter_resp.has_error());
+    // Wait until we have the new table listed in the existing universe config.
+    ASSERT_OK(LoggedWaitFor(
+        [&]() -> Result<bool> {
+          master::GetUniverseReplicationResponsePB tmp_resp;
+          RETURN_NOT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(),
+                                                  kUniverseId, &tmp_resp));
+          return tmp_resp.entry().tables_size() == static_cast<int32_t>(producer_tables.size() + 1);
+        },
+        MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));
+
+    ASSERT_OK(CorrectlyPollingAllTablets(
+        consumer_cluster(), narrow_cast<int32_t>((producer_tables.size() + 1) * kNT)));
+  }
+  producer_tables.push_back(producer_alter_table);
+  consumer_tables.push_back(consumer_alter_table);
+
+  auto data_replicated_correctly = [&](size_t num_results) -> Result<bool> {
+    for (const auto& consumer_table : consumer_tables) {
+      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
+      auto consumer_results = ScanToStrings(consumer_table->name(), consumer_client());
+
+      if (num_results != consumer_results.size()) {
+        return false;
+      }
+    }
+    return true;
+  };
+  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
+                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
+
+  // Attempt to destroy the producer and consumer tables.
+  for (size_t i = 0; i < producer_tables.size(); ++i) {
+    string producer_table_id = producer_tables[i]->id();
+    string consumer_table_id = consumer_tables[i]->id();
+    ASSERT_NOK(producer_client()->DeleteTable(producer_table_id));
+    ASSERT_NOK(consumer_client()->DeleteTable(consumer_table_id));
+  }
+
+  // Delete replication, afterwards table deletions are no longer blocked.
+  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
+
+  for (size_t i = 0; i < producer_tables.size(); ++i) {
+    string producer_table_id = producer_tables[i]->id();
+    string consumer_table_id = consumer_tables[i]->id();
+    ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
+    ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
+  }
+}
+
 class TwoDCTestWaitForReplicationDrain : public TwoDCTest {
  public:
   void SetUpTablesAndReplication(

diff --git a/ent/src/yb/integration-tests/twodc_test_base.cc b/ent/src/yb/integration-tests/twodc_test_base.cc
index 1494e8fefa48..070b568b231b 100644
--- a/ent/src/yb/integration-tests/twodc_test_base.cc
+++ b/ent/src/yb/integration-tests/twodc_test_base.cc
@@ -246,20 +246,20 @@ Status TwoDCTestBase::CorrectlyPollingAllTablets(
 }
 
 Status TwoDCTestBase::WaitForSetupUniverseReplicationCleanUp(string producer_uuid) {
-    auto proxy = std::make_shared<master::MasterReplicationProxy>(
-        &consumer_client()->proxy_cache(),
-        VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
-
-    master::GetUniverseReplicationRequestPB req;
-    master::GetUniverseReplicationResponsePB resp;
-    return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
-      req.set_producer_id(producer_uuid);
-      rpc::RpcController rpc;
-      Status s = proxy->GetUniverseReplication(req, &resp, &rpc);
+  auto proxy = std::make_shared<master::MasterReplicationProxy>(
+      &consumer_client()->proxy_cache(),
+      VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
+
+  master::GetUniverseReplicationRequestPB req;
+  master::GetUniverseReplicationResponsePB resp;
+  return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
+    req.set_producer_id(producer_uuid);
+    rpc::RpcController rpc;
+    Status s = proxy->GetUniverseReplication(req, &resp, &rpc);
 
-      return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
-    }, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
-  }
+    return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
+  }, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
+}
 
 } // namespace enterprise
 } // namespace yb

diff --git a/ent/src/yb/integration-tests/twodc_ysql-test.cc b/ent/src/yb/integration-tests/twodc_ysql-test.cc
index b3b097c05557..7fd0e2fc2e7b 100644
--- a/ent/src/yb/integration-tests/twodc_ysql-test.cc
+++ b/ent/src/yb/integration-tests/twodc_ysql-test.cc
@@ -531,13 +531,6 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<
     return Status::OK();
   }
 
-  Status DeleteTable(Cluster* cluster,
-                     TableId* table_id) {
-    RETURN_NOT_OK(cluster->client_->DeleteTable(*table_id));
-
-    return Status::OK();
-  }
-
   Status TruncateTable(Cluster* cluster,
                        std::vector<string> table_ids) {
     RETURN_NOT_OK(cluster->client_->TruncateTables(table_ids));
@@ -1551,18 +1544,22 @@ TEST_P(TwoDCYsqlTest, DeleteTableChecks) {
   for (size_t i = 0; i < producer_tables.size(); ++i) {
     string producer_table_id = producer_tables[i]->id();
     string consumer_table_id = consumer_tables[i]->id();
-    ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id));
-    ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id));
+    // GH issue #12003, allow deletion of YSQL tables under replication for now.
+    ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
+    ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
   }
 
   ASSERT_OK(DeleteUniverseReplication(kUniverseId));
 
-  for (size_t i = 0; i < producer_tables.size(); ++i) {
-    string producer_table_id = producer_tables[i]->id();
-    string consumer_table_id = consumer_tables[i]->id();
-    ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id));
-    ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id));
-  }
+  // TODO(jhe) re-enable these checks after we disallow deletion of YSQL xCluster tables, part
+  // of gh issue #753.
+
+  // for (size_t i = 0; i < producer_tables.size(); ++i) {
+  //   string producer_table_id = producer_tables[i]->id();
+  //   string consumer_table_id = consumer_tables[i]->id();
+  //   ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
+  //   ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
+  // }
 }
 
 TEST_P(TwoDCYsqlTest, TruncateTableChecks) {

diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc
index 3a323063eaf4..46174a07400f 100644
--- a/src/yb/master/catalog_manager.cc
+++ b/src/yb/master/catalog_manager.cc
@@ -5228,7 +5228,9 @@ Status CatalogManager::DeleteTable(
       << req->ShortDebugString();
 
   scoped_refptr<TableInfo> table = VERIFY_RESULT(FindTable(req->table()));
-  bool result = IsCdcEnabled(*table);
+
+  // For now, only disable dropping YCQL tables under xCluster replication.
+  bool result = table->GetTableType() == YQL_TABLE_TYPE && IsCdcEnabled(*table);
   if (!FLAGS_enable_delete_truncate_xcluster_replicated_table && result) {
     return STATUS(NotSupported, "Cannot delete a table in replication.",