[BACKPORT 2.12.9][#13770] CDCSDK: Intents are getting GCed after Tablet LEADER changes

Summary:
"Original commit: 8eaec8c/D19149"
The CDC service retains intents for a tablet based on the flag //cdc_intent_retention_ms// (default: 4 hours). To track intent expiration per tablet, a separate thread, //UpdatePeersAndMetrics//, periodically does the following:
1. Reads the //cdc_state// table using the method //PopulateTabletCheckPointInfo//. Each row in the cdc_state table is uniquely identified by the (tablet_id, stream_id) pair, so the method calculates the minimum checkpoint among all active streams belonging to the same tablet_id, and the maximum remaining time among those streams by reading the tablet LEADER cache. The result is stored in a map, //std::unordered_map<TabletId, TabletCDCCheckpointInfo>//.

2. Passes the above map to the method //UpdateTabletPeersWithMinReplicatedIndex//, which internally does the following:
    a. Picks each tablet_id from the map. If the current tablet peer is not the LEADER, it moves on to the next tablet_id in the map.
    b. If the current tablet peer is the LEADER, it sends the minimum checkpoint and the remaining intent expiration time to the FOLLOWER tablets.
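
The aggregation in step 1 can be pictured with the minimal sketch below. It is an illustration only: CdcStateRow, TabletSummary, and SummarizeRows are hypothetical stand-ins, checkpoints are reduced to plain integers, and this is not the code of //PopulateTabletCheckPointInfo//.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified types for illustration; not the YugabyteDB definitions.
using TabletId = std::string;
using Millis = std::chrono::milliseconds;

// One row of cdc_state, keyed by the (tablet_id, stream_id) pair.
struct CdcStateRow {
  TabletId tablet_id;
  std::string stream_id;
  int64_t checkpoint;  // simplified: the real checkpoint is an OpId
  bool active;         // whether the stream is still active
  Millis remaining;    // intent retention time left for this stream
};

// Per-tablet result of step 1.
struct TabletSummary {
  int64_t min_checkpoint;
  Millis max_remaining;
};

// Folds all rows into one summary per tablet: minimum checkpoint and
// maximum remaining time across the tablet's active streams.
std::unordered_map<TabletId, TabletSummary> SummarizeRows(
    const std::vector<CdcStateRow>& rows) {
  std::unordered_map<TabletId, TabletSummary> result;
  for (const CdcStateRow& row : rows) {
    assert(row.remaining >= Millis(0));
    if (!row.active) {
      continue;  // inactive streams are ignored
    }
    auto it = result.find(row.tablet_id);
    if (it == result.end()) {
      result.emplace(row.tablet_id, TabletSummary{row.checkpoint, row.remaining});
    } else {
      it->second.min_checkpoint = std::min(it->second.min_checkpoint, row.checkpoint);
      it->second.max_remaining = std::max(it->second.max_remaining, row.remaining);
    }
  }
  return result;
}
```

Taking the minimum checkpoint means the slowest active stream decides what must be retained, while taking the maximum remaining time means the most recently active stream decides how long.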

The above two operations are not atomic. In step 1 (the //PopulateTabletCheckPointInfo// call), the maximum remaining time is calculated from the tablet LEADER cache and stored in the map, but by the time step 2 (//UpdateTabletPeersWithMinReplicatedIndex//) runs, the same tablet peer may have changed from LEADER to FOLLOWER, or vice versa.

In our scenario, we have a single stream (stream_1), a single tablet (tablet_1), and 3 tservers (TS1, TS2, TS3).
   i) Initially the tablet LEADER is on TS1 and the client calls GetChanges, so the UpdatePeersAndMetrics thread is enabled on TS1.
  ii) A LEADER switch to TS2 then happens, so the UpdatePeersAndMetrics thread becomes active on TS2.
  iii) TS1, which is now a FOLLOWER, still has UpdatePeersAndMetrics activated, so it periodically performs the two steps described above.

In the FAILURE scenario, TS1, which is a FOLLOWER, creates the map in the //PopulateTabletCheckPointInfo// call with the active time remaining set to 0, because it does not host the tablet LEADER. Before the //UpdateTabletPeersWithMinReplicatedIndex// call, TS1 becomes the LEADER, so in step 2 the LEADER tablet sends a remaining expiration time of 0, causing all the intents to be GCed.
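
The interleaving can be reproduced with a toy model. This is a sketch under stated assumptions, not the service code: Role, ComputeRemainingLeaderOnly, and SendIfLeader are hypothetical names, and the role is passed in explicitly at each step to make the race visible.

```cpp
#include <cassert>
#include <chrono>

using Millis = std::chrono::milliseconds;

// Hypothetical model of a tablet peer's role at a given moment.
enum class Role { kLeader, kFollower };

// Step 1 as it behaved before this change (simplified): only a LEADER
// reads its cache; a FOLLOWER records a remaining time of 0.
Millis ComputeRemainingLeaderOnly(Role role_at_step1, Millis cached_remaining) {
  return role_at_step1 == Role::kLeader ? cached_remaining : Millis(0);
}

// Step 2 (simplified): only a LEADER propagates the value to FOLLOWERS.
// Returns true and fills *sent if a value was sent.
bool SendIfLeader(Role role_at_step2, Millis remaining, Millis* sent) {
  assert(sent != nullptr);
  if (role_at_step2 != Role::kLeader) {
    return false;
  }
  *sent = remaining;
  return true;
}
```

Calling ComputeRemainingLeaderOnly with Role::kFollower and then SendIfLeader with Role::kLeader sends 0 even when the cache still holds the full retention time, which is the failure described above.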

To handle this scenario, in step 1 (//PopulateTabletCheckPointInfo//) we now calculate the maximum active time from the tablet cache regardless of whether the tablet peer is a LEADER or a FOLLOWER, and at regular intervals we update the stream active time in the FOLLOWERS' cache so that they stay in sync.
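
A minimal sketch of the idea behind the fix follows. ActiveTimeCache, Touch, and Remaining are hypothetical names, and computing the remaining time as retention minus elapsed time is an assumption made for illustration; the actual change is the //UpdateFollowerCache// method in the diff below.

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <unordered_map>

using Millis = std::chrono::milliseconds;
using TimePoint = std::chrono::time_point<std::chrono::steady_clock, Millis>;

// Hypothetical per-tserver cache of the last time a stream was active on a tablet.
class ActiveTimeCache {
 public:
  // Called when the LEADER pushes its state, so a FOLLOWER's entry stays fresh.
  void Touch(const std::string& tablet_id, TimePoint now) {
    last_active_[tablet_id] = now;
  }

  // Step 1 after the fix (simplified): the role is not consulted, only the cache.
  // Assumption: remaining time = retention - time since last activity, floored at 0.
  Millis Remaining(const std::string& tablet_id, TimePoint now, Millis retention) const {
    assert(retention >= Millis(0));
    auto it = last_active_.find(tablet_id);
    if (it == last_active_.end()) {
      return Millis(0);
    }
    const Millis elapsed = now - it->second;
    return elapsed >= retention ? Millis(0) : retention - elapsed;
  }

 private:
  std::unordered_map<std::string, TimePoint> last_active_;
};
```

Because a FOLLOWER's entry is refreshed periodically, a peer that becomes LEADER between the two steps already holds a non-zero remaining time instead of 0.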

Test Plan: Run all the C++ and Java test cases

Reviewers: abharadwaj, srangavajjula, skumar

Reviewed By: skumar

Subscribers: ycdcxcluster

Differential Revision: https://phabricator.dev.yugabyte.com/D19181
sureshdash2022-yb committed Aug 25, 2022
1 parent d788192 commit 1170d3a
Showing 7 changed files with 77 additions and 14 deletions.
34 changes: 28 additions & 6 deletions ent/src/yb/cdc/cdc_service.cc
@@ -541,6 +541,32 @@ class CDCServiceImpl::Impl {
     return it->cdc_state_checkpoint.last_active_time;
   }

+  void UpdateFollowerCache(const ProducerTabletInfo& producer_tablet, const OpId& checkpoint) {
+    auto now = CoarseMonoClock::Now();
+    SharedLock<rw_spinlock> lock(mutex_);
+    auto it = tablet_checkpoints_.find(producer_tablet);
+    if (it == tablet_checkpoints_.end()) {
+      TabletCheckpoint sent_checkpoint = {
+        .op_id = OpId::Max(),
+        .last_update_time = now,
+        .last_active_time = now,
+      };
+      TabletCheckpoint commit_checkpoint = {
+        .op_id = checkpoint,
+        .last_update_time = now,
+        .last_active_time = now,
+      };
+      tablet_checkpoints_.emplace(TabletCheckpointInfo{
+        .producer_tablet_info = producer_tablet,
+        .cdc_state_checkpoint = commit_checkpoint,
+        .sent_checkpoint = sent_checkpoint,
+        .mem_tracker = nullptr,
+      });
+    } else {
+      it->cdc_state_checkpoint.last_active_time = now;
+    }
+  }
+
   Status CheckStreamActive(const ProducerTabletInfo& producer_tablet) {
     SharedLock<rw_spinlock> l(mutex_);
     auto it = tablet_checkpoints_.find(producer_tablet);
@@ -1744,11 +1770,7 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
     // Check stream associated with the tablet is active or not.
     // Don't consider those inactive stream for the min_checkpoint calculation.
     CoarseTimePoint latest_active_time = CoarseTimePoint ::min();
-    // if current tsever is the tablet LEADER, send the FOLLOWER tablets to
-    // update their active_time in their CDCService Cache.
-    std::shared_ptr<tablet::TabletPeer> tablet_peer;
-    Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
-    if (s.ok() && record.source_type == CDCSDK && IsTabletPeerLeader(tablet_peer)) {
+    if (record.source_type == CDCSDK) {
       auto status = impl_->CheckStreamActive(producer_tablet);
       if (!status.ok()) {
         // Inactive stream read from cdc_state table are not considered for the minimum
@@ -2206,7 +2228,7 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
       for (int stream_idx = 0; stream_idx < req->stream_ids_size(); stream_idx++) {
         ProducerTabletInfo producer_tablet = {
             "" /* UUID */, req->stream_ids(stream_idx), req->tablet_ids(i)};
-        impl_->UpdateActiveTime(producer_tablet);
+        impl_->UpdateFollowerCache(producer_tablet, cdc_sdk_op);
       }
     }
   }
12 changes: 12 additions & 0 deletions ent/src/yb/cdc/cdcsdk_producer.cc
@@ -476,6 +476,18 @@ CHECKED_STATUS ProcessIntents(
   auto tablet = tablet_peer->shared_tablet();
   RETURN_NOT_OK(tablet->GetIntents(transaction_id, keyValueIntents, stream_state));

+  const OpId& checkpoint_op_id = tablet_peer->GetLatestCheckPoint();
+  if ((*keyValueIntents).size() == 0 && op_id <= checkpoint_op_id) {
+    LOG(ERROR) << "CDCSDK is trying to get intents for a transaction: " << transaction_id
+               << ", whose Apply record's OpId " << op_id
+               << "is lesser than the checkpoint in the tablet peer: " << checkpoint_op_id
+               << ", on tablet: " << tablet_peer->tablet_id()
+               << ". The intents would have already been removed from IntentsDB.";
+    return STATUS_FORMAT(
+        InternalError, "CDCSDK Trying to fetch already GCed intents for transaction $0",
+        transaction_id);
+  }
+
   for (auto& keyValue : *keyValueIntents) {
     docdb::SubDocKey sub_doc_key;
     CHECK_OK(
3 changes: 2 additions & 1 deletion src/yb/docdb/docdb.cc
@@ -1394,7 +1394,8 @@ Result<ApplyTransactionState> GetIntentsBatch(
       intent_iter.Seek(reverse_index_value);
       if (!intent_iter.Valid() || intent_iter.key() != reverse_index_value) {
         LOG(WARNING) << "Unable to find intent: " << reverse_index_value.ToDebugHexString()
-                     << " for " << key_slice.ToDebugHexString();
+                     << " for " << key_slice.ToDebugHexString()
+                     << ", transactionId: " << transaction_id;
         return ApplyTransactionState{};
       }

8 changes: 8 additions & 0 deletions src/yb/tablet/tablet_peer.cc
@@ -1019,6 +1019,14 @@ CoarseTimePoint TabletPeer::cdc_sdk_min_checkpoint_op_id_expiration() {
   return CoarseTimePoint();
 }

+OpId TabletPeer::GetLatestCheckPoint() {
+  auto txn_participant = tablet()->transaction_participant();
+  if (txn_participant) {
+    return txn_participant->GetLatestCheckPoint();
+  }
+  return OpId();
+}
+
 Status TabletPeer::SetCDCSDKRetainOpIdAndTime(
     const OpId& cdc_sdk_op_id, const MonoDelta& cdc_sdk_op_id_expiration) {
   if (cdc_sdk_op_id == OpId::Invalid()) {
2 changes: 2 additions & 0 deletions src/yb/tablet/tablet_peer.h
@@ -391,6 +391,8 @@ class TabletPeer : public consensus::ConsensusContext,

   Result<MonoDelta> GetCDCSDKIntentRetainTime(const CoarseTimePoint& cdc_sdk_latest_active_time);

+  OpId GetLatestCheckPoint();
+
   TableType table_type();

   // Returns the number of segments in log_.
30 changes: 23 additions & 7 deletions src/yb/tablet/transaction_participant.cc
@@ -377,6 +377,25 @@ class TransactionParticipant::Impl
     CleanTransactionsUnlocked(&min_running_notifier);
   }

+  OpId GetLatestCheckPoint() REQUIRES(mutex_) {
+    return GetLatestCheckPointUnlocked();
+  }
+
+  OpId GetLatestCheckPointUnlocked() {
+    OpId min_checkpoint;
+    if (CoarseMonoClock::Now() < cdc_sdk_min_checkpoint_op_id_expiration_ &&
+        cdc_sdk_min_checkpoint_op_id_ != OpId::Invalid()) {
+      min_checkpoint = cdc_sdk_min_checkpoint_op_id_;
+    } else {
+      VLOG(1) << "Tablet peer checkpoint is expired with the current time: "
+              << ToSeconds(CoarseMonoClock::Now().time_since_epoch()) << " expiration time: "
+              << ToSeconds(cdc_sdk_min_checkpoint_op_id_expiration_.time_since_epoch())
+              << " checkpoint op_id: " << cdc_sdk_min_checkpoint_op_id_;
+      min_checkpoint = OpId::Max();
+    }
+    return min_checkpoint;
+  }
+
   OpId GetRetainOpId() {
     std::lock_guard<std::mutex> lock(mutex_);
     return cdc_sdk_min_checkpoint_op_id_;
@@ -1523,13 +1542,6 @@ class TransactionParticipant::Impl
     }
   }

-  OpId GetLatestCheckPoint() REQUIRES(mutex_) {
-    return CoarseMonoClock::Now() < cdc_sdk_min_checkpoint_op_id_expiration_ &&
-                   cdc_sdk_min_checkpoint_op_id_ != OpId::Invalid()
-               ? cdc_sdk_min_checkpoint_op_id_
-               : OpId::Max();
-  }
-
   TransactionStatusResolver& AddStatusResolver() override EXCLUDES(status_resolvers_mutex_) {
     std::lock_guard<std::mutex> lock(status_resolvers_mutex_);
     status_resolvers_.emplace_back(
@@ -1819,5 +1831,9 @@ CoarseTimePoint TransactionParticipant::GetCheckpointExpirationTime() const {
   return impl_->GetCheckpointExpirationTime();
 }

+OpId TransactionParticipant::GetLatestCheckPoint() const {
+  return impl_->GetLatestCheckPointUnlocked();
+}
+
 } // namespace tablet
 } // namespace yb
2 changes: 2 additions & 0 deletions src/yb/tablet/transaction_participant.h
@@ -212,6 +212,8 @@ class TransactionParticipant : public TransactionStatusManager {

   CoarseTimePoint GetCheckpointExpirationTime() const;

+  OpId GetLatestCheckPoint() const;
+
   const TabletId& tablet_id() const override;

   size_t TEST_GetNumRunningTransactions() const;
