From 5404cf38a0fb567a6bb5689f383678c77f0097a7 Mon Sep 17 00:00:00 2001 From: jhe Date: Fri, 19 Aug 2022 12:13:53 -0700 Subject: [PATCH] [BACKPORT 2.14][#12003] xCluster: Allow dropping of YSQL tables under replication Summary: Currently dropping YSQL tables under replication leads to a message saying that the table cannot be dropped followed by the table being dropped only at the YSQL layer. This leads to some confusing behaviour where the table can't be accessed via ysqlsh, but can be seen in admin UIs. The plan to properly fix this is being worked on in gh issue #753, but for now, re-enabling the deletion of YSQL tables under replication, while still blocking YCQL table deletion. Original commit: 48e8e32c39322a1163ff2f43c058b6d383784ece / D18225 Test Plan: ``` ybd --cxx-test integration-tests_twodc_ysql-test --gtest_filter "*DeleteTableChecks*" ybd --cxx-test integration-tests_twodc-test --gtest_filter "*DeleteTableChecks*" ``` Reviewers: rahuldesirazu, nicolas, slingam Reviewed By: slingam Subscribers: bogdan, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D18472 --- .../yb/integration-tests/cdcsdk_ysql-test.cc | 4 +- ent/src/yb/integration-tests/twodc-test.cc | 111 ++++++++++++++++++ .../yb/integration-tests/twodc_test_base.cc | 26 ++-- .../yb/integration-tests/twodc_ysql-test.cc | 27 ++--- src/yb/master/catalog_manager.cc | 3 +- 5 files changed, 141 insertions(+), 30 deletions(-) diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc index 9f81eea46b1d..acc5f3ad74a0 100644 --- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -1494,7 +1494,9 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStream delete_req.add_stream_id(create_resp.stream_id()); // The following line assumes that cdc_proxy_ has been initialized in the test already ASSERT_OK(cdc_proxy_->DeleteCDCStream(delete_req, &delete_resp, &delete_rpc)); - ASSERT_EQ(!delete_resp.has_error(), true); + // Currently drop table on YSQL tables doesn't delete xCluster streams, so this will fail. + ASSERT_EQ(delete_resp.has_error(), true); + ASSERT_TRUE(delete_resp.error().status().message().find(create_resp.stream_id()) != string::npos); } TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckPointPersistencyNodeRestart)) { diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc index 5a17a7c582d2..346f636e7bff 100644 --- a/ent/src/yb/integration-tests/twodc-test.cc +++ b/ent/src/yb/integration-tests/twodc-test.cc @@ -2127,5 +2127,116 @@ TEST_P(TwoDCTest, TestAlterUniverseRemoveTableAndDrop) { ASSERT_OK(DeleteUniverseReplication(kUniverseId)); } +TEST_P(TwoDCTest, DeleteTableChecksCQL) { + YB_SKIP_TEST_IN_TSAN(); + // Create 3 tables with 1 tablet each. + constexpr int kNT = 1; + std::vector tables_vector = {kNT, kNT, kNT}; + auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); + std::vector> producer_tables; + std::vector> consumer_tables; + producer_tables.reserve(tables.size() / 2); + consumer_tables.reserve(tables.size() / 2); + for (size_t i = 0; i < tables.size(); ++i) { + if (i % 2 == 0) { + producer_tables.push_back(tables[i]); + } else { + consumer_tables.push_back(tables[i]); + } + } + + // 1. Write some data. + for (const auto& producer_table : producer_tables) { + LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); + WriteWorkload(0, 100, producer_client(), producer_table->name()); + } + + // Verify data is written on the producer. + for (const auto& producer_table : producer_tables) { + auto producer_results = ScanToStrings(producer_table->name(), producer_client()); + ASSERT_EQ(100, producer_results.size()); + } + + // Set aside one table for AlterUniverseReplication. + std::shared_ptr producer_alter_table, consumer_alter_table; + producer_alter_table = producer_tables.back(); + producer_tables.pop_back(); + consumer_alter_table = consumer_tables.back(); + consumer_tables.pop_back(); + + // 2a. Setup replication. + ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), + kUniverseId, producer_tables)); + + // Verify everything is setup correctly. + master::GetUniverseReplicationResponsePB get_universe_replication_resp; + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, + &get_universe_replication_resp)); + ASSERT_OK(CorrectlyPollingAllTablets( + consumer_cluster(), narrow_cast(producer_tables.size() * kNT))); + + // 2b. Alter Replication + { + auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); + auto master_proxy = std::make_shared( + &consumer_client()->proxy_cache(), + consumer_leader_mini_master->bound_rpc_addr()); + master::AlterUniverseReplicationRequestPB alter_req; + master::AlterUniverseReplicationResponsePB alter_resp; + alter_req.set_producer_id(kUniverseId); + alter_req.add_producer_table_ids_to_add(producer_alter_table->id()); + rpc::RpcController rpc; + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); + ASSERT_FALSE(alter_resp.has_error()); + // Wait until we have the new table listed in the existing universe config. + ASSERT_OK(LoggedWaitFor( + [&]() -> Result { + master::GetUniverseReplicationResponsePB tmp_resp; + RETURN_NOT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), + kUniverseId, &tmp_resp)); + return tmp_resp.entry().tables_size() == static_cast(producer_tables.size() + 1); + }, + MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter.")); + + ASSERT_OK(CorrectlyPollingAllTablets( + consumer_cluster(), narrow_cast((producer_tables.size() + 1) * kNT))); + } + producer_tables.push_back(producer_alter_table); + consumer_tables.push_back(consumer_alter_table); + + auto data_replicated_correctly = [&](size_t num_results) -> Result { + for (const auto& consumer_table : consumer_tables) { + LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); + auto consumer_results = ScanToStrings(consumer_table->name(), consumer_client()); + + if (num_results != consumer_results.size()) { + return false; + } + } + return true; + }; + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(100); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + + // Attempt to destroy the producer and consumer tables. + for (size_t i = 0; i < producer_tables.size(); ++i) { + string producer_table_id = producer_tables[i]->id(); + string consumer_table_id = consumer_tables[i]->id(); + ASSERT_NOK(producer_client()->DeleteTable(producer_table_id)); + ASSERT_NOK(consumer_client()->DeleteTable(consumer_table_id)); + } + + // Delete replication, afterwards table deletions are no longer blocked. + ASSERT_OK(DeleteUniverseReplication(kUniverseId)); + + for (size_t i = 0; i < producer_tables.size(); ++i) { + string producer_table_id = producer_tables[i]->id(); + string consumer_table_id = consumer_tables[i]->id(); + ASSERT_OK(producer_client()->DeleteTable(producer_table_id)); + ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id)); + } +} + } // namespace enterprise } // namespace yb diff --git a/ent/src/yb/integration-tests/twodc_test_base.cc b/ent/src/yb/integration-tests/twodc_test_base.cc index 1494e8fefa48..070b568b231b 100644 --- a/ent/src/yb/integration-tests/twodc_test_base.cc +++ b/ent/src/yb/integration-tests/twodc_test_base.cc @@ -246,20 +246,20 @@ Status TwoDCTestBase::CorrectlyPollingAllTablets( } Status TwoDCTestBase::WaitForSetupUniverseReplicationCleanUp(string producer_uuid) { - auto proxy = std::make_shared( - &consumer_client()->proxy_cache(), - VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); - - master::GetUniverseReplicationRequestPB req; - master::GetUniverseReplicationResponsePB resp; - return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result { - req.set_producer_id(producer_uuid); - rpc::RpcController rpc; - Status s = proxy->GetUniverseReplication(req, &resp, &rpc); + auto proxy = std::make_shared( + &consumer_client()->proxy_cache(), + VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr()); + + master::GetUniverseReplicationRequestPB req; + master::GetUniverseReplicationResponsePB resp; + return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result { + req.set_producer_id(producer_uuid); + rpc::RpcController rpc; + Status s = proxy->GetUniverseReplication(req, &resp, &rpc); - return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND; - }, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete"); - } + return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND; + }, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete"); +} } // namespace enterprise } // namespace yb diff --git a/ent/src/yb/integration-tests/twodc_ysql-test.cc b/ent/src/yb/integration-tests/twodc_ysql-test.cc index 1ebf30252bad..0df23e5acfd9 100644 --- a/ent/src/yb/integration-tests/twodc_ysql-test.cc +++ b/ent/src/yb/integration-tests/twodc_ysql-test.cc @@ -498,13 +498,6 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterfaceclient_->DeleteTable(*table_id)); - - return Status::OK(); - } - Status TruncateTable(Cluster* cluster, std::vector table_ids) { RETURN_NOT_OK(cluster->client_->TruncateTables(table_ids)); @@ -1480,18 +1473,22 @@ TEST_P(TwoDCYsqlTest, DeleteTableChecks) { for (size_t i = 0; i < producer_tables.size(); ++i) { string producer_table_id = producer_tables[i]->id(); string consumer_table_id = consumer_tables[i]->id(); - ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id)); - ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id)); + // GH issue #12003, allow deletion of YSQL tables under replication for now. + ASSERT_OK(producer_client()->DeleteTable(producer_table_id)); + ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id)); } ASSERT_OK(DeleteUniverseReplication(kUniverseId)); - for (size_t i = 0; i < producer_tables.size(); ++i) { - string producer_table_id = producer_tables[i]->id(); - string consumer_table_id = consumer_tables[i]->id(); - ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id)); - ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id)); - } + // TODO(jhe) re-enable these checks after we disallow deletion of YSQL xCluster tables, part + // of gh issue #753. + + // for (size_t i = 0; i < producer_tables.size(); ++i) { + // string producer_table_id = producer_tables[i]->id(); + // string consumer_table_id = consumer_tables[i]->id(); + // ASSERT_OK(producer_client()->DeleteTable(producer_table_id)); + // ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id)); + // } } TEST_P(TwoDCYsqlTest, TruncateTableChecks) { diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index b27d138dbba9..5c36c23c19f3 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -5267,7 +5267,8 @@ Status CatalogManager::DeleteTable( return Status::OK(); } - bool result = IsCdcEnabled(*table); + // For now, only disable dropping YCQL tables under xCluster replication. + bool result = table->GetTableType() == YQL_TABLE_TYPE && IsCdcEnabled(*table); if (!FLAGS_enable_delete_truncate_xcluster_replicated_table && result) { return STATUS(NotSupported, "Cannot delete a table in replication.",