[#22862] XCluster: Improving XCluster Index Base WAL Retention Policy
Summary:
**Background:**
XCluster and CDCSDK rely on the tablet's `cdc_min_replicated_index` to retain the WAL segments that have not yet been replicated to the target. This value is updated by the background task in `CDCServiceImpl::UpdatePeersAndMetrics`, which is scheduled to run every 60 seconds. For each update:
* It first collects `cdc_min_replicated_index` and some CDCSDK checkpoints for each tablet from the CDC_State table.
* It then updates the value in each tablet's WAL and flushes it to the tablet metadata file.
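
The collection step can be pictured as a per-tablet minimum over all stream rows. The following is a minimal sketch, not the actual implementation: `StateRow` and `CollectMinIndexPerTablet` are hypothetical names, tablet ids are plain strings, and a negative index stands in for an invalid checkpoint.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for one (tablet, stream) row read from the CDC_State table.
struct StateRow {
  std::string tablet_id;
  int64_t checkpoint_index;  // Assumption: negative means "no valid checkpoint yet".
};

// Returns, for each tablet, the smallest valid checkpoint index across all of its rows.
inline std::unordered_map<std::string, int64_t> CollectMinIndexPerTablet(
    const std::vector<StateRow>& rows) {
  std::unordered_map<std::string, int64_t> min_index;
  for (const auto& row : rows) {
    if (row.checkpoint_index < 0) {
      continue;  // Skip rows without a valid checkpoint.
    }
    auto [it, inserted] = min_index.emplace(row.tablet_id, row.checkpoint_index);
    if (!inserted) {
      // Another stream already reported this tablet: keep the smaller index.
      it->second = std::min(it->second, row.checkpoint_index);
    }
  }
  return min_index;
}
```

The slowest stream determines how much WAL a tablet must keep, which is why the minimum is taken rather than the latest value.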

**Issue:**
Potential WAL Over-GC Risk:
* Currently, each tablet monitors its own `cdc_min_replicated_index`. If the tablet doesn't receive an update within 15 minutes, it drops its OpId index-based WAL retention for XCluster (introduced in D7873). This means that if updating an XCluster tablet is delayed by more than 15 minutes, WAL over-GC can happen. For example, in `CDCServiceImpl::UpdatePeersAndMetrics`, updating the CDC WAL retention barrier of all tablets might take more than 15 minutes.
Unnecessary Flush:
* Since `cdc_min_replicated_index` is already stored in the CDC_State table, the additional flush to the tablet metadata file is unnecessary. (We've observed that this caused increased disk IO in CE-509.)

**Alternative for Dropping XCluster WAL Retention Policy:**
In `CDCServiceImpl::UpdatePeersAndMetrics`, after collecting tablet checkpoint info, CDCService stores it in an in-memory map instead of updating each tablet individually. During WAL GC, each tablet's WAL pulls its `cdc_min_replicated_index` from this map. If the tablet is not found in the map, CDCService returns the maximum OpId, indicating that the tablet is no longer under XCluster replication.
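
A minimal sketch of such a map, assuming a reader-writer lock and string tablet ids. `CheckpointMapSketch`, `Replace`, and `MinRequiredIndex` are hypothetical names used only for illustration, and staleness handling is left out:

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical in-memory cache of the minimum required index per tablet.
class CheckpointMapSketch {
 public:
  // Sentinel meaning "this tablet needs no index-based WAL retention".
  static constexpr int64_t kNotReplicated = std::numeric_limits<int64_t>::max();

  // Called by the periodic background task: swaps in a freshly built map.
  void Replace(std::unordered_map<std::string, int64_t> fresh) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    map_.swap(fresh);
  }

  // Called during WAL GC: many tablets may read concurrently.
  int64_t MinRequiredIndex(const std::string& tablet_id) const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    auto it = map_.find(tablet_id);
    return it == map_.end() ? kNotReplicated : it->second;
  }

 private:
  mutable std::shared_mutex mutex_;
  std::unordered_map<std::string, int64_t> map_;
};
```

Replacing the whole map in one swap means a tablet that left replication simply disappears from the next snapshot, so no per-tablet removal call is needed.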

**New Gflag:**
`xcluster_checkpoint_max_staleness_secs` (300 by default): The maximum interval in seconds that the xcluster checkpoint map can go without being refreshed. If the map is not refreshed within this interval, it is considered stale, and all WAL segments will be retained until the next refresh.
Setting it to 0 disables OpId-based WAL segment retention for XCluster. **If you disable it, it is recommended to also set the Gflag `log_min_seconds_to_retain` to a large value so that WAL segments are still retained by the time-based WAL segment retention.**
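
The three outcomes described for this flag can be summarized as a small decision function. This is an illustrative sketch only: `ApplyStalenessRule` and the two constants are hypothetical names, and the elapsed time is passed in as whole seconds to keep the example deterministic.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Index 0 keeps every WAL segment; the maximum index imposes no index-based retention.
constexpr int64_t kRetainEverything = 0;
constexpr int64_t kNoIndexRetention = std::numeric_limits<int64_t>::max();

// cached_index: value looked up for the tablet in the checkpoint map.
// secs_since_refresh: time elapsed since the map was last refreshed.
// max_staleness_secs: value of xcluster_checkpoint_max_staleness_secs.
inline int64_t ApplyStalenessRule(
    int64_t cached_index, int64_t secs_since_refresh, uint32_t max_staleness_secs) {
  if (max_staleness_secs == 0) {
    return kNoIndexRetention;  // Feature disabled: rely on time-based retention only.
  }
  if (secs_since_refresh > static_cast<int64_t>(max_staleness_secs)) {
    return kRetainEverything;  // Map is stale: keep all segments until the next refresh.
  }
  return cached_index;  // Map is fresh: trust the cached value.
}
```

With the default of 300 seconds, a map refreshed 10 seconds ago yields the cached index, while one last refreshed 301 seconds ago yields 0 and blocks index-based GC.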

**TServer Restart Safety Guarantee:**
This change removes the periodic flushing of tablet metadata that XCluster used to persist `cdc_min_replicated_index`. Consequently, tablets no longer rely on a persisted `cdc_min_replicated_index` to retain WAL segments across TServer restarts.
This design stays safe by leveraging the `xcluster_checkpoint_max_staleness_secs` mechanism: until CDCService refreshes the xcluster checkpoint map for the first time after a restart, the map is considered stale, which prevents premature WAL GC.
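
The restart argument can be sketched with a tracker that starts out with no refresh time. This is a simplified illustration under assumptions: `RefreshTrackerSketch` is a hypothetical name, and it models "never refreshed" with `std::optional`, whereas the change itself initializes the last refresh time to `MonoTime::Min()`.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

using SketchClock = std::chrono::steady_clock;

// Hypothetical tracker for when the checkpoint map was last refreshed.
struct RefreshTrackerSketch {
  // Empty right after process start, i.e. before the first refresh.
  std::optional<SketchClock::time_point> last_refresh;

  void MarkRefreshed(SketchClock::time_point now) { last_refresh = now; }

  // A map that was never refreshed is always stale, so WAL GC retains everything.
  bool IsStale(SketchClock::time_point now, std::chrono::seconds max_staleness) const {
    if (!last_refresh) {
      return true;
    }
    return now - *last_refresh > max_staleness;
  }
};
```

Because the freshly started process reports the map as stale, the GC path retains all segments until the first successful read of the CDC_State table.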

**Other Changes:**
Modified the CDCServiceTest unit tests to verify a tablet's xcluster required minimum index by checking the xcluster checkpoint map cached in CDCService instead of relying on the tablet's log and metadata.
Jira: DB-11766

Test Plan:
CDCServiceTestFourServers.TestMinReplicatedIndexAfterTabletMove
CDCServiceTestDurableMinReplicatedIndex.TestLogCDCMinReplicatedIndexIsDurable
CdcTabletSplitITest.GetChangesOnSplitParentTablet

Reviewers: hsunder, mlillibridge, jhe, xCluster

Reviewed By: hsunder

Subscribers: ycdcxcluster, slingam, rthallam, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D36298
yusong-yan committed Aug 12, 2024
1 parent 09d6e96 commit 69d4052
Showing 27 changed files with 539 additions and 298 deletions.
170 changes: 142 additions & 28 deletions src/yb/cdc/cdc_service.cc
@@ -53,6 +53,7 @@
#include "yb/consensus/raft_consensus.h"
#include "yb/consensus/replicate_msgs_holder.h"

#include "yb/gutil/map-util.h"
#include "yb/gutil/strings/join.h"

#include "yb/master/master_client.pb.h"
@@ -180,6 +181,31 @@ DEFINE_test_flag(bool, cdcsdk_skip_stream_active_check, false,
"When enabled, GetChanges will skip checking if stream is active as well as skip "
"updating the active time.");

DEFINE_RUNTIME_uint32(xcluster_checkpoint_max_staleness_secs, 300,
"The maximum interval in seconds that the xcluster checkpoint map can go without being "
"refreshed. If the map is not refreshed within this interval, it is considered stale, "
"and all WAL segments will be retained until the next refresh. "
"Setting to 0 will disable Opid-based WAL segment retention for XCluster.");

DECLARE_int32(log_min_seconds_to_retain);

static bool ValidateMaxRefreshInterval(const char* flag_name, uint32 value) {
// This validation depends on the value of other flag(s):
// log_min_seconds_to_retain, update_min_cdc_indices_interval_secs.
DELAY_FLAG_VALIDATION_ON_STARTUP(flag_name);

if (value == 0 || value < static_cast<uint32>(
FLAGS_log_min_seconds_to_retain + FLAGS_update_min_cdc_indices_interval_secs)) {
return true;
}
LOG_FLAG_VALIDATION_ERROR(flag_name, value)
<< "Must be less than the sum of log_min_seconds_to_retain and "
<< "update_min_cdc_indices_interval_secs";
return false;
}

DEFINE_validator(xcluster_checkpoint_max_staleness_secs, &ValidateMaxRefreshInterval);

DECLARE_bool(enable_log_retention_by_op_idx);

DECLARE_int32(cdc_checkpoint_opid_interval_ms);
@@ -1096,6 +1122,13 @@ Result<SetCDCCheckpointResponsePB> CDCServiceImpl::SetCDCCheckpoint(

auto stream_id = VERIFY_STRING_TO_STREAM_ID(req.stream_id());
auto record = VERIFY_RESULT(GetStream(stream_id));

if (record->GetSourceType() == XCLUSTER) {
LOG(INFO) << "SetCDCCheckpoint is not supported for xCluster stream " << req.tablet_id();
}
RSTATUS_DCHECK(record->GetSourceType() != XCLUSTER,
InvalidArgument, "SetCDCCheckpoint is not supported for xCluster stream $0", stream_id);

if (record->GetCheckpointType() != EXPLICIT) {
LOG(WARNING) << "Setting the checkpoint explicitly even though the checkpoint type is implicit";
}
@@ -2332,6 +2365,10 @@ void PopulateTabletMinCheckpointAndLatestActiveTime(
const string& tablet_id, const OpId& checkpoint, CDCRequestSource cdc_source_type,
const int64_t& last_active_time, TabletIdCDCCheckpointMap& tablet_min_checkpoint_index,
const HybridTime cdc_sdk_safe_time = HybridTime::kInvalid) {
if (cdc_source_type == XCLUSTER) {
return;
}

auto& tablet_info = tablet_min_checkpoint_index[tablet_id];

tablet_info.cdc_op_id = min(tablet_info.cdc_op_id, checkpoint);
@@ -2351,7 +2388,23 @@ void PopulateTabletMinCheckpointAndLatestActiveTime(
}
}

void CDCServiceImpl::ProcessEntry(
void CDCServiceImpl::ProcessEntryForXCluster(
const CDCStateTableEntry& entry,
std::unordered_map<TabletId, OpId>& xcluster_tablet_min_opid_map) {
const auto& checkpoint = *entry.checkpoint;

if (checkpoint == OpId::Invalid()) {
return;
}

auto [it, inserted] =
xcluster_tablet_min_opid_map.insert({entry.key.tablet_id, checkpoint});
if (!inserted) {
it->second = std::min(it->second, checkpoint);
}
}

void CDCServiceImpl::ProcessEntryForCdcsdk(
const CDCStateTableEntry& entry,
const StreamMetadata& stream_metadata,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
@@ -2667,7 +2720,7 @@ Result<TabletCDCCheckpointInfo> CDCServiceImpl::PopulateCDCSDKTabletCheckPointIn
break;
}

ProcessEntry(entry, stream_metadata, tablet_peer, tablet_min_checkpoint_map);
ProcessEntryForCdcsdk(entry, stream_metadata, tablet_peer, tablet_min_checkpoint_map);
}

RETURN_NOT_OK(iteration_status);
@@ -2679,9 +2732,47 @@ Result<TabletCDCCheckpointInfo> CDCServiceImpl::PopulateCDCSDKTabletCheckPointIn
return *it;
}

Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
int64_t CDCServiceImpl::GetXClusterMinRequiredIndex(const TabletId& tablet_id) {
if (!CDCEnabled()) {
return std::numeric_limits<int64_t>::max();
}

auto max_staleness_secs = FLAGS_xcluster_checkpoint_max_staleness_secs;

if (max_staleness_secs == 0) {
// Feature is disabled.
return std::numeric_limits<int64_t>::max();
}

SharedLock l(xcluster_replication_maps_mutex_);

auto seconds_since_last_refresh =
MonoTime::Now().GetDeltaSince(xcluster_map_last_refresh_time_).ToSeconds();

if (seconds_since_last_refresh > max_staleness_secs) {
YB_LOG_EVERY_N_SECS(WARNING, 60)
<< "XCluster min opid map hasn't been refreshed for a while, "
<< "retain all WAL segments until the map is refreshed";
return 0;
}

auto* min_replicated_opid = FindOrNull(xcluster_tablet_min_opid_map_, tablet_id);

if (min_replicated_opid == nullptr) {
// Tablet is not under XCluster replication.
return std::numeric_limits<int64_t>::max();
}

VLOG(2) << min_replicated_opid->index
<< " is the xcluster min required index for tablet " << tablet_id;

return min_replicated_opid->index;
}

Status CDCServiceImpl::PopulateTabletCheckPointInfo(
TabletIdCDCCheckpointMap& cdcsdk_min_checkpoint_map,
std::unordered_map<TabletId, OpId>& xcluster_tablet_min_opid_map,
TabletIdStreamIdSet& tablet_stream_to_be_deleted, StreamIdSet& slot_entries_to_be_deleted) {
TabletIdCDCCheckpointMap tablet_min_checkpoint_map;
std::unordered_set<xrepl::StreamId> refreshed_metadata_set;

int count = 0;
@@ -2722,13 +2813,6 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
continue;
}

auto tablet_peer = context_->LookupTablet(tablet_id);
if (!tablet_peer) {
LOG(WARNING) << "Could not find tablet peer for tablet_id: " << tablet_id
<< ". Will not update its peers in this round";
continue;
}

RefreshStreamMapOption refresh_option = RefreshStreamMapOption::kNone;
if (refreshed_metadata_set.find(stream_id) == refreshed_metadata_set.end()) {
refresh_option = RefreshStreamMapOption::kIfInitiatedState;
@@ -2739,11 +2823,11 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
<< get_stream_metadata.status();
// The stream_id present in the cdc_state table was not found in the master cache, it means
// that the stream is deleted. To update the corresponding tablet PEERs, give an entry in
// tablet_min_checkpoint_map which will update cdc_sdk_min_checkpoint_op_id to
// cdcsdk_min_checkpoint_map which will update cdc_sdk_min_checkpoint_op_id to
// OpId::Max()(i.e no need to retain the intents.). And also mark the row to be deleted.
if (!tablet_min_checkpoint_map.contains(tablet_id)) {
if (!cdcsdk_min_checkpoint_map.contains(tablet_id)) {
VLOG(2) << "We could not get the metadata for the stream: " << stream_id;
auto& tablet_info = tablet_min_checkpoint_map[tablet_id];
auto& tablet_info = cdcsdk_min_checkpoint_map[tablet_id];
tablet_info.cdc_op_id = OpId::Max();
tablet_info.cdc_sdk_op_id = OpId::Max();
tablet_info.cdc_sdk_safe_time = HybridTime::kInvalid;
@@ -2772,9 +2856,20 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
streams_with_tablet_entries_to_be_deleted.insert(stream_id);
}

ProcessEntry(
entry, stream_metadata, tablet_peer, tablet_min_checkpoint_map,
&slot_entries_to_be_deleted, namespace_to_min_record_id_commit_time);
if (stream_metadata.GetSourceType() == CDCSDK) {
auto tablet_peer = context_->LookupTablet(tablet_id);
if (!tablet_peer) {
LOG(WARNING) << "Could not find tablet peer for tablet_id: " << tablet_id
<< ". Will not update its peers in this round";
continue;
}

ProcessEntryForCdcsdk(
entry, stream_metadata, tablet_peer, cdcsdk_min_checkpoint_map,
&slot_entries_to_be_deleted, namespace_to_min_record_id_commit_time);
} else if (stream_metadata.GetSourceType() == XCLUSTER) {
ProcessEntryForXCluster(entry, xcluster_tablet_min_opid_map);
}
}

RETURN_NOT_OK(iteration_status);
@@ -2790,7 +2885,7 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(

YB_LOG_EVERY_N_SECS(INFO, 300) << "Read " << count << " records from "
<< kCdcStateTableName;
return tablet_min_checkpoint_map;
return Status::OK();
}

void CDCServiceImpl::UpdateTabletPeersWithMaxCheckpoint(
@@ -3084,23 +3179,32 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {
// if we fail to read cdc_state table, lets wait for the next retry after 60 secs.
TabletIdStreamIdSet cdc_state_entries_to_delete;
StreamIdSet slot_entries_to_be_deleted;
auto result =
PopulateTabletCheckPointInfo(cdc_state_entries_to_delete, slot_entries_to_be_deleted);
if (!result.ok()) {
LOG(WARNING) << "Failed to populate tablets checkpoint info: " << result.status();
TabletIdCDCCheckpointMap cdcsdk_min_checkpoint_map;
std::unordered_map<TabletId, OpId> xcluster_tablet_min_opid_map;
const auto now = MonoTime::Now();

auto status = PopulateTabletCheckPointInfo(
cdcsdk_min_checkpoint_map, xcluster_tablet_min_opid_map,
cdc_state_entries_to_delete, slot_entries_to_be_deleted);
if (!status.ok()) {
LOG(WARNING) << "Failed to populate tablets checkpoint info: " << status;
continue;
}
TabletIdCDCCheckpointMap& tablet_checkpoint_map = *result;
VLOG(3) << "List of tablets with checkpoint info read from cdc_state table: "
<< tablet_checkpoint_map.size();

VLOG(3) << "Number of cdcsdk tablets with checkpoint info read from cdc_state table: "
<< cdcsdk_min_checkpoint_map.size()
<< "\n Number of xcluster tablets with checkpoint info read from cdc_state table: "
<< xcluster_tablet_min_opid_map.size();

UpdateXClusterReplicationMaps(std::move(xcluster_tablet_min_opid_map), now);

// Collect and remove entries for the tablet_ids for which we will set the checkpoint as
// 'OpId::Max' from 'tablet_checkpoint_map', into 'tablet_ids_with_max_checkpoint'.
// 'OpId::Max' from 'cdcsdk_min_checkpoint_map', into 'tablet_ids_with_max_checkpoint'.
std::unordered_set<TabletId> tablet_ids_with_max_checkpoint;
FilterOutTabletsToBeDeletedByAllStreams(
&tablet_checkpoint_map, &tablet_ids_with_max_checkpoint);
&cdcsdk_min_checkpoint_map, &tablet_ids_with_max_checkpoint);

UpdateTabletPeersWithMinReplicatedIndex(&tablet_checkpoint_map);
UpdateTabletPeersWithMinReplicatedIndex(&cdcsdk_min_checkpoint_map);

{
YB_LOG_EVERY_N_SECS(INFO, 300)
@@ -3122,6 +3226,16 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {
} while (sleep_while_not_stopped());
}

void CDCServiceImpl::UpdateXClusterReplicationMaps(
std::unordered_map<TabletId, OpId> new_map,
const MonoTime& last_refresh_time) {
std::lock_guard l(xcluster_replication_maps_mutex_);

xcluster_tablet_min_opid_map_.swap(new_map);

xcluster_map_last_refresh_time_ = last_refresh_time;
}

Status CDCServiceImpl::DeleteCDCStateTableMetadata(
const TabletIdStreamIdSet& cdc_state_entries_to_delete,
const std::unordered_set<TabletId>& failed_tablet_ids,
40 changes: 36 additions & 4 deletions src/yb/cdc/cdc_service.h
@@ -160,7 +160,7 @@ class CDCServiceImpl : public CDCServiceIf {

Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const TabletStreamInfo& producer_tablet);

void ProcessEntry(
void ProcessEntryForCdcsdk(
const CDCStateTableEntry& entry,
const StreamMetadata& stream_metadata,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
@@ -169,6 +169,10 @@ class CDCServiceImpl : public CDCServiceIf {
const std::unordered_map<NamespaceId, uint64_t>& namespace_to_min_record_id_commit_time =
std::unordered_map<NamespaceId, uint64_t>{});

void ProcessEntryForXCluster(
const CDCStateTableEntry& entry,
std::unordered_map<TabletId, OpId>& xcluster_tablet_min_opid_map);

// Update peers in other tablet servers about the latest minimum applied cdc index for a specific
// tablet.
void UpdateCdcReplicatedIndex(
@@ -270,6 +274,16 @@ class CDCServiceImpl : public CDCServiceIf {
const GetChangesResponsePB& resp, const TabletStreamInfo& producer_tablet,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer);

// Retrieves the cdc_min_replicated_index for a given tablet.
// Returns the min index that is still required for xCluster replication.
// Returns max if the tablet is not replicated by xCluster.
int64_t GetXClusterMinRequiredIndex(const TabletId& tablet_id)
EXCLUDES(xcluster_replication_maps_mutex_);

auto GetXClusterMinRequiredIndexFunc() {
return std::bind_front(&CDCServiceImpl::GetXClusterMinRequiredIndex, this);
}

private:
friend class XClusterProducerBootstrap;
friend class CDCSDKVirtualWAL;
@@ -403,8 +417,11 @@ class CDCServiceImpl : public CDCServiceIf {
// Called periodically default 1s.
void UpdateMetrics();

// This method is used to read the cdc_state table to find the minimum replicated index for each
// tablet and then update the peers' log objects. Also used to update lag metrics.
// This method runs the following tasks when xCluster or CDCSDK is enabled:
// 1. Every second, it invokes UpdateMetrics().
// 2. Every minute, it reads the cdc_state table to collect the min checkpoint info for
// each tablet. It then caches the min checkpoint info for tablets under XCluster replication
// and records the checkpoint info in the tablet if it is under CDCSDK replication.
void UpdatePeersAndMetrics();

Status GetTabletIdsToPoll(
@@ -449,7 +466,9 @@ class CDCServiceImpl : public CDCServiceIf {
const CDCStateTableRange& table_range, Status* iteration_status,
StreamIdSet* slot_entries_to_be_deleted);

Result<TabletIdCDCCheckpointMap> PopulateTabletCheckPointInfo(
Status PopulateTabletCheckPointInfo(
TabletIdCDCCheckpointMap& cdcsdk_min_checkpoint_map,
std::unordered_map<TabletId, OpId>& xcluster_tablet_min_opid_map,
TabletIdStreamIdSet& tablet_stream_to_be_deleted, StreamIdSet& slot_entries_to_be_deleted);

Result<TabletCDCCheckpointInfo> PopulateCDCSDKTabletCheckPointInfo(
@@ -500,6 +519,10 @@ class CDCServiceImpl : public CDCServiceIf {
void LogGetChangesLagForCDCSDK(
const xrepl::StreamId& stream_id, const GetChangesResponsePB& resp);

void UpdateXClusterReplicationMaps(
std::unordered_map<TabletId, OpId> new_map,
const MonoTime& last_refresh_time) EXCLUDES(xcluster_replication_maps_mutex_);

rpc::Rpcs rpcs_;

std::unique_ptr<CDCServiceContext> context_;
@@ -561,6 +584,15 @@ class CDCServiceImpl : public CDCServiceIf {
// Map of session_id (uint64) to VirtualWAL instance.
std::unordered_map<uint64_t, std::shared_ptr<CDCSDKVirtualWAL>> session_virtual_wal_
GUARDED_BY(mutex_);

mutable rw_spinlock xcluster_replication_maps_mutex_;

// Cached minimum opid for each tablet under xCluster replication.
std::unordered_map<TabletId, OpId> xcluster_tablet_min_opid_map_
GUARDED_BY(xcluster_replication_maps_mutex_);

MonoTime xcluster_map_last_refresh_time_ GUARDED_BY(xcluster_replication_maps_mutex_)
= MonoTime::Min();
};

} // namespace cdc
1 change: 0 additions & 1 deletion src/yb/consensus/consensus_peers-test.cc
@@ -108,7 +108,6 @@ class ConsensusPeersTest : public YBTest {
log_thread_pool_.get(),
log_thread_pool_.get(),
log_thread_pool_.get(),
std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
&log_));
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());
1 change: 0 additions & 1 deletion src/yb/consensus/consensus_queue-test.cc
@@ -96,7 +96,6 @@ class ConsensusQueueTest : public YBTest {
log_thread_pool_.get(),
log_thread_pool_.get(),
log_thread_pool_.get(),
std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
&log_));
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());
1 change: 0 additions & 1 deletion src/yb/consensus/log-dump.cc
@@ -410,7 +410,6 @@ Status FilterLogSegment(const string& segment_path) {
log_thread_pool.get(),
log_thread_pool.get(),
log_thread_pool.get(),
/* cdc_min_replicated_index */ 0,
&log));

auto read_entries = segment->ReadEntries();
1 change: 0 additions & 1 deletion src/yb/consensus/log-test-base.h
@@ -194,7 +194,6 @@ class LogTestBase : public YBTest {
log_thread_pool_.get(),
log_thread_pool_.get(),
log_thread_pool_.get(),
std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
&log_));
LOG(INFO) << "Sucessfully opened the log at " << tablet_wal_path_;
}