From d6d88be2f1ea9805f3c2368629534b81585bc8f6 Mon Sep 17 00:00:00 2001 From: Sandeep L <99765181+lingamsandeep@users.noreply.github.com> Date: Sat, 20 Jul 2024 02:08:54 +0530 Subject: [PATCH] [BACKPORT 2.20][#23188] DocDB: Persist new colocated_id mapping discovered as part of processing CHANGE_METADATA_OP in xCluster ClusterConfig. Summary: Original commit: 9f82c017b59faa674364494e8630189567e1df52 / D36552 Problem: When a new table is added to a colocated database, the table needs to be created on both xCluster source and target with the same colocation_id. As part of processing the ChangeMetadataOp for the AddTable operation received from the source, the target creates a mapping of source->target schema versions of the newly created table. However, this is not getting persisted in ClusterConfig if the source/target schema versions are default values. As a result while replication may work immediately after setup, upon a restart of the T-server, the mapping of source-target schema versions may be lost and replication may stall until another schema change happens on the source. Fix: Fix is to detect the non-existent colocation_id correctly and persisting the ClusterConfig. Test Plan: ybt xcluster_ysql_colocated-test XClusterYsqlColocatedTest.DatabaseReplication The test was failing without the fix and is passing with the fix. Reviewers: hsunder, xCluster Reviewed By: hsunder Subscribers: ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D36700 --- .../xcluster/xcluster_ysq_colocated-test.cc | 44 ++++++++++++++++--- src/yb/master/xrepl_catalog_manager.cc | 10 ++++- 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/yb/integration-tests/xcluster/xcluster_ysq_colocated-test.cc b/src/yb/integration-tests/xcluster/xcluster_ysq_colocated-test.cc index 37fa87033231..38fbb6e4b8cc 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ysq_colocated-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_ysq_colocated-test.cc @@ -1,4 +1,4 @@ -// Copyright (c) YugabyteDB, Inc. +// Copyright (c) YugaByte, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except // in compliance with the License. You may obtain a copy of the License at @@ -23,6 +23,7 @@ #include "yb/master/master_ddl.proxy.h" #include "yb/master/master_replication.proxy.h" #include "yb/master/mini_master.h" +#include "yb/tserver/mini_tablet_server.h" #include "yb/util/backoff_waiter.h" DECLARE_bool(xcluster_wait_on_ddl_alter); @@ -118,6 +119,7 @@ class XClusterYsqlColocatedTest : public XClusterYsqlTestBase { auto& tables = onlyColocated ? colocated_consumer_tables : consumer_tables_; for (const auto& consumer_table : tables) { LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); + RETURN_NOT_OK(WaitForRowCount(consumer_table->name(), num_results, &consumer_cluster_)); RETURN_NOT_OK(ValidateRows(consumer_table->name(), num_results, &consumer_cluster_)); } return true; @@ -196,20 +198,50 @@ class XClusterYsqlColocatedTest : public XClusterYsqlTestBase { [&]() -> Result { LOG(INFO) << "Checking records for table " << new_colocated_consumer_table->name().ToString(); - RETURN_NOT_OK( - ValidateRows(new_colocated_consumer_table->name(), kRecordBatch, &consumer_cluster_)); + RETURN_NOT_OK(WaitForRowCount( + new_colocated_consumer_table->name(), kRecordBatch, &consumer_cluster_)); + RETURN_NOT_OK(ValidateRows( + new_colocated_consumer_table->name(), kRecordBatch, &consumer_cluster_)); return true; }, MonoDelta::FromSeconds(20 * kTimeMultiplier), "IsDataReplicatedCorrectly new colocated table")); - // 6. Drop the new table and ensure that data is getting replicated correctly for + // 6. Shutdown the colocated tablet leader and verify that replication is still happening. + { + auto tablet_ids = ListTabletIdsForTable(consumer_cluster(), colocated_parent_table_id); + auto old_ts = FindTabletLeader(consumer_cluster(), *tablet_ids.begin()); + old_ts->Shutdown(); + const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(10 * kTimeMultiplier); + RETURN_NOT_OK(WaitUntilTabletHasLeader(consumer_cluster(), *tablet_ids.begin(), deadline)); + RETURN_NOT_OK(old_ts->RestartStoppedServer()); + RETURN_NOT_OK(old_ts->WaitStarted()); + + RETURN_NOT_OK(InsertRowsInProducer( + kRecordBatch, 2 * kRecordBatch, new_colocated_producer_table, + use_transaction)); + + RETURN_NOT_OK(WaitFor( + [&]() -> Result { + LOG(INFO) << "Checking records for table " + << new_colocated_consumer_table->name().ToString(); + RETURN_NOT_OK(WaitForRowCount( + new_colocated_consumer_table->name(), 2 * kRecordBatch, &consumer_cluster_)); + RETURN_NOT_OK(ValidateRows( + new_colocated_consumer_table->name(), 2 * kRecordBatch, &consumer_cluster_)); + return true; + }, + MonoDelta::FromSeconds(20 * kTimeMultiplier), + "IsDataReplicatedCorrectly new colocated table")); + } + + // 7. Drop the new table and ensure that data is getting replicated correctly for // the other tables RETURN_NOT_OK( DropYsqlTable(&producer_cluster_, namespace_name, "", Format("test_table_$0", idx))); LOG(INFO) << Format("Dropped test_table_$0 on Producer side", idx); - // 7. Add additional data to the original tables. + // 8. Add additional data to the original tables. for (const auto& producer_table : producer_tables_) { LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); RETURN_NOT_OK( @@ -217,7 +249,7 @@ class XClusterYsqlColocatedTest : public XClusterYsqlTestBase { } count += kRecordBatch; - // 8. Verify all tables are properly replicated. + // 9. Verify all tables are properly replicated. RETURN_NOT_OK(WaitFor( [&]() -> Result { return data_replicated_correctly(count, false); }, MonoDelta::FromSeconds(20 * kTimeMultiplier), diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 94b9361b8f42..a360ac26e349 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -5667,16 +5667,22 @@ Status CatalogManager::UpdateConsumerOnProducerMetadata( schema_cached->Clear(); cdc::SchemaVersionsPB* schema_versions_pb = nullptr; + bool schema_versions_updated = false; // TODO (#16557): Support remove_table_id() for colocated tables / tablegroups. if (IsColocationParentTableId(consumer_table_id) && req->colocation_id() != kColocationIdNotSet) { auto map = stream_entry->mutable_colocated_schema_versions(); - schema_versions_pb = &((*map)[req->colocation_id()]); + schema_versions_pb = FindOrNull(*map, req->colocation_id()); + if (nullptr == schema_versions_pb) { + // If the colocation_id itself does not exist, it needs to be recorded in clusterconfig. + // This is to handle the case where source-target schema version mapping is 0:0. + schema_versions_updated = true; + schema_versions_pb = &((*map)[req->colocation_id()]); + } } else { schema_versions_pb = stream_entry->mutable_schema_versions(); } - bool schema_versions_updated = false; SchemaVersion current_producer_schema_version = schema_versions_pb->current_producer_schema_version(); SchemaVersion current_consumer_schema_version =