From 69d40529b778b2b0e8fed9eee5ac05d7ec3fe763 Mon Sep 17 00:00:00 2001 From: yusong-yan Date: Mon, 1 Jul 2024 21:32:47 +0000 Subject: [PATCH] [#22862] XCluster: Improving XCluster Index Base WAL Retention Policy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: **Background:** XCluster and CDCSDK rely on the tablet's `cdc_min_replicated_index` to retain the WAL segments that have not been replicated to the target. This value is updated by the background task in `CDCServiceImpl::UpdatePeersAndMetrics`, which is scheduled to run every 60 seconds(code link). For each update: * It first collects `cdc_min_replicated_index` and some CDCSDK checkpoints for each tablet from the CDC_State table * Then it subsequently updates the value within each tablet's WAL and then flush the values to the tablet metadata file. **Issue:** Potential WAL Over-GC Risk: * Currently, each tablet monitors its own `cdc_min_replicated_index`. If the tablet doesn't receive an update within 15 minutes, it then drops its OpId index-based WAL retention for XCluster(introduced in D7873). This implies that if there is a delay longer than 15 minutes before updating a XCluster tablet, WAL over gc could happen. For example, in `CDCServiceImpl::UpdatePeersAndMetrics`, updating all tablets cdc WAL retention barrier might take more than 15 mins. Unnecessary Flush: * Since `cdc_min_replicated_index` is already stored in the CDC_State table, the additional flush to the tablet metadata file is unnecessary. (We’ve observed this caused increased disk IO in CE-509) **Alternative for Dropping XCluster WAL Retention Policy:** In `CDCServiceImpl::UpdatePeersAndMetrics`, after collecting tablet checkpoint info, instead of individual updates, CDCService will store the checkpoint info stored in an in-memory map. And all tablets' WAL will pull their `cdc_min_replicated_index` from this map during WAL GC. If not found, CDCService will return maximum OpId, indicating removal from XCluster replication. **New Gflag: ** `xcluster_checkpoint_max_staleness_secs`(300 by default): The maximum interval in seconds that the xcluster checkpoint map can go without being refreshed. If the map is not refreshed within this interval, it is considered stale, and all WAL segments will be retained until the next refresh. Setting to 0 will disable Opid based WAL segment retention for XCluster. **If we want to disable it, It's recommended to also set Gflag`log_min_seconds_to_retain` to a large value so the WAL segments still can be retained by the time based WAL segment retention** **TServer Restart Safety Guarantee:** This change removed the periodic flushing of tablet metadata for persisting `cdc_min_replicated_index` by XCluster. Consequently, tablets no longer rely on persisted `cdc_min_replicated_index` to retain WAL segments during TServer restarts. This design ensures safety by leveraging the `xcluster_checkpoint_max_staleness_secs` mechanism. Before CDCService refresh the xcluster checkpoint map, the map is considered stale, preventing premature WAL GC. **Other Changes:** Modified CDCServiceTest unit tests to verify tablet xcluster required minimum index by checking the CDCService cached xcluster checkpoint map instead of relying on the tablet's log and metadata. Jira: DB-11766 Test Plan: CDCServiceTestFourServers.TestMinReplicatedIndexAfterTabletMove CDCServiceTestDurableMinReplicatedIndex.TestLogCDCMinReplicatedIndexIsDurable CdcTabletSplitITest.GetChangesOnSplitParentTablet Reviewers: hsunder, mlillibridge, jhe, xCluster Reviewed By: hsunder Subscribers: ycdcxcluster, slingam, rthallam, ybase Differential Revision: https://phorge.dev.yugabyte.com/D36298 --- src/yb/cdc/cdc_service.cc | 170 +++++-- src/yb/cdc/cdc_service.h | 40 +- src/yb/consensus/consensus_peers-test.cc | 1 - src/yb/consensus/consensus_queue-test.cc | 1 - src/yb/consensus/log-dump.cc | 1 - src/yb/consensus/log-test-base.h | 1 - src/yb/consensus/log.cc | 11 +- src/yb/consensus/log.h | 14 +- src/yb/consensus/log_cache-test.cc | 1 - src/yb/consensus/raft_consensus-test.cc | 1 - .../consensus/raft_consensus_quorum-test.cc | 1 - .../integration-tests/cdc_service-int-test.cc | 452 ++++++++++-------- .../integration-tests/cdc_service-txn-test.cc | 3 +- src/yb/integration-tests/cdcsdk_ysql-test.cc | 46 +- .../cdcsdk_ysql_test_base.cc | 6 +- .../integration-tests/cdcsdk_ysql_test_base.h | 5 + .../integration-tests/create-table-itest.cc | 3 + .../integration-tests/raft_consensus-itest.cc | 2 + .../remote_bootstrap-itest.cc | 3 + src/yb/integration-tests/ts_recovery-itest.cc | 3 + .../xcluster/xcluster-tablet-split-itest.cc | 38 ++ .../xcluster/xcluster-test.cc | 17 +- .../xcluster/xcluster_consistency-test.cc | 6 +- src/yb/tablet/tablet_bootstrap.cc | 1 - src/yb/tablet/tablet_peer-test.cc | 2 +- .../tserver/remote_bootstrap_session-test.cc | 1 - src/yb/tserver/ts_tablet_manager.cc | 7 + 27 files changed, 539 insertions(+), 298 deletions(-) diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 443eaa171e41..fbacef2b58da 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -53,6 +53,7 @@ #include "yb/consensus/raft_consensus.h" #include "yb/consensus/replicate_msgs_holder.h" +#include "yb/gutil/map-util.h" #include "yb/gutil/strings/join.h" #include "yb/master/master_client.pb.h" @@ -180,6 +181,31 @@ DEFINE_test_flag(bool, cdcsdk_skip_stream_active_check, false, "When enabled, GetChanges will skip checking if stream is active as well as skip " "updating the active time."); +DEFINE_RUNTIME_uint32(xcluster_checkpoint_max_staleness_secs, 300, + "The maximum interval in seconds that the xcluster checkpoint map can go without being " + "refreshed. If the map is not refreshed within this interval, it is considered stale, " + "and all WAL segments will be retained until the next refresh. " + "Setting to 0 will disable Opid-based WAL segment retention for XCluster."); + +DECLARE_int32(log_min_seconds_to_retain); + +static bool ValidateMaxRefreshInterval(const char* flag_name, uint32 value) { + // This validation depends on the value of other flag(s): + // log_min_seconds_to_retain, update_min_cdc_indices_interval_secs. + DELAY_FLAG_VALIDATION_ON_STARTUP(flag_name); + + if (value == 0 || value < static_cast( + FLAGS_log_min_seconds_to_retain + FLAGS_update_min_cdc_indices_interval_secs)) { + return true; + } + LOG_FLAG_VALIDATION_ERROR(flag_name, value) + << "Must be less than the sume of log_min_seconds_to_retain and " + << "update_min_cdc_indices_interval_secs"; + return false; +} + +DEFINE_validator(xcluster_checkpoint_max_staleness_secs, &ValidateMaxRefreshInterval); + DECLARE_bool(enable_log_retention_by_op_idx); DECLARE_int32(cdc_checkpoint_opid_interval_ms); @@ -1096,6 +1122,13 @@ Result CDCServiceImpl::SetCDCCheckpoint( auto stream_id = VERIFY_STRING_TO_STREAM_ID(req.stream_id()); auto record = VERIFY_RESULT(GetStream(stream_id)); + + if (record->GetSourceType() != XCLUSTER) { + LOG(INFO) << "SetCDCCheckpoint is not supported for xCluster stream " << req.tablet_id(); + } + RSTATUS_DCHECK(record->GetSourceType() != XCLUSTER, + InvalidArgument, "SetCDCCheckpoint is not supported for xCluster stream $0", stream_id); + if (record->GetCheckpointType() != EXPLICIT) { LOG(WARNING) << "Setting the checkpoint explicitly even though the checkpoint type is implicit"; } @@ -2332,6 +2365,10 @@ void PopulateTabletMinCheckpointAndLatestActiveTime( const string& tablet_id, const OpId& checkpoint, CDCRequestSource cdc_source_type, const int64_t& last_active_time, TabletIdCDCCheckpointMap& tablet_min_checkpoint_index, const HybridTime cdc_sdk_safe_time = HybridTime::kInvalid) { + if (cdc_source_type == XCLUSTER) { + return; + } + auto& tablet_info = tablet_min_checkpoint_index[tablet_id]; tablet_info.cdc_op_id = min(tablet_info.cdc_op_id, checkpoint); @@ -2351,7 +2388,23 @@ void PopulateTabletMinCheckpointAndLatestActiveTime( } } -void CDCServiceImpl::ProcessEntry( +void CDCServiceImpl::ProcessEntryForXCluster( + const CDCStateTableEntry& entry, + std::unordered_map& xcluster_tablet_min_opid_map) { + const auto& checkpoint = *entry.checkpoint; + + if (checkpoint == OpId::Invalid()) { + return; + } + + auto [it, inserted] = + xcluster_tablet_min_opid_map.insert({entry.key.tablet_id, checkpoint}); + if (!inserted) { + it->second = std::min(it->second, checkpoint); + } +} + +void CDCServiceImpl::ProcessEntryForCdcsdk( const CDCStateTableEntry& entry, const StreamMetadata& stream_metadata, const std::shared_ptr& tablet_peer, @@ -2667,7 +2720,7 @@ Result CDCServiceImpl::PopulateCDCSDKTabletCheckPointIn break; } - ProcessEntry(entry, stream_metadata, tablet_peer, tablet_min_checkpoint_map); + ProcessEntryForCdcsdk(entry, stream_metadata, tablet_peer, tablet_min_checkpoint_map); } RETURN_NOT_OK(iteration_status); @@ -2679,9 +2732,47 @@ Result CDCServiceImpl::PopulateCDCSDKTabletCheckPointIn return *it; } -Result CDCServiceImpl::PopulateTabletCheckPointInfo( +int64_t CDCServiceImpl::GetXClusterMinRequiredIndex(const TabletId& tablet_id) { + if (!CDCEnabled()) { + return std::numeric_limits::max(); + } + + auto max_staleness_secs = FLAGS_xcluster_checkpoint_max_staleness_secs; + + if (max_staleness_secs == 0) { + // Feature is disabled. + return std::numeric_limits::max(); + } + + SharedLock l(xcluster_replication_maps_mutex_); + + auto seconds_since_last_refresh = + MonoTime::Now().GetDeltaSince(xcluster_map_last_refresh_time_).ToSeconds(); + + if (seconds_since_last_refresh > max_staleness_secs) { + YB_LOG_EVERY_N_SECS(WARNING, 60) + << "XCluster min opid map hasn't been refresh for a while, " + << "retain all WAL segments until the map is refreshed"; + return 0; + } + + auto* min_replicated_opid = FindOrNull(xcluster_tablet_min_opid_map_, tablet_id); + + if (min_replicated_opid == nullptr) { + // Tablet is not under XCluster replication. + return std::numeric_limits::max(); + } + + VLOG(2) << min_replicated_opid->index + << " is the xcluster min required index for tablet " << tablet_id; + + return min_replicated_opid->index; +} + +Status CDCServiceImpl::PopulateTabletCheckPointInfo( + TabletIdCDCCheckpointMap& cdcsdk_min_checkpoint_map, + std::unordered_map& xcluster_tablet_min_opid_map, TabletIdStreamIdSet& tablet_stream_to_be_deleted, StreamIdSet& slot_entries_to_be_deleted) { - TabletIdCDCCheckpointMap tablet_min_checkpoint_map; std::unordered_set refreshed_metadata_set; int count = 0; @@ -2722,13 +2813,6 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( continue; } - auto tablet_peer = context_->LookupTablet(tablet_id); - if (!tablet_peer) { - LOG(WARNING) << "Could not find tablet peer for tablet_id: " << tablet_id - << ". Will not update its peers in this round"; - continue; - } - RefreshStreamMapOption refresh_option = RefreshStreamMapOption::kNone; if (refreshed_metadata_set.find(stream_id) == refreshed_metadata_set.end()) { refresh_option = RefreshStreamMapOption::kIfInitiatedState; @@ -2739,11 +2823,11 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( << get_stream_metadata.status(); // The stream_id present in the cdc_state table was not found in the master cache, it means // that the stream is deleted. To update the corresponding tablet PEERs, give an entry in - // tablet_min_checkpoint_map which will update cdc_sdk_min_checkpoint_op_id to + // cdcsdk_min_checkpoint_map which will update cdc_sdk_min_checkpoint_op_id to // OpId::Max()(i.e no need to retain the intents.). And also mark the row to be deleted. - if (!tablet_min_checkpoint_map.contains(tablet_id)) { + if (!cdcsdk_min_checkpoint_map.contains(tablet_id)) { VLOG(2) << "We could not get the metadata for the stream: " << stream_id; - auto& tablet_info = tablet_min_checkpoint_map[tablet_id]; + auto& tablet_info = cdcsdk_min_checkpoint_map[tablet_id]; tablet_info.cdc_op_id = OpId::Max(); tablet_info.cdc_sdk_op_id = OpId::Max(); tablet_info.cdc_sdk_safe_time = HybridTime::kInvalid; @@ -2772,9 +2856,20 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( streams_with_tablet_entries_to_be_deleted.insert(stream_id); } - ProcessEntry( - entry, stream_metadata, tablet_peer, tablet_min_checkpoint_map, - &slot_entries_to_be_deleted, namespace_to_min_record_id_commit_time); + if (stream_metadata.GetSourceType() == CDCSDK) { + auto tablet_peer = context_->LookupTablet(tablet_id); + if (!tablet_peer) { + LOG(WARNING) << "Could not find tablet peer for tablet_id: " << tablet_id + << ". Will not update its peers in this round"; + continue; + } + + ProcessEntryForCdcsdk( + entry, stream_metadata, tablet_peer, cdcsdk_min_checkpoint_map, + &slot_entries_to_be_deleted, namespace_to_min_record_id_commit_time); + } else if (stream_metadata.GetSourceType() == XCLUSTER) { + ProcessEntryForXCluster(entry, xcluster_tablet_min_opid_map); + } } RETURN_NOT_OK(iteration_status); @@ -2790,7 +2885,7 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( YB_LOG_EVERY_N_SECS(INFO, 300) << "Read " << count << " records from " << kCdcStateTableName; - return tablet_min_checkpoint_map; + return Status::OK(); } void CDCServiceImpl::UpdateTabletPeersWithMaxCheckpoint( @@ -3084,23 +3179,32 @@ void CDCServiceImpl::UpdatePeersAndMetrics() { // if we fail to read cdc_state table, lets wait for the next retry after 60 secs. TabletIdStreamIdSet cdc_state_entries_to_delete; StreamIdSet slot_entries_to_be_deleted; - auto result = - PopulateTabletCheckPointInfo(cdc_state_entries_to_delete, slot_entries_to_be_deleted); - if (!result.ok()) { - LOG(WARNING) << "Failed to populate tablets checkpoint info: " << result.status(); + TabletIdCDCCheckpointMap cdcsdk_min_checkpoint_map; + std::unordered_map xcluster_tablet_min_opid_map; + const auto now = MonoTime::Now(); + + auto status = PopulateTabletCheckPointInfo( + cdcsdk_min_checkpoint_map, xcluster_tablet_min_opid_map, + cdc_state_entries_to_delete, slot_entries_to_be_deleted); + if (!status.ok()) { + LOG(WARNING) << "Failed to populate tablets checkpoint info: " << status; continue; } - TabletIdCDCCheckpointMap& tablet_checkpoint_map = *result; - VLOG(3) << "List of tablets with checkpoint info read from cdc_state table: " - << tablet_checkpoint_map.size(); + + VLOG(3) << "Number of cdcsdk tablets with checkpoint info read from cdc_state table: " + << cdcsdk_min_checkpoint_map.size() + << "\n Number of xcluster tablets with checkpoint info read from cdc_state table: " + << xcluster_tablet_min_opid_map.size(); + + UpdateXClusterReplicationMaps(std::move(xcluster_tablet_min_opid_map), now); // Collect and remove entries for the tablet_ids for which we will set the checkpoint as - // 'OpId::Max' from 'tablet_checkpoint_map', into 'tablet_ids_with_max_checkpoint'. + // 'OpId::Max' from 'cdcsdk_min_checkpoint_map', into 'tablet_ids_with_max_checkpoint'. std::unordered_set tablet_ids_with_max_checkpoint; FilterOutTabletsToBeDeletedByAllStreams( - &tablet_checkpoint_map, &tablet_ids_with_max_checkpoint); + &cdcsdk_min_checkpoint_map, &tablet_ids_with_max_checkpoint); - UpdateTabletPeersWithMinReplicatedIndex(&tablet_checkpoint_map); + UpdateTabletPeersWithMinReplicatedIndex(&cdcsdk_min_checkpoint_map); { YB_LOG_EVERY_N_SECS(INFO, 300) @@ -3122,6 +3226,16 @@ void CDCServiceImpl::UpdatePeersAndMetrics() { } while (sleep_while_not_stopped()); } +void CDCServiceImpl::UpdateXClusterReplicationMaps( + std::unordered_map new_map, + const MonoTime& last_refresh_time) { + std::lock_guard l(xcluster_replication_maps_mutex_); + + xcluster_tablet_min_opid_map_.swap(new_map); + + xcluster_map_last_refresh_time_ = last_refresh_time; +} + Status CDCServiceImpl::DeleteCDCStateTableMetadata( const TabletIdStreamIdSet& cdc_state_entries_to_delete, const std::unordered_set& failed_tablet_ids, diff --git a/src/yb/cdc/cdc_service.h b/src/yb/cdc/cdc_service.h index 1ed061fdc8fc..7ee39f51cdef 100644 --- a/src/yb/cdc/cdc_service.h +++ b/src/yb/cdc/cdc_service.h @@ -160,7 +160,7 @@ class CDCServiceImpl : public CDCServiceIf { Result TEST_GetTabletInfoFromCache(const TabletStreamInfo& producer_tablet); - void ProcessEntry( + void ProcessEntryForCdcsdk( const CDCStateTableEntry& entry, const StreamMetadata& stream_metadata, const std::shared_ptr& tablet_peer, @@ -169,6 +169,10 @@ class CDCServiceImpl : public CDCServiceIf { const std::unordered_map& namespace_to_min_record_id_commit_time = std::unordered_map{}); + void ProcessEntryForXCluster( + const CDCStateTableEntry& entry, + std::unordered_map& xcluster_tablet_min_opid_map); + // Update peers in other tablet servers about the latest minimum applied cdc index for a specific // tablet. void UpdateCdcReplicatedIndex( @@ -270,6 +274,16 @@ class CDCServiceImpl : public CDCServiceIf { const GetChangesResponsePB& resp, const TabletStreamInfo& producer_tablet, const std::shared_ptr& tablet_peer); + // Retrieves the cdc_min_replicated_index for a given tablet. + // Returns the min index that is still required for xCluster replication. + // Returns max if the tablet is not replicated by xCluster. + int64_t GetXClusterMinRequiredIndex(const TabletId& tablet_id) + EXCLUDES(xcluster_replication_maps_mutex_); + + auto GetXClusterMinRequiredIndexFunc() { + return std::bind_front(&CDCServiceImpl::GetXClusterMinRequiredIndex, this); + } + private: friend class XClusterProducerBootstrap; friend class CDCSDKVirtualWAL; @@ -403,8 +417,11 @@ class CDCServiceImpl : public CDCServiceIf { // Called periodically default 1s. void UpdateMetrics(); - // This method is used to read the cdc_state table to find the minimum replicated index for each - // tablet and then update the peers' log objects. Also used to update lag metrics. + // This method runs following tasks when xCluster or CDCSDK is enabled. + // 1.For every second, it invokes UpdateMetrics() + // 2.For every minute, it reads the cdc_state table to collect the min checkpoint info for + // each tablet. Next, it caches the min checkpoint info for tablets under XCluster replication + // then it records the checkpoint info in the tablet if it is under CDCSDK replication. void UpdatePeersAndMetrics(); Status GetTabletIdsToPoll( @@ -449,7 +466,9 @@ class CDCServiceImpl : public CDCServiceIf { const CDCStateTableRange& table_range, Status* iteration_status, StreamIdSet* slot_entries_to_be_deleted); - Result PopulateTabletCheckPointInfo( + Status PopulateTabletCheckPointInfo( + TabletIdCDCCheckpointMap& cdcsdk_min_checkpoint_map, + std::unordered_map& xcluster_tablet_min_opid_map, TabletIdStreamIdSet& tablet_stream_to_be_deleted, StreamIdSet& slot_entries_to_be_deleted); Result PopulateCDCSDKTabletCheckPointInfo( @@ -500,6 +519,10 @@ class CDCServiceImpl : public CDCServiceIf { void LogGetChangesLagForCDCSDK( const xrepl::StreamId& stream_id, const GetChangesResponsePB& resp); + void UpdateXClusterReplicationMaps( + std::unordered_map new_map, + const MonoTime& last_refresh_time) EXCLUDES(xcluster_replication_maps_mutex_); + rpc::Rpcs rpcs_; std::unique_ptr context_; @@ -561,6 +584,15 @@ class CDCServiceImpl : public CDCServiceIf { // Map of session_id (uint64) to VirtualWAL instance. std::unordered_map> session_virtual_wal_ GUARDED_BY(mutex_); + + mutable rw_spinlock xcluster_replication_maps_mutex_; + + // Cached minimum opid for each tablet under xCluster replication. + std::unordered_map xcluster_tablet_min_opid_map_ + GUARDED_BY(xcluster_replication_maps_mutex_); + + MonoTime xcluster_map_last_refresh_time_ GUARDED_BY(xcluster_replication_maps_mutex_) + = MonoTime::Min(); }; } // namespace cdc diff --git a/src/yb/consensus/consensus_peers-test.cc b/src/yb/consensus/consensus_peers-test.cc index b5bf5eb76ccd..fbe75843e1ca 100644 --- a/src/yb/consensus/consensus_peers-test.cc +++ b/src/yb/consensus/consensus_peers-test.cc @@ -108,7 +108,6 @@ class ConsensusPeersTest : public YBTest { log_thread_pool_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - std::numeric_limits::max(), // cdc_min_replicated_index &log_)); clock_.reset(new server::HybridClock()); ASSERT_OK(clock_->Init()); diff --git a/src/yb/consensus/consensus_queue-test.cc b/src/yb/consensus/consensus_queue-test.cc index b41fbc18abd4..b4a010f8064f 100644 --- a/src/yb/consensus/consensus_queue-test.cc +++ b/src/yb/consensus/consensus_queue-test.cc @@ -96,7 +96,6 @@ class ConsensusQueueTest : public YBTest { log_thread_pool_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - std::numeric_limits::max(), // cdc_min_replicated_index &log_)); clock_.reset(new server::HybridClock()); ASSERT_OK(clock_->Init()); diff --git a/src/yb/consensus/log-dump.cc b/src/yb/consensus/log-dump.cc index 86601f654066..abbff555b637 100644 --- a/src/yb/consensus/log-dump.cc +++ b/src/yb/consensus/log-dump.cc @@ -410,7 +410,6 @@ Status FilterLogSegment(const string& segment_path) { log_thread_pool.get(), log_thread_pool.get(), log_thread_pool.get(), - /* cdc_min_replicated_index */ 0, &log)); auto read_entries = segment->ReadEntries(); diff --git a/src/yb/consensus/log-test-base.h b/src/yb/consensus/log-test-base.h index b43b2710f334..e017600c8d07 100644 --- a/src/yb/consensus/log-test-base.h +++ b/src/yb/consensus/log-test-base.h @@ -194,7 +194,6 @@ class LogTestBase : public YBTest { log_thread_pool_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - std::numeric_limits::max(), // cdc_min_replicated_index &log_)); LOG(INFO) << "Sucessfully opened the log at " << tablet_wal_path_; } diff --git a/src/yb/consensus/log.cc b/src/yb/consensus/log.cc index 44a6a94e333f..f2570d803b80 100644 --- a/src/yb/consensus/log.cc +++ b/src/yb/consensus/log.cc @@ -604,7 +604,6 @@ Status Log::Open(const LogOptions &options, ThreadPool* append_thread_pool, ThreadPool* allocation_thread_pool, ThreadPool* background_sync_threadpool, - int64_t cdc_min_replicated_index, scoped_refptr* log, const PreLogRolloverCallback& pre_log_rollover_callback, NewSegmentAllocationCallback callback, @@ -1359,10 +1358,18 @@ Status Log::GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segment // avoid concurrently deleting those ops, we bump min_op_idx here to be at-least as // low as log_copy_min_index_. min_op_idx = std::min(log_copy_min_index_, min_op_idx); + + auto xrepl_min_replicated_index = cdc_min_replicated_index_.load(std::memory_order_acquire); + + if (get_xcluster_min_index_to_retain_) { + xrepl_min_replicated_index = + std::min(xrepl_min_replicated_index, get_xcluster_min_index_to_retain_(tablet_id_)); + } + // Find the prefix of segments in the segment sequence that is guaranteed not to include // 'min_op_idx'. RETURN_NOT_OK(reader_->GetSegmentPrefixNotIncluding( - min_op_idx, cdc_min_replicated_index_.load(std::memory_order_acquire), segments_to_gc)); + min_op_idx, xrepl_min_replicated_index, segments_to_gc)); const auto max_to_delete = std::max(reader_->num_segments() - FLAGS_log_min_segments_to_retain, 0); diff --git a/src/yb/consensus/log.h b/src/yb/consensus/log.h index db0dd4588818..9e10a2bcd6c5 100644 --- a/src/yb/consensus/log.h +++ b/src/yb/consensus/log.h @@ -153,7 +153,6 @@ class Log : public RefCountedThreadSafe { ThreadPool *append_thread_pool, ThreadPool* allocation_thread_pool, ThreadPool* background_sync_threadpool, - int64_t cdc_min_replicated_index, scoped_refptr *log, const PreLogRolloverCallback& pre_log_rollover_callback = {}, NewSegmentAllocationCallback callback = {}, @@ -339,6 +338,14 @@ class Log : public RefCountedThreadSafe { bool HasSufficientDiskSpaceForWrite(); + void SetGetXClusterMinIndexToRetainFunc( + std::function get_xcluster_required_index_func) { + std::lock_guard l(state_lock_); + if (get_xcluster_min_index_to_retain_ == nullptr) { + get_xcluster_min_index_to_retain_ = std::move(get_xcluster_required_index_func); + } + } + private: friend class LogTest; friend class LogTestBase; @@ -675,6 +682,11 @@ class Log : public RefCountedThreadSafe { std::atomic disk_space_frequent_check_interval_sec_{0}; std::shared_timed_mutex disk_space_mutex_; + // Function pointer to CDCServiceImpl::GetXClusterMinRequiredIndex. + // This function retrieves the xCluster minimum required index for a given tablet. + std::function get_xcluster_min_index_to_retain_ + GUARDED_BY(state_lock_); + DISALLOW_COPY_AND_ASSIGN(Log); }; diff --git a/src/yb/consensus/log_cache-test.cc b/src/yb/consensus/log_cache-test.cc index 777873289df4..05f7e2af6ee4 100644 --- a/src/yb/consensus/log_cache-test.cc +++ b/src/yb/consensus/log_cache-test.cc @@ -117,7 +117,6 @@ class LogCacheTest : public YBTest { log_thread_pool_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - std::numeric_limits::max(), // cdc_min_replicated_index &log_)); CloseAndReopenCache(MinimumOpId()); diff --git a/src/yb/consensus/raft_consensus-test.cc b/src/yb/consensus/raft_consensus-test.cc index c31a83ceb92d..6eff5cb843fb 100644 --- a/src/yb/consensus/raft_consensus-test.cc +++ b/src/yb/consensus/raft_consensus-test.cc @@ -248,7 +248,6 @@ class RaftConsensusTest : public YBTest { log_thread_pool_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - std::numeric_limits::max(), // cdc_min_replicated_index &log_)); log_->TEST_SetAllOpIdsSafe(true); diff --git a/src/yb/consensus/raft_consensus_quorum-test.cc b/src/yb/consensus/raft_consensus_quorum-test.cc index f7e06c0fc262..4a833878e1f7 100644 --- a/src/yb/consensus/raft_consensus_quorum-test.cc +++ b/src/yb/consensus/raft_consensus_quorum-test.cc @@ -149,7 +149,6 @@ class RaftConsensusQuorumTest : public YBTest { log_thread_pool_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - std::numeric_limits::max(), // cdc_min_replicated_index &log)); logs_.push_back(log.get()); fs_managers_.push_back(fs_manager.release()); diff --git a/src/yb/integration-tests/cdc_service-int-test.cc b/src/yb/integration-tests/cdc_service-int-test.cc index 2144450bbe42..84ba951d2055 100644 --- a/src/yb/integration-tests/cdc_service-int-test.cc +++ b/src/yb/integration-tests/cdc_service-int-test.cc @@ -119,10 +119,10 @@ using rpc::RpcController; const std::string kCDCTestKeyspace = "my_keyspace"; const std::string kCDCTestTableName = "cdc_test_table"; const client::YBTableName kTableName(YQL_DATABASE_CQL, kCDCTestKeyspace, kCDCTestTableName); +const int kRowCount = 10; CDCServiceImpl* CDCService(tserver::TabletServer* tserver) { - return down_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + return down_cast(tserver->GetCDCService().get()); } class CDCServiceTest : public YBMiniClusterTestBase { @@ -617,7 +617,7 @@ TEST_F(CDCServiceTest, TestDeleteXClusterStream) { { const auto& tserver = cluster_->mini_tablet_server(0)->server(); std::string tablet_id = GetTablet(table_.name()); - ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, tserver->proxy())); + ASSERT_NO_FATALS(WriteTestRow(0, kRowCount, "key0", tablet_id, tserver->proxy())); } ASSERT_OK(DeleteXClusterStream(stream_id_)); @@ -665,8 +665,8 @@ TEST_F(CDCServiceTest, TestSafeTime) { ASSERT_OK(GetChangesInitialSchema(change_req, change_req.mutable_from_checkpoint())); auto ht_0 = ASSERT_RESULT(tablet_peer->LeaderSafeTime()).ToUint64(); - ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, leader_tserver->proxy())); - ASSERT_NO_FATALS(WriteTestRow(1, 11, "key0", tablet_id, leader_tserver->proxy())); + ASSERT_NO_FATALS(WriteTestRow(0, kRowCount, "key0", tablet_id, leader_tserver->proxy())); + ASSERT_NO_FATALS(WriteTestRow(1, kRowCount + 1, "key0", tablet_id, leader_tserver->proxy())); auto ht_1 = ASSERT_RESULT(tablet_peer->LeaderSafeTime()).ToUint64(); @@ -1135,7 +1135,8 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestMetricsAfterServerFailure) { return leader_mini_tserver != nullptr; }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader.")); auto timestamp_before_write = GetCurrentTimeMicros(); - ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, leader_mini_tserver->server()->proxy())); + ASSERT_NO_FATALS(WriteTestRow( + 0, kRowCount, "key0", tablet_id, leader_mini_tserver->server()->proxy())); ASSERT_NO_FATALS(GetChanges(tablet_id, stream_id_, term, index)); ASSERT_OK(leader_mini_tserver->Restart()); leader_mini_tserver = nullptr; @@ -1299,7 +1300,7 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestMetricsUponRegainingLeadershi ASSERT_OK(MoveLeadersToTserver(tservers[0])); // Write data, and call GetChanges to bump up the last_x_physicaltime metrics on tserver 0. - ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, tservers[0]->server()->proxy())); + ASSERT_NO_FATALS(WriteTestRow(0, kRowCount, "key0", tablet_id, tservers[0]->server()->proxy())); GetChangesResponsePB change_resp; ASSERT_NO_FATALS(GetAllChanges(tablet_id, stream_id_, &change_resp)); auto ts0_metrics = ASSERT_RESULT( @@ -1311,7 +1312,8 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestMetricsUponRegainingLeadershi } // Write more data, but don't call GetChanges so that we can test lag when switching leaders. - ASSERT_NO_FATALS(WriteTestRow(1, 11, "key0", tablet_id, tservers[0]->server()->proxy())); + ASSERT_NO_FATALS( + WriteTestRow(1, kRowCount + 1, "key0", tablet_id, tservers[0]->server()->proxy())); // [TIMESTAMP 1] - GetLastReplicatedTime of this tablet. // Move leader to ts1. @@ -1341,7 +1343,8 @@ TEST_F(CDCServiceTestMultipleServersOneTablet, TestMetricsUponRegainingLeadershi // - But the last_x_physicaltime metrics were set to [TIMESTAMP 1] (< [TIMESTAMP 2]). // Write data, so that we have lag again before swapping leaders. - ASSERT_NO_FATALS(WriteTestRow(2, 12, "key0", tablet_id, tservers[1]->server()->proxy())); + ASSERT_NO_FATALS( + WriteTestRow(2, kRowCount + 2, "key0", tablet_id, tservers[1]->server()->proxy())); // [TIMESTAMP 3] - new GetLastReplicatedTime of this tablet. // Simulate bg thread updating metrics on ts1. @@ -1814,24 +1817,86 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) { } namespace { -void WaitForCDCIndex(const std::shared_ptr& tablet_peer, +void WaitForCDCIndex(const std::string& tablet_id, + CDCServiceImpl* cdc_service, int64_t expected_index, int timeout_secs) { LOG(INFO) << "Waiting until index equals " << expected_index << ". Timeout: " << timeout_secs; ASSERT_OK(WaitFor([&](){ - if (tablet_peer->log_available() && - tablet_peer->log()->cdc_min_replicated_index() == expected_index && - tablet_peer->tablet_metadata()->cdc_min_replicated_index() == expected_index) { - return true; - } - return false; + return cdc_service->GetXClusterMinRequiredIndex(tablet_id) == expected_index; }, MonoDelta::FromSeconds(timeout_secs) * kTimeMultiplier, "Wait until cdc min replicated index.")); LOG(INFO) << "Done waiting"; } } // namespace +class CDCServiceTestFourServers : public CDCServiceTest { + public: + void SetUp() override { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 100; + CDCServiceTest::SetUp(); + } + virtual int server_count() override { return 4; } + virtual int tablet_count() override { return 1; } +}; + + +// xcluster map from cdc service should not be affected by tablet move. +TEST_F(CDCServiceTestFourServers, TestMinReplicatedIndexAfterTabletMove) { + docdb::DisableYcqlPackedRow(); + stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); + + std::string tablet_id = GetTablet(); + + tserver::MiniTabletServer* leader_mini_tserver; + ASSERT_OK(WaitFor([&]() -> Result { + leader_mini_tserver = GetLeaderForTablet(tablet_id); + return leader_mini_tserver != nullptr; + }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader.")); + + const auto& proxy = leader_mini_tserver->server()->proxy(); + + WriteTestRow(0, kRowCount, "key", tablet_id, proxy); + GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ kRowCount); + + // Find the tservers that don't have the tablet. + tserver::TabletServer* move_to_server = nullptr; + size_t move_from_server_idx = 0; + for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { + const auto& tserver = cluster_->mini_tablet_server(i)->server(); + + auto tablet_peer = + tserver->tablet_manager()->GetTablet(tablet_id); + + if (tablet_peer.ok()) { + move_from_server_idx = i; + } else { + ASSERT_TRUE(move_to_server == nullptr); + move_to_server = tserver; + } + WaitForCDCIndex( + tablet_id, CDCService(tserver), 10, 4 * FLAGS_update_min_cdc_indices_interval_secs); + } + ASSERT_TRUE(move_to_server != nullptr); + + tserver::TabletServer* move_from_server = + cluster_->mini_tablet_server(move_from_server_idx)->server(); + + ASSERT_OK(cluster_->ClearBlacklist()); + ASSERT_OK(cluster_->AddTServerToBlacklist(move_from_server_idx)); + + ASSERT_OK(WaitFor([&]() -> bool { + return move_to_server->tablet_manager()->GetTablet(tablet_id).ok(); + }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Waiting for tablet move")); + + WaitForCDCIndex( + tablet_id, CDCService(move_to_server), 10, 4 * FLAGS_update_min_cdc_indices_interval_secs); + + WaitForCDCIndex( + tablet_id, CDCService(move_from_server), 10, 4 * FLAGS_update_min_cdc_indices_interval_secs); +} + class CDCServiceTestMaxRentionTime : public CDCServiceTest { public: void SetUp() override { @@ -1858,24 +1923,27 @@ TEST_F(CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime) { std::string tablet_id = GetTablet(); - const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); + const auto& tserver = cluster_->mini_tablet_server(0)->server(); + + const auto& proxy = tserver->proxy(); auto tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); + tserver->tablet_manager()->GetTablet(tablet_id)); // Write a row so that the next GetChanges request doesn't fail. - WriteTestRow(0, 10, "key0", tablet_id, proxy); + WriteTestRow(0, kRowCount, "key0", tablet_id, proxy); // Get CDC changes. GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0); - WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); + WaitForCDCIndex( + tablet_id, CDCService(tserver), 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); MonoTime start = MonoTime::Now(); // Write a lot more data to generate many log files that can be GCed. This should take less // than kMaxSecondsToRetain for the next check to succeed. for (int i = 1; i <= 100; i++) { - WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); + WriteTestRow(i, kRowCount + i, "key" + std::to_string(i), tablet_id, proxy); } MonoDelta elapsed = MonoTime::Now().GetDeltaSince(start); ASSERT_LT(elapsed.ToSeconds(), kMaxSecondsToRetain); @@ -1924,13 +1992,15 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) { const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); for (int i = 0; i < kNRows; i++) { - WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); + WriteTestRow(i, kRowCount + i, "key" + std::to_string(i), tablet_id, proxy); } + const auto& tserver = cluster_->mini_tablet_server(0)->server(); + // Verify that the cdc_min_replicated_index was initialized at the max index. - auto tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); - WaitForCDCIndex(tablet_peer, OpId::Max().index, 4 * FLAGS_update_min_cdc_indices_interval_secs); + WaitForCDCIndex( + tablet_id, CDCService(tserver), OpId::Max().index, + 4 * FLAGS_update_min_cdc_indices_interval_secs); // Force the producer bootstrap to fail after updating the tablet replication index entries. ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdc_inject_replication_index_update_failure) = true; @@ -1945,9 +2015,9 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) { // Verify that the cdc_min_replicated_index remains in the initial state. This tests that the // the tablet replication index was rolled back after the injected failure. - tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); - WaitForCDCIndex(tablet_peer, OpId::Max().index, 4 * FLAGS_update_min_cdc_indices_interval_secs); + WaitForCDCIndex( + tablet_id, CDCService(tserver), OpId::Max().index, + 4 * FLAGS_update_min_cdc_indices_interval_secs); // Clear the error injection. ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdc_inject_replication_index_update_failure) = false; @@ -1986,35 +2056,93 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) { ASSERT_EQ(nrows, 1); // Ensure that cdc_min_replicated_index is set to the correct value after Bootstrap. - tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); + auto tablet_peer = ASSERT_RESULT( + tserver->tablet_manager()->GetTablet(tablet_id)); auto latest_opid = tablet_peer->log()->GetLatestEntryOpId(); - WaitForCDCIndex(tablet_peer, latest_opid.index, 4 * FLAGS_update_min_cdc_indices_interval_secs); + WaitForCDCIndex( + tablet_id, CDCService(tserver), latest_opid.index, + 4 * FLAGS_update_min_cdc_indices_interval_secs); +} + +TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestMinReplicatedIndex) { + constexpr int kNStreams = 5; + + std::vector stream_ids; + + for (int i = 0; i < kNStreams; i++) { + stream_ids.push_back(ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id()))); + } + + std::string tablet_id = GetTablet(); + + const auto& tserver = cluster_->mini_tablet_server(0)->server(); + const auto &proxy = tserver->proxy(); + + // Insert test rows. + for (int i = 1; i <= kNStreams; i++) { + WriteTestRow(i, kRowCount + i, "key" + std::to_string(i), tablet_id, proxy); + } + + auto tablet_peer = ASSERT_RESULT( + tserver->tablet_manager()->GetTablet(tablet_id)); + + + // The periodic loop within'CDCServiceImpl::UpdatePeersAndMetrics' will periodically + // update the min index. This will eventually update the min index to 0. + WaitForCDCIndex( + tablet_id, CDCService(tserver), 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); + + for (int i = 0; i < kNStreams; i++) { + // Get CDC changes. + GetChanges(tablet_id, stream_ids[i], /* term */ 0, /* index */ i); + } + + // After the request succeeded, verify that the min cdc limit was set correctly. In this case + // it belongs to stream_id[0] with index 0. + WaitForCDCIndex( + tablet_id, CDCService(tserver), 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); + + // Changing the lowest index from all the streams should also be reflected in the log object. + GetChanges(tablet_id, stream_ids[0], /* term */ 0, /* index */ 4); + + // After the request succeeded, verify that the min cdc limit was set correctly. In this case + // it belongs to stream_id[1] with index 1. + WaitForCDCIndex( + tablet_id, CDCService(tserver), 1, 4 * FLAGS_update_min_cdc_indices_interval_secs); + + for (int i = 0; i < kNStreams; i++) { + ASSERT_OK(DeleteXClusterStream(stream_ids[i])); + } } -TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestLogCDCMinReplicatedIndexIsDurable) { +TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestCDCMinReplicatedIndexIsDurable) { stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); std::string tablet_id = GetTablet(); - const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); + auto tserver = cluster_->mini_tablet_server(0)->server(); + + const auto& proxy = tserver->proxy(); auto tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); + tserver->tablet_manager()->GetTablet(tablet_id)); // Write a row so that the next GetChanges request doesn't fail. - WriteTestRow(0, 10, "key0", tablet_id, proxy); + WriteTestRow(0, kRowCount, "key0", tablet_id, proxy); // Get CDC changes. - GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 10); + GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ kRowCount); - WaitForCDCIndex(tablet_peer, 10, 4 * FLAGS_update_min_cdc_indices_interval_secs); + WaitForCDCIndex( + tablet_id, CDCService(tserver), kRowCount, 4 * FLAGS_update_min_cdc_indices_interval_secs); // Restart the entire cluster to verify that the CDC tablet metadata got loaded from disk. ASSERT_OK(cluster_->RestartSync()); + tserver = cluster_->mini_tablet_server(0)->server(); + ASSERT_OK(WaitFor([&]() { auto tablet_peer = - cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id); + tserver->tablet_manager()->LookupTablet(tablet_id); if (tablet_peer) { if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY && tablet_peer->log() != nullptr) { @@ -2025,9 +2153,61 @@ TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestLogCDCMinReplicatedIndexIsDu return false; }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait until tablet has a leader.")); - // Verify the log and meta min replicated index was loaded correctly from disk. - ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), 10); - ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), 10); + WaitForCDCIndex( + tablet_id, CDCService(tserver), 10, 4 * FLAGS_update_min_cdc_indices_interval_secs); +} + +// Test that when all the streams for a specific tablet have been deleted, +// cdc min replicated index is reset to max int64. +TEST_F(CDCServiceTestDurableMinReplicatedIndex, TestCdcMinReplicatedIndexAreReset) { + constexpr int kNStreams = 5; + + std::vector stream_ids; + for (int i = 0; i < kNStreams; i++) { + stream_ids.push_back(ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id()))); + } + std::string tablet_id = GetTablet(); + + const auto& tserver = cluster_->mini_tablet_server(0)->server(); + const auto &proxy = tserver->proxy(); + + // Insert test rows. + for (int i = 1; i <= kNStreams; i++) { + WriteTestRow(i, kRowCount + i, "key" + std::to_string(i), tablet_id, proxy); + } + + // The periodic loop within'CDCServiceImpl::UpdatePeersAndMetrics' will periodically + // update the min index. This willeventually update the min index to 0. + WaitForCDCIndex( + tablet_id, CDCService(tserver), 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); + + for (auto& stream_id : stream_ids) { + // Get CDC changes. + GetChanges(tablet_id, stream_id, /* term */ 0, /* index */ 5); + } + + // After the request succeeded, verify that the min cdc limit was set correctly. In this case + // all the streams have index 5. + WaitForCDCIndex( + tablet_id, CDCService(tserver), 5, 4 * FLAGS_update_min_cdc_indices_interval_secs); + + CDCStateTable cdc_state_table(client_.get()); + + std::vector keys_to_delete; + for (auto& stream_id : stream_ids) { + keys_to_delete.push_back({tablet_id, stream_id}); + } + ASSERT_OK(cdc_state_table.DeleteEntries(keys_to_delete)); + LOG(INFO) << "Successfully deleted all streams from cdc_state"; + + SleepFor(MonoDelta::FromSeconds(FLAGS_update_min_cdc_indices_interval_secs + 1)); + + ASSERT_EQ(CDCService(tserver)->GetXClusterMinRequiredIndex(tablet_id), + std::numeric_limits::max()); + + for (auto& stream_id : stream_ids) { + ASSERT_OK(DeleteXClusterStream(stream_id)); + } } class CDCServiceTestMinSpace : public CDCServiceTest { @@ -2057,22 +2237,25 @@ TEST_F(CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace) { std::string tablet_id = GetTablet(); - const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); + const auto& tserver = cluster_->mini_tablet_server(0)->server(); + + const auto& proxy = tserver->proxy(); auto tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); + tserver->tablet_manager()->GetTablet(tablet_id)); // Write a row so that the next GetChanges request doesn't fail. - WriteTestRow(0, 10, "key0", tablet_id, proxy); + WriteTestRow(0, kRowCount, "key0", tablet_id, proxy); // Get CDC changes. GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0); - WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); + WaitForCDCIndex( + tablet_id, CDCService(tserver), 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); // Write a lot more data to generate many log files that can be GCed. This should take less // than kMaxSecondsToRetain for the next check to succeed. for (int i = 1; i <= 5000; i++) { - WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); + WriteTestRow(i, kRowCount + i, "key" + std::to_string(i), tablet_id, proxy); } log::SegmentSequence segment_sequence; @@ -2103,148 +2286,6 @@ TEST_F(CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace) { GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0); } -class CDCLogAndMetaIndex : public CDCServiceTest { - public: - void SetUp() override { - // Immediately write any index provided by a GetChanges request to cdc_state table. - ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0; - ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; - ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_min_replicated_index_considered_stale_secs) = 5; - ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_log_retention_by_op_idx) = true; - CDCServiceTest::SetUp(); - } -}; - -TEST_F(CDCLogAndMetaIndex, TestLogAndMetaCdcIndex) { - constexpr int kNStreams = 5; - - // This will rollover log segments a lot faster. - ANNOTATE_UNPROTECTED_WRITE(FLAGS_log_segment_size_bytes) = 100; - - std::vector stream_ids; - - for (int i = 0; i < kNStreams; i++) { - stream_ids.push_back(ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id()))); - } - - std::string tablet_id = GetTablet(); - - const auto &proxy = cluster_->mini_tablet_server(0)->server()->proxy(); - - // Insert test rows. - for (int i = 1; i <= kNStreams; i++) { - WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); - } - - auto tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); - - // The log and metadata min index are initialized to max value, but the periodic loop within - // 'CDCServiceImpl::UpdatePeersAndMetrics' will periodically update the min index. This will - // eventually update the min index to 0. - ASSERT_OK(WaitFor([&](){ - return tablet_peer->log()->cdc_min_replicated_index() == 0 && - tablet_peer->tablet_metadata()->cdc_min_replicated_index() == 0; - }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for the min index.")); - - for (int i = 0; i < kNStreams; i++) { - // Get CDC changes. - GetChanges(tablet_id, stream_ids[i], /* term */ 0, /* index */ i); - } - - // After the request succeeded, verify that the min cdc limit was set correctly. In this case - // it belongs to stream_id[0] with index 0. - WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); - - // Changing the lowest index from all the streams should also be reflected in the log object. - GetChanges(tablet_id, stream_ids[0], /* term */ 0, /* index */ 4); - - // After the request succeeded, verify that the min cdc limit was set correctly. In this case - // it belongs to stream_id[1] with index 1. - WaitForCDCIndex(tablet_peer, 1, 4 * FLAGS_update_min_cdc_indices_interval_secs); - - for (int i = 0; i < kNStreams; i++) { - ASSERT_OK(DeleteXClusterStream(stream_ids[i])); - } -} - -class CDCLogAndMetaIndexReset : public CDCLogAndMetaIndex { - public: - void SetUp() override { - ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_min_replicated_index_considered_stale_secs) = 5; - // This will rollover log segments a lot faster. - ANNOTATE_UNPROTECTED_WRITE(FLAGS_log_segment_size_bytes) = 100; - CDCLogAndMetaIndex::SetUp(); - } -}; - -// Test that when all the streams for a specific tablet have been deleted, the log and meta -// cdc min replicated index is reset to max int64. -TEST_F(CDCLogAndMetaIndexReset, TestLogAndMetaCdcIndexAreReset) { - constexpr int kNStreams = 5; - - // This will rollover log segments a lot faster. - ANNOTATE_UNPROTECTED_WRITE(FLAGS_log_segment_size_bytes) = 100; - - std::vector stream_ids; - for (int i = 0; i < kNStreams; i++) { - stream_ids.push_back(ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id()))); - } - std::string tablet_id = GetTablet(); - - const auto &proxy = cluster_->mini_tablet_server(0)->server()->proxy(); - - // Insert test rows. - for (int i = 1; i <= kNStreams; i++) { - WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); - } - - auto tablet_peer = ASSERT_RESULT( - cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTablet(tablet_id)); - - // The log and metadata min index are initialized to max value, but the periodic loop within - // 'CDCServiceImpl::UpdatePeersAndMetrics' will periodically update the min index. This will - // eventually update the min index to 0. - ASSERT_OK(WaitFor([&](){ - return tablet_peer->log()->cdc_min_replicated_index() == 0 && - tablet_peer->tablet_metadata()->cdc_min_replicated_index() == 0; - }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for the min index.")); - - for (auto& stream_id : stream_ids) { - // Get CDC changes. - GetChanges(tablet_id, stream_id, /* term */ 0, /* index */ 5); - } - - // After the request succeeded, verify that the min cdc limit was set correctly. In this case - // all the streams have index 5. - WaitForCDCIndex(tablet_peer, 5, 4 * FLAGS_update_min_cdc_indices_interval_secs); - - CDCStateTable cdc_state_table(client_.get()); - - std::vector keys_to_delete; - for (auto& stream_id : stream_ids) { - keys_to_delete.push_back({tablet_id, stream_id}); - } - ASSERT_OK(cdc_state_table.DeleteEntries(keys_to_delete)); - LOG(INFO) << "Successfully deleted all streams from cdc_state"; - - SleepFor(MonoDelta::FromSeconds(FLAGS_cdc_min_replicated_index_considered_stale_secs + 1)); - - LOG(INFO) << "Done sleeping"; - // RunLogGC should reset cdc min replicated index to max int64 because more than - // FLAGS_cdc_min_replicated_index_considered_stale_secs seconds have elapsed since the index - // was last updated. - ASSERT_OK(tablet_peer->RunLogGC()); - LOG(INFO) << "GC done running"; - ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits::max()); - ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), - std::numeric_limits::max()); - - for (auto& stream_id : stream_ids) { - ASSERT_OK(DeleteXClusterStream(stream_id)); - } -} - class CDCServiceTestThreeServers : public CDCServiceTest { public: void SetUp() override { @@ -2324,23 +2365,24 @@ TEST_F(CDCServiceTestThreeServers, TestNewLeaderUpdatesLogCDCAppliedIndex) { const auto &proxy = cluster_->mini_tablet_server(leader_idx)->server()->proxy(); for (int i = 0; i < kNRecords; i++) { - WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); + WriteTestRow(i, kRowCount + i, "key" + std::to_string(i), tablet_id, proxy); } LOG(INFO) << "Inserted " << kNRecords << " records"; stream_id_ = ASSERT_RESULT(CreateXClusterStream(*client_, table_.table()->id())); LOG(INFO) << "Created cdc stream " << stream_id_; + // The periodic loop within'CDCServiceImpl::UpdatePeersAndMetrics' will periodically + // update the min index. This will eventually update the min index to 0. std::shared_ptr tablet_peer; - // Check that the index hasn't been updated in any of the peers. for (int idx = 0; idx < server_count(); idx++) { + const auto& tserver = cluster_->mini_tablet_server(idx)->server(); auto new_tablet_peer = - cluster_->mini_tablet_server(idx)->server()->tablet_manager()->LookupTablet(tablet_id); + tserver->tablet_manager()->LookupTablet(tablet_id); if (new_tablet_peer) { tablet_peer = new_tablet_peer; - ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits::max()); - ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), - std::numeric_limits::max()); + WaitForCDCIndex( + tablet_id, CDCService(tserver), 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); } } @@ -2364,14 +2406,15 @@ TEST_F(CDCServiceTestThreeServers, TestNewLeaderUpdatesLogCDCAppliedIndex) { }, MonoDelta::FromSeconds(180) * kTimeMultiplier, "Wait until cdc state table can take writes.")); std::unique_ptr cdc_proxy; + tserver::TabletServer* tserver; ASSERT_OK(WaitFor([&](){ for (int idx = 0; idx < server_count(); idx++) { if (idx == leader_idx) { // This TServer is shutdown for now. continue; } - tablet_peer = - cluster_->mini_tablet_server(idx)->server()->tablet_manager()->LookupTablet(tablet_id); + tserver = cluster_->mini_tablet_server(idx)->server(); + tablet_peer = tserver->tablet_manager()->LookupTablet(tablet_id); if (tablet_peer && tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { LOG(INFO) << "Found new leader for tablet " << tablet_id << " in TS " << idx; return true; @@ -2383,8 +2426,8 @@ TEST_F(CDCServiceTestThreeServers, TestNewLeaderUpdatesLogCDCAppliedIndex) { SleepFor(MonoDelta::FromSeconds((FLAGS_update_min_cdc_indices_interval_secs * 3))); LOG(INFO) << "Done sleeping"; - ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), 5); - ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), 5); + WaitForCDCIndex( + tablet_id, CDCService(tserver), 5, 4 * FLAGS_update_min_cdc_indices_interval_secs); ASSERT_OK(cluster_->mini_tablet_server(leader_idx)->Start()); } @@ -2488,7 +2531,7 @@ TEST_F(CDCServiceTestThreeServers, TestCheckpointIsMinOverMultipleStreams) { } // Test writing and sending GetChanges requests to update the checkpoints. for (int i = 0; i < kNRecords; i++) { - WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); + WriteTestRow(i, kRowCount + i, "key" + std::to_string(i), tablet_id, proxy); // Make the three streams have different checkpoints where checkpoints are in decreasing order // in get_changes_req_resps. size_t max_get_changes_req_resps_index; @@ -2522,17 +2565,14 @@ TEST_F(CDCServiceTestThreeServers, TestCheckpointIsMinOverMultipleStreams) { if (is_leader_shutdown && idx == initial_leader_idx) { continue; } + const auto& tserver = cluster_->mini_tablet_server(idx)->server(); auto tablet_peer = - cluster_->mini_tablet_server(idx)->server()->tablet_manager()->LookupTablet(tablet_id); + tserver->tablet_manager()->LookupTablet(tablet_id); if (tablet_peer) { SCHECK_LE( - tablet_peer->log()->cdc_min_replicated_index(), lowest_checkpoint, IllegalState, + CDCService(tserver)->GetXClusterMinRequiredIndex(tablet_peer->tablet_id()), + lowest_checkpoint, IllegalState, Format("Peer on node $0 has invalid log::cdc_min_replicated_index", idx + 1)); - SCHECK_LE( - tablet_peer->tablet_metadata()->cdc_min_replicated_index(), lowest_checkpoint, - IllegalState, - Format( - "Peer on node $0 has invalid tablet_metadata::cdc_min_replicated_index", idx + 1)); } } diff --git a/src/yb/integration-tests/cdc_service-txn-test.cc b/src/yb/integration-tests/cdc_service-txn-test.cc index c2a71c982790..fa915bf5e31b 100644 --- a/src/yb/integration-tests/cdc_service-txn-test.cc +++ b/src/yb/integration-tests/cdc_service-txn-test.cc @@ -353,8 +353,7 @@ TEST_F(CDCServiceTxnTest, MetricsTest) { } const auto& tserver = cluster_->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = dynamic_cast(tserver->GetCDCService().get()); auto metrics = ASSERT_RESULT(GetXClusterTabletMetrics(*cdc_service, tablet_id, stream_id)); ASSERT_OK(WaitFor( [&]() -> Result { diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index d133f4f36d08..b4df3d93bc82 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -170,8 +170,7 @@ void CDCSDKYsqlTest::TestCDCLagMetric(CDCCheckpointType checkpoint_type) { ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); ASSERT_OK(WaitFor( [&]() { return cdc_service->CDCEnabled(); }, MonoDelta::FromSeconds(30), "IsCDCEnabled")); @@ -3738,10 +3737,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKActiveTimeCacheInSyncWi const auto& first_leader_tserver = test_cluster()->mini_tablet_server(first_leader_index)->server(); - auto cdc_service = dynamic_cast(first_leader_tserver->rpc_server() - ->TEST_service_pool("yb.cdc.CDCService") - ->TEST_get_service() - .get()); + auto cdc_service = CDCService(first_leader_tserver); auto tablet_info = ASSERT_RESULT(cdc_service->TEST_GetTabletInfoFromCache({stream_id, tablets[0].tablet_id()})); auto first_last_active_time = tablet_info.last_active_time; @@ -3767,10 +3763,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKActiveTimeCacheInSyncWi const auto& second_leader_tserver = test_cluster()->mini_tablet_server(second_leader_index)->server(); - cdc_service = dynamic_cast(second_leader_tserver->rpc_server() - ->TEST_service_pool("yb.cdc.CDCService") - ->TEST_get_service() - .get()); + cdc_service = CDCService(second_leader_tserver); tablet_info = ASSERT_RESULT( cdc_service->TEST_GetTabletInfoFromCache({stream_id, tablets[0].tablet_id()})); auto second_last_active_time = tablet_info.last_active_time; @@ -3975,8 +3968,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKLagMetrics)) { ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); ASSERT_OK(WaitFor( [&]() { return cdc_service->CDCEnabled(); }, MonoDelta::FromSeconds(30), "IsCDCEnabled")); @@ -4034,8 +4026,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKLastSentTimeMetric)) { ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); ASSERT_OK(WriteRowsHelper(0, 1, &test_cluster_, true)); ASSERT_OK(WaitForFlushTables( @@ -4082,8 +4073,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKExpiryMetric)) { ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); ASSERT_OK(WriteRowsHelper(1, 100, &test_cluster_, true)); ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, @@ -4125,8 +4115,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKTrafficSentMetric)) { ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); ASSERT_OK(WriteRowsHelper(1, 100, &test_cluster_, true)); ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, @@ -4185,8 +4174,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKChangeEventCountMetric) ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); ASSERT_OK(WriteRowsHelper(1, 100, &test_cluster_, true)); ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, @@ -4239,8 +4227,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesSingleS } const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); int64_t current_traffic_sent_bytes = 0; vector change_resp(num_tables); @@ -4336,8 +4323,7 @@ TEST_F( ASSERT_FALSE(resp.has_error()); } const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); for (uint32_t idx = 0; idx < num_tables; idx++) { int64_t current_traffic_sent_bytes = 0; @@ -4405,8 +4391,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesTwoStre } } const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); for (uint32_t idx = 0; idx < num_tables; idx++) { int64_t current_traffic_sent_bytes = 0; @@ -4458,8 +4443,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsWithAddStream)) ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); int64_t current_traffic_sent_bytes = 0; @@ -5448,8 +5432,7 @@ TEST_F( ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); // Initiate a transaction with 'BEGIN' statement. auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); @@ -5520,8 +5503,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKLagMetricUnchangedOnEmp ASSERT_FALSE(resp.has_error()); const auto& tserver = test_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); // Initiate a transaction with 'BEGIN' statement. auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index e74159c8a2db..9f41f48b0151 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -1258,8 +1258,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { void CDCSDKYsqlTest::EnableCDCServiceInAllTserver(uint32_t num_tservers) { for (uint32_t i = 0; i < num_tservers; ++i) { const auto& tserver = test_cluster()->mini_tablet_server(i)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); cdc_service->SetCDCServiceEnabled(); } } @@ -1270,8 +1269,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { // check the CDC Service Cache of all the tservers. for (uint32_t i = 0; i < num_tservers; ++i) { const auto& tserver = test_cluster()->mini_tablet_server(i)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(tserver); auto status = cdc_service->TEST_GetTabletInfoFromCache({stream_id, tablet_id}); if (status.ok()) { count += 1; diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index ee1c8a8e4baa..f09e8191881d 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -49,6 +49,7 @@ #include "yb/tserver/mini_tablet_server.h" #include "yb/tserver/tserver_admin.proxy.h" +#include "yb/tserver/tablet_server.h" #include "yb/util/enums.h" #include "yb/util/monotime.h" @@ -144,6 +145,10 @@ static constexpr uint64_t kVWALSessionId1 = std::numeric_limits::max() static constexpr uint64_t kVWALSessionId2 = std::numeric_limits::max() / 2 + 1; static constexpr uint64_t kVWALSessionId3 = std::numeric_limits::max() / 2 + 2; +CDCServiceImpl* CDCService(tserver::TabletServer* tserver) { + return down_cast(tserver->GetCDCService().get()); +} + class CDCSDKYsqlTest : public CDCSDKTestBase { public: struct ExpectedRecord { diff --git a/src/yb/integration-tests/create-table-itest.cc b/src/yb/integration-tests/create-table-itest.cc index 55f6c3c9aafc..1c4f5374a074 100644 --- a/src/yb/integration-tests/create-table-itest.cc +++ b/src/yb/integration-tests/create-table-itest.cc @@ -1109,6 +1109,9 @@ TEST_F(CreateTableITest, LazySuperblockFlushMultiTablePersistence) { ts_flags.push_back("--log_min_segments_to_retain=1"); ts_flags.push_back("--log_min_seconds_to_retain=0"); + // Prevent the flag validator from failing when FLAGS_log_min_seconds_to_retain is also set to 0 + ts_flags.push_back("--xcluster_checkpoint_max_staleness_secs=0"); + // Minimize log replay. ts_flags.push_back("--retryable_request_timeout_secs=0"); diff --git a/src/yb/integration-tests/raft_consensus-itest.cc b/src/yb/integration-tests/raft_consensus-itest.cc index b29b5f321abf..b432998ed1a4 100644 --- a/src/yb/integration-tests/raft_consensus-itest.cc +++ b/src/yb/integration-tests/raft_consensus-itest.cc @@ -494,6 +494,8 @@ void RaftConsensusITest::AddFlagsForLogRolls(vector* extra_tserver_flags extra_tserver_flags->push_back("--log_async_preallocate_segments=false"); extra_tserver_flags->push_back("--log_min_segments_to_retain=1"); extra_tserver_flags->push_back("--log_min_seconds_to_retain=0"); + // Prevent the flag validator from failing when FLAGS_log_min_seconds_to_retain is also set to 0 + extra_tserver_flags->push_back("--xcluster_checkpoint_max_staleness_secs=0"); extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100"); extra_tserver_flags->push_back("--db_write_buffer_size=100000"); } diff --git a/src/yb/integration-tests/remote_bootstrap-itest.cc b/src/yb/integration-tests/remote_bootstrap-itest.cc index 2c8cad2342fa..417f8c4528b5 100644 --- a/src/yb/integration-tests/remote_bootstrap-itest.cc +++ b/src/yb/integration-tests/remote_bootstrap-itest.cc @@ -1972,6 +1972,9 @@ TEST_F(RemoteBootstrapITest, TestRBSWithLazySuperblockFlush) { ts_flags.push_back("--log_min_segments_to_retain=1"); ts_flags.push_back("--log_min_seconds_to_retain=0"); + // Prevent the flag validator from failing when FLAGS_log_min_seconds_to_retain is also set to 0 + ts_flags.push_back("--xcluster_checkpoint_max_staleness_secs=0"); + // Minimize log replay. ts_flags.push_back("--retryable_request_timeout_secs=0"); diff --git a/src/yb/integration-tests/ts_recovery-itest.cc b/src/yb/integration-tests/ts_recovery-itest.cc index 7f664d290ea6..1777db73ca27 100644 --- a/src/yb/integration-tests/ts_recovery-itest.cc +++ b/src/yb/integration-tests/ts_recovery-itest.cc @@ -124,9 +124,12 @@ TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) { } TEST_F(TsRecoveryITest, CrashAfterLogSegmentPreAllocationg) { + // Set xcluster_checkpoint_max_staleness_secs to 0 to prevent the flag validator from failing + // when log_min_seconds_to_retain is also set to 0 ASSERT_NO_FATALS(StartCluster({ "--log_segment_size_bytes=2000", "--log_min_seconds_to_retain=0", + "--xcluster_checkpoint_max_staleness_secs=0", "--retryable_request_timeout_secs=0", "--db_write_buffer_size=2000", "--TEST_log_fault_after_segment_allocation_min_replicate_index=10" })); diff --git a/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc b/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc index 24d15e92008c..a2664db2be13 100644 --- a/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc +++ b/src/yb/integration-tests/xcluster/xcluster-tablet-split-itest.cc @@ -27,6 +27,7 @@ #include "yb/dockv/partition.h" #include "yb/common/ql_value.h" #include "yb/common/wire_protocol.h" +#include "yb/consensus/log.h" #include "yb/docdb/docdb_test_util.h" #include "yb/integration-tests/cdc_test_util.h" #include "yb/integration-tests/external_mini_cluster.h" @@ -37,11 +38,13 @@ #include "yb/master/master_client.pb.h" #include "yb/master/master_ddl.proxy.h" #include "yb/master/master_defaults.h" +#include "yb/tablet/tablet.h" #include "yb/tablet/tablet_metadata.h" #include "yb/tablet/tablet_peer.h" #include "yb/tools/admin-test-base.h" #include "yb/tserver/mini_tablet_server.h" #include "yb/tserver/tablet_server.h" +#include "yb/tserver/ts_tablet_manager.h" #include "yb/util/backoff_waiter.h" #include "yb/util/logging.h" #include "yb/util/sync_point.h" @@ -63,6 +66,7 @@ DECLARE_int32(cdc_state_checkpoint_update_interval_ms); DECLARE_bool(enable_collect_cdc_metrics); DECLARE_int32(update_metrics_interval_ms); DECLARE_int32(cleanup_split_tablets_interval_sec); +DECLARE_int32(update_min_cdc_indices_interval_secs); DECLARE_bool(enable_automatic_tablet_splitting); DECLARE_int64(tablet_split_low_phase_shard_count_per_node); @@ -72,6 +76,7 @@ DECLARE_int64(db_write_buffer_size); namespace yb { using test::Partitioning; +using cdc::CDCServiceImpl; template class XClusterTabletSplitITestBase : public TabletSplitBase { @@ -252,9 +257,30 @@ class CdcTabletSplitITest : public XClusterTabletSplitITestBase& tablet_peer, + CDCServiceImpl* cdc_service, + int64_t expected_index, + int timeout_secs) { + LOG(INFO) << "Waiting until index equals " << expected_index + << ". Timeout: " << timeout_secs; + + ASSERT_OK(WaitFor([&]() { + return cdc_service->GetXClusterMinRequiredIndex(tablet_peer->tablet_id()) == expected_index; + }, MonoDelta::FromSeconds(timeout_secs) * kTimeMultiplier, + "Wait until cdc min replicated index.")); + LOG(INFO) << "Done waiting"; +} +} // namespace + +CDCServiceImpl* CDCService(tserver::TabletServer* tserver) { + return down_cast(tserver->GetCDCService().get()); +} + TEST_F(CdcTabletSplitITest, GetChangesOnSplitParentTablet) { docdb::DisableYcqlPackedRow(); ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; constexpr auto kNumRows = kDefaultNumRows; auto cdc_proxy = std::make_unique(&client_->proxy_cache(), HostPort::FromBoundEndpoint(cluster_->mini_tablet_servers().front()->bound_rpc_addr())); @@ -317,6 +343,18 @@ TEST_F(CdcTabletSplitITest, GetChangesOnSplitParentTablet) { ASSERT_OK(s); ASSERT_EQ(children_found, 2); + // Verify cdc_min_replicated_index after split. + for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { + for (const auto& child_tablet_id : child_tablet_ids) { + const auto& tserver = cluster_->mini_tablet_server(i)->server(); + auto tablet_peer = ASSERT_RESULT( + tserver->tablet_manager()->GetTablet(child_tablet_id)); + WaitForCDCIndex( + tablet_peer, CDCService(tserver), + split_op_checkpoint.index, 4 * FLAGS_update_min_cdc_indices_interval_secs); + } + } + // Now let the parent tablet get deleted by the background task. // To do so, we need to issue a GetChanges to both children tablets. for (const auto& child_tablet_id : child_tablet_ids) { diff --git a/src/yb/integration-tests/xcluster/xcluster-test.cc b/src/yb/integration-tests/xcluster/xcluster-test.cc index a873c0bdfd8d..e79cb759e03c 100644 --- a/src/yb/integration-tests/xcluster/xcluster-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster-test.cc @@ -19,6 +19,7 @@ #include #include +#include "yb/cdc/cdc_service.h" #include "yb/cdc/cdc_service.pb.h" #include "yb/cdc/cdc_service.proxy.h" #include "yb/cdc/cdc_state_table.h" @@ -151,6 +152,7 @@ DECLARE_bool(xcluster_skip_health_check_on_replication_setup); namespace yb { +using cdc::CDCServiceImpl; using client::YBClient; using client::YBSchema; using client::YBSchemaBuilder; @@ -160,6 +162,7 @@ using client::YBTableAlterer; using client::YBTableName; using master::MiniMaster; + using SessionTransactionPair = std::pair; struct XClusterTestParams { @@ -169,6 +172,10 @@ struct XClusterTestParams { bool transactional_table; // For XCluster + CQL only. All YSQL tables are transactional. }; +CDCServiceImpl* CDCService(tserver::TabletServer* tserver) { + return down_cast(tserver->GetCDCService().get()); +} + class XClusterTestNoParam : public XClusterYcqlTestBase { public: virtual Status SetUpWithParams( @@ -1383,8 +1390,7 @@ TEST_P(XClusterTest, PollAndObserveIdleDampening) { ASSERT_ONLY_NOTNULL(cdc_ts); // Find the CDCTabletMetric associated with the above pair. - auto cdc_service = dynamic_cast( - cdc_ts->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(cdc_ts); auto metrics = ASSERT_RESULT(GetXClusterTabletMetrics(*cdc_service, tablet_id, stream_id)); /*********************************** @@ -2741,8 +2747,7 @@ TEST_P(XClusterTest, TestNonZeroLagMetricsWithoutGetChange) { // Check that the CDC enabled flag is true. tserver::TabletServer* cdc_ts = producer_cluster()->mini_tablet_server(0)->server(); - auto cdc_service = dynamic_cast( - cdc_ts->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = CDCService(cdc_ts); ASSERT_OK(WaitFor( [&]() { return cdc_service->CDCEnabled(); }, MonoDelta::FromSeconds(30), "IsCDCEnabled")); @@ -3487,13 +3492,15 @@ TEST_F_EX(XClusterTest, CdcCheckpointPeerMove, XClusterTestNoParam) { int64_t min_found = std::numeric_limits::max(); int64_t max_found = std::numeric_limits::min(); for (auto& tablet_server : producer_cluster()->mini_tablet_servers()) { + auto cdc_service = CDCService(tablet_server->server()); for (const auto& tablet_peer : tablet_server->server()->tablet_manager()->GetTabletPeers()) { if (tablet_peer->tablet_id() != tablet_id) { continue; } peer_count++; - auto cdc_checkpoint = tablet_peer->get_cdc_min_replicated_index(); + auto cdc_checkpoint = + cdc_service->GetXClusterMinRequiredIndex(tablet_peer->tablet_id()); LOG(INFO) << "TServer: " << tablet_server->server()->ToString() << ", CDC min replicated index: " << cdc_checkpoint; if (cdc_checkpoint == std::numeric_limits::max() || diff --git a/src/yb/integration-tests/xcluster/xcluster_consistency-test.cc b/src/yb/integration-tests/xcluster/xcluster_consistency-test.cc index 036b487bf6b5..c65d1ebbe1a4 100644 --- a/src/yb/integration-tests/xcluster/xcluster_consistency-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_consistency-test.cc @@ -168,8 +168,7 @@ class XClusterConsistencyTest : public XClusterYsqlTestBase { uint32_t count = 0; for (const auto& mini_tserver : producer_cluster()->mini_tablet_servers()) { auto* tserver = mini_tserver->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = dynamic_cast(tserver->GetCDCService().get()); for (const auto& stream_id : stream_ids_) { for (const auto& tablet_id : producer_tablet_ids_) { @@ -191,8 +190,7 @@ class XClusterConsistencyTest : public XClusterYsqlTestBase { uint32_t count = 0; for (const auto& mini_tserver : producer_cluster()->mini_tablet_servers()) { auto* tserver = mini_tserver->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); + auto cdc_service = dynamic_cast(tserver->GetCDCService().get()); for (const auto& stream_id : stream_ids_) { for (const auto& tablet_id : producer_tablet_ids_) { diff --git a/src/yb/tablet/tablet_bootstrap.cc b/src/yb/tablet/tablet_bootstrap.cc index ca9febc411e9..6f1b8f3a0361 100644 --- a/src/yb/tablet/tablet_bootstrap.cc +++ b/src/yb/tablet/tablet_bootstrap.cc @@ -859,7 +859,6 @@ class TabletBootstrap { append_pool_, allocation_pool_, log_sync_pool_, - metadata.cdc_min_replicated_index(), &log_, data_.pre_log_rollover_callback, new_segment_allocation_callback, diff --git a/src/yb/tablet/tablet_peer-test.cc b/src/yb/tablet/tablet_peer-test.cc index 6ad090da3021..14050b18ad59 100644 --- a/src/yb/tablet/tablet_peer-test.cc +++ b/src/yb/tablet/tablet_peer-test.cc @@ -195,7 +195,7 @@ class TabletPeerTest : public YBTabletTest { metadata->fs_manager()->uuid(), *tablet()->schema(), metadata->schema_version(), table_metric_entity_.get(), tablet_metric_entity_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - log_thread_pool_.get(), metadata->cdc_min_replicated_index(), &log, + log_thread_pool_.get(), &log, pre_log_rollover_callback, new_segment_allocation_callback)); auto bootstrap_state_manager = std::make_shared( diff --git a/src/yb/tserver/remote_bootstrap_session-test.cc b/src/yb/tserver/remote_bootstrap_session-test.cc index a90b60018650..448850940e6b 100644 --- a/src/yb/tserver/remote_bootstrap_session-test.cc +++ b/src/yb/tserver/remote_bootstrap_session-test.cc @@ -88,7 +88,6 @@ void RemoteBootstrapSessionTest::SetUpTabletPeer() { log_thread_pool_.get(), log_thread_pool_.get(), log_thread_pool_.get(), - std::numeric_limits::max(), // cdc_min_replicated_index &log, {}, // pre_log_rollover_callback, new_segment_allocation_callback)); diff --git a/src/yb/tserver/ts_tablet_manager.cc b/src/yb/tserver/ts_tablet_manager.cc index f61bfa71a0ef..8fe2dbbcc283 100644 --- a/src/yb/tserver/ts_tablet_manager.cc +++ b/src/yb/tserver/ts_tablet_manager.cc @@ -46,6 +46,8 @@ #include "yb/ash/wait_state.h" +#include "yb/cdc/cdc_service.h" + #include "yb/client/client.h" #include "yb/client/meta_data_cache.h" #include "yb/client/transaction_manager.h" @@ -2098,6 +2100,11 @@ void TSTabletManager::OpenTablet(const RaftGroupMetadataPtr& meta, return; } + if (server_->GetCDCService()) { + tablet_peer->log()->SetGetXClusterMinIndexToRetainFunc( + server_->GetCDCService()->GetXClusterMinRequiredIndexFunc()); + } + tablet_peer->RegisterMaintenanceOps(server_->maintenance_manager()); }