diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
index 9f81eea46b1d..acc5f3ad74a0 100644
--- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
+++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
@@ -1494,7 +1494,9 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStream
   delete_req.add_stream_id(create_resp.stream_id());
   // The following line assumes that cdc_proxy_ has been initialized in the test already.
   ASSERT_OK(cdc_proxy_->DeleteCDCStream(delete_req, &delete_resp, &delete_rpc));
-  ASSERT_EQ(!delete_resp.has_error(), true);
+  // Currently, dropping a YSQL table doesn't delete its xCluster streams, so this call will fail.
+  ASSERT_EQ(delete_resp.has_error(), true);
+  ASSERT_TRUE(delete_resp.error().status().message().find(create_resp.stream_id()) != string::npos);
 }
 
 TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckPointPersistencyNodeRestart)) {
diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc
index 5a17a7c582d2..346f636e7bff 100644
--- a/ent/src/yb/integration-tests/twodc-test.cc
+++ b/ent/src/yb/integration-tests/twodc-test.cc
@@ -2127,5 +2127,116 @@ TEST_P(TwoDCTest, TestAlterUniverseRemoveTableAndDrop) {
   ASSERT_OK(DeleteUniverseReplication(kUniverseId));
 }
 
+TEST_P(TwoDCTest, DeleteTableChecksCQL) {
+  YB_SKIP_TEST_IN_TSAN();
+  // Create 3 tables with 1 tablet each.
+  constexpr int kNT = 1;
+  std::vector<uint32_t> tables_vector = {kNT, kNT, kNT};
+  auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1));
+  std::vector<std::shared_ptr<client::YBTable>> producer_tables;
+  std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
+  producer_tables.reserve(tables.size() / 2);
+  consumer_tables.reserve(tables.size() / 2);
+  for (size_t i = 0; i < tables.size(); ++i) {
+    if (i % 2 == 0) {
+      producer_tables.push_back(tables[i]);
+    } else {
+      consumer_tables.push_back(tables[i]);
+    }
+  }
+
+  // 1. Write some data.
+  for (const auto& producer_table : producer_tables) {
+    LOG(INFO) << "Writing records for table " << producer_table->name().ToString();
+    WriteWorkload(0, 100, producer_client(), producer_table->name());
+  }
+
+  // Verify data is written on the producer.
+  for (const auto& producer_table : producer_tables) {
+    auto producer_results = ScanToStrings(producer_table->name(), producer_client());
+    ASSERT_EQ(100, producer_results.size());
+  }
+
+  // Set aside one table for AlterUniverseReplication.
+  std::shared_ptr<client::YBTable> producer_alter_table, consumer_alter_table;
+  producer_alter_table = producer_tables.back();
+  producer_tables.pop_back();
+  consumer_alter_table = consumer_tables.back();
+  consumer_tables.pop_back();
+
+  // 2a. Setup replication.
+  ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
+                                     kUniverseId, producer_tables));
+
+  // Verify everything is setup correctly.
+  master::GetUniverseReplicationResponsePB get_universe_replication_resp;
+  ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId,
+                                      &get_universe_replication_resp));
+  ASSERT_OK(CorrectlyPollingAllTablets(
+      consumer_cluster(), narrow_cast<uint32_t>(producer_tables.size() * kNT)));
+
+  // 2b. Alter Replication
+  {
+    auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
+    auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
+        &consumer_client()->proxy_cache(),
+        consumer_leader_mini_master->bound_rpc_addr());
+    master::AlterUniverseReplicationRequestPB alter_req;
+    master::AlterUniverseReplicationResponsePB alter_resp;
+    alter_req.set_producer_id(kUniverseId);
+    alter_req.add_producer_table_ids_to_add(producer_alter_table->id());
+    rpc::RpcController rpc;
+    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
+    ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
+    ASSERT_FALSE(alter_resp.has_error());
+    // Wait until we have the new table listed in the existing universe config.
+    ASSERT_OK(LoggedWaitFor(
+        [&]() -> Result<bool> {
+          master::GetUniverseReplicationResponsePB tmp_resp;
+          RETURN_NOT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(),
+                                                  kUniverseId, &tmp_resp));
+          return tmp_resp.entry().tables_size() == static_cast<int>(producer_tables.size() + 1);
+        },
+        MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));
+
+    ASSERT_OK(CorrectlyPollingAllTablets(
+        consumer_cluster(), narrow_cast<uint32_t>((producer_tables.size() + 1) * kNT)));
+  }
+  producer_tables.push_back(producer_alter_table);
+  consumer_tables.push_back(consumer_alter_table);
+
+  auto data_replicated_correctly = [&](size_t num_results) -> Result<bool> {
+    for (const auto& consumer_table : consumer_tables) {
+      LOG(INFO) << "Checking records for table " << consumer_table->name().ToString();
+      auto consumer_results = ScanToStrings(consumer_table->name(), consumer_client());
+
+      if (num_results != consumer_results.size()) {
+        return false;
+      }
+    }
+    return true;
+  };
+  ASSERT_OK(WaitFor([&]() -> Result<bool> { return data_replicated_correctly(100); },
+                    MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly"));
+
+  // Attempt to destroy the producer and consumer tables; these should fail while replication
+  // on them is still active.
+  for (size_t i = 0; i < producer_tables.size(); ++i) {
+    string producer_table_id = producer_tables[i]->id();
+    string consumer_table_id = consumer_tables[i]->id();
+    ASSERT_NOK(producer_client()->DeleteTable(producer_table_id));
+    ASSERT_NOK(consumer_client()->DeleteTable(consumer_table_id));
+  }
+
+  // Delete replication; afterwards table deletions are no longer blocked.
+  ASSERT_OK(DeleteUniverseReplication(kUniverseId));
+
+  for (size_t i = 0; i < producer_tables.size(); ++i) {
+    string producer_table_id = producer_tables[i]->id();
+    string consumer_table_id = consumer_tables[i]->id();
+    ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
+    ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
+  }
+}
+
 } // namespace enterprise
 } // namespace yb
diff --git a/ent/src/yb/integration-tests/twodc_test_base.cc b/ent/src/yb/integration-tests/twodc_test_base.cc
index 1494e8fefa48..070b568b231b 100644
--- a/ent/src/yb/integration-tests/twodc_test_base.cc
+++ b/ent/src/yb/integration-tests/twodc_test_base.cc
@@ -246,20 +246,20 @@
 }
 
 Status TwoDCTestBase::WaitForSetupUniverseReplicationCleanUp(string producer_uuid) {
-    auto proxy = std::make_shared<master::MasterReplicationProxy>(
-        &consumer_client()->proxy_cache(),
-        VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
-
-    master::GetUniverseReplicationRequestPB req;
-    master::GetUniverseReplicationResponsePB resp;
-    return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
-      req.set_producer_id(producer_uuid);
-      rpc::RpcController rpc;
-      Status s = proxy->GetUniverseReplication(req, &resp, &rpc);
+  auto proxy = std::make_shared<master::MasterReplicationProxy>(
+      &consumer_client()->proxy_cache(),
+      VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->bound_rpc_addr());
+
+  master::GetUniverseReplicationRequestPB req;
+  master::GetUniverseReplicationResponsePB resp;
+  return WaitFor([proxy, &req, &resp, producer_uuid]() -> Result<bool> {
+    req.set_producer_id(producer_uuid);
+    rpc::RpcController rpc;
+    Status s = proxy->GetUniverseReplication(req, &resp, &rpc);
 
-      return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
-    }, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
-  }
+    return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
+  }, MonoDelta::FromSeconds(kRpcTimeout), "Waiting for universe to delete");
+}
 
 } // namespace enterprise
 } // namespace yb
diff --git a/ent/src/yb/integration-tests/twodc_ysql-test.cc b/ent/src/yb/integration-tests/twodc_ysql-test.cc
index 1ebf30252bad..0df23e5acfd9 100644
--- a/ent/src/yb/integration-tests/twodc_ysql-test.cc
+++ b/ent/src/yb/integration-tests/twodc_ysql-test.cc
@@ -498,13 +498,6 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<TwoDCTestParams> {
-  Status DeleteTable(Cluster* cluster,
-                     std::string* table_id) {
-    RETURN_NOT_OK(cluster->client_->DeleteTable(*table_id));
-
-    return Status::OK();
-  }
-
   Status TruncateTable(Cluster* cluster, std::vector<string> table_ids) {
     RETURN_NOT_OK(cluster->client_->TruncateTables(table_ids));
@@ -1480,18 +1473,22 @@ TEST_P(TwoDCYsqlTest, DeleteTableChecks) {
   for (size_t i = 0; i < producer_tables.size(); ++i) {
     string producer_table_id = producer_tables[i]->id();
     string consumer_table_id = consumer_tables[i]->id();
-    ASSERT_NOK(DeleteTable(&producer_cluster_, &producer_table_id));
-    ASSERT_NOK(DeleteTable(&consumer_cluster_, &consumer_table_id));
+    // GH issue #12003, allow deletion of YSQL tables under replication for now.
+    ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
+    ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
   }
 
   ASSERT_OK(DeleteUniverseReplication(kUniverseId));
 
-  for (size_t i = 0; i < producer_tables.size(); ++i) {
-    string producer_table_id = producer_tables[i]->id();
-    string consumer_table_id = consumer_tables[i]->id();
-    ASSERT_OK(DeleteTable(&producer_cluster_, &producer_table_id));
-    ASSERT_OK(DeleteTable(&consumer_cluster_, &consumer_table_id));
-  }
+  // TODO(jhe) re-enable these checks after we disallow deletion of YSQL xCluster tables, part
+  // of gh issue #753.
+
+  // for (size_t i = 0; i < producer_tables.size(); ++i) {
+  //   string producer_table_id = producer_tables[i]->id();
+  //   string consumer_table_id = consumer_tables[i]->id();
+  //   ASSERT_OK(producer_client()->DeleteTable(producer_table_id));
+  //   ASSERT_OK(consumer_client()->DeleteTable(consumer_table_id));
+  // }
 }
 
 TEST_P(TwoDCYsqlTest, TruncateTableChecks) {
diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc
index b27d138dbba9..5c36c23c19f3 100644
--- a/src/yb/master/catalog_manager.cc
+++ b/src/yb/master/catalog_manager.cc
@@ -5267,7 +5267,8 @@ Status CatalogManager::DeleteTable(
     return Status::OK();
   }
 
-  bool result = IsCdcEnabled(*table);
+  // For now, only disallow dropping YCQL tables that are under xCluster replication.
+  bool result = table->GetTableType() == YQL_TABLE_TYPE && IsCdcEnabled(*table);
   if (!FLAGS_enable_delete_truncate_xcluster_replicated_table && result) {
     return STATUS(NotSupported,
                   "Cannot delete a table in replication.",