diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index 86e9c36b0100..543efb370cce 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -541,6 +541,32 @@ class CDCServiceImpl::Impl { return it->cdc_state_checkpoint.last_active_time; } + void UpdateFollowerCache(const ProducerTabletInfo& producer_tablet, const OpId& checkpoint) { + auto now = CoarseMonoClock::Now(); + SharedLock lock(mutex_); + auto it = tablet_checkpoints_.find(producer_tablet); + if (it == tablet_checkpoints_.end()) { + TabletCheckpoint sent_checkpoint = { + .op_id = OpId::Max(), + .last_update_time = now, + .last_active_time = now, + }; + TabletCheckpoint commit_checkpoint = { + .op_id = checkpoint, + .last_update_time = now, + .last_active_time = now, + }; + tablet_checkpoints_.emplace(TabletCheckpointInfo{ + .producer_tablet_info = producer_tablet, + .cdc_state_checkpoint = commit_checkpoint, + .sent_checkpoint = sent_checkpoint, + .mem_tracker = nullptr, + }); + } else { + it->cdc_state_checkpoint.last_active_time = now; + } + } + Status CheckStreamActive(const ProducerTabletInfo& producer_tablet) { SharedLock l(mutex_); auto it = tablet_checkpoints_.find(producer_tablet); @@ -1744,11 +1770,7 @@ Result> CDCServiceImpl::GetCdcStateTable() // Check stream associated with the tablet is active or not. // Don't consider those inactive stream for the min_checkpoint calculation. CoarseTimePoint latest_active_time = CoarseTimePoint ::min(); - // if current tsever is the tablet LEADER, send the FOLLOWER tablets to - // update their active_time in their CDCService Cache. - std::shared_ptr tablet_peer; - Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer); - if (s.ok() && record.source_type == CDCSDK && IsTabletPeerLeader(tablet_peer)) { + if (record.source_type == CDCSDK) { auto status = impl_->CheckStreamActive(producer_tablet); if (!status.ok()) { // Inactive stream read from cdc_state table are not considered for the minimum @@ -2206,7 +2228,7 @@ Result> CDCServiceImpl::GetCdcStateTable() for (int stream_idx = 0; stream_idx < req->stream_ids_size(); stream_idx++) { ProducerTabletInfo producer_tablet = { "" /* UUID */, req->stream_ids(stream_idx), req->tablet_ids(i)}; - impl_->UpdateActiveTime(producer_tablet); + impl_->UpdateFollowerCache(producer_tablet, cdc_sdk_op); } } } diff --git a/ent/src/yb/cdc/cdcsdk_producer.cc b/ent/src/yb/cdc/cdcsdk_producer.cc index 5d470803e481..bd6967c460ff 100644 --- a/ent/src/yb/cdc/cdcsdk_producer.cc +++ b/ent/src/yb/cdc/cdcsdk_producer.cc @@ -476,6 +476,18 @@ CHECKED_STATUS ProcessIntents( auto tablet = tablet_peer->shared_tablet(); RETURN_NOT_OK(tablet->GetIntents(transaction_id, keyValueIntents, stream_state)); + const OpId& checkpoint_op_id = tablet_peer->GetLatestCheckPoint(); + if ((*keyValueIntents).size() == 0 && op_id <= checkpoint_op_id) { + LOG(ERROR) << "CDCSDK is trying to get intents for a transaction: " << transaction_id + << ", whose Apply record's OpId " << op_id + << "is lesser than the checkpoint in the tablet peer: " << checkpoint_op_id + << ", on tablet: " << tablet_peer->tablet_id() + << ". The intents would have already been removed from IntentsDB."; + return STATUS_FORMAT( + InternalError, "CDCSDK Trying to fetch already GCed intents for transaction $0", + transaction_id); + } + for (auto& keyValue : *keyValueIntents) { docdb::SubDocKey sub_doc_key; CHECK_OK( diff --git a/src/yb/docdb/docdb.cc b/src/yb/docdb/docdb.cc index a7d50083df06..7a94bec10f3a 100644 --- a/src/yb/docdb/docdb.cc +++ b/src/yb/docdb/docdb.cc @@ -1394,7 +1394,8 @@ Result GetIntentsBatch( intent_iter.Seek(reverse_index_value); if (!intent_iter.Valid() || intent_iter.key() != reverse_index_value) { LOG(WARNING) << "Unable to find intent: " << reverse_index_value.ToDebugHexString() - << " for " << key_slice.ToDebugHexString(); + << " for " << key_slice.ToDebugHexString() + << ", transactionId: " << transaction_id; return ApplyTransactionState{}; } diff --git a/src/yb/tablet/tablet_peer.cc b/src/yb/tablet/tablet_peer.cc index 923e45f8e699..73e3a61daf6f 100644 --- a/src/yb/tablet/tablet_peer.cc +++ b/src/yb/tablet/tablet_peer.cc @@ -1019,6 +1019,14 @@ CoarseTimePoint TabletPeer::cdc_sdk_min_checkpoint_op_id_expiration() { return CoarseTimePoint(); } +OpId TabletPeer::GetLatestCheckPoint() { + auto txn_participant = tablet()->transaction_participant(); + if (txn_participant) { + return txn_participant->GetLatestCheckPoint(); + } + return OpId(); +} + Status TabletPeer::SetCDCSDKRetainOpIdAndTime( const OpId& cdc_sdk_op_id, const MonoDelta& cdc_sdk_op_id_expiration) { if (cdc_sdk_op_id == OpId::Invalid()) { diff --git a/src/yb/tablet/tablet_peer.h b/src/yb/tablet/tablet_peer.h index 9d78cdc94117..0365fe478337 100644 --- a/src/yb/tablet/tablet_peer.h +++ b/src/yb/tablet/tablet_peer.h @@ -391,6 +391,8 @@ class TabletPeer : public consensus::ConsensusContext, Result GetCDCSDKIntentRetainTime(const CoarseTimePoint& cdc_sdk_latest_active_time); + OpId GetLatestCheckPoint(); + TableType table_type(); // Returns the number of segments in log_. diff --git a/src/yb/tablet/transaction_participant.cc b/src/yb/tablet/transaction_participant.cc index 37ea6ce2fd6c..08439c504354 100644 --- a/src/yb/tablet/transaction_participant.cc +++ b/src/yb/tablet/transaction_participant.cc @@ -377,6 +377,25 @@ class TransactionParticipant::Impl CleanTransactionsUnlocked(&min_running_notifier); } + OpId GetLatestCheckPoint() REQUIRES(mutex_) { + return GetLatestCheckPointUnlocked(); + } + + OpId GetLatestCheckPointUnlocked() { + OpId min_checkpoint; + if (CoarseMonoClock::Now() < cdc_sdk_min_checkpoint_op_id_expiration_ && + cdc_sdk_min_checkpoint_op_id_ != OpId::Invalid()) { + min_checkpoint = cdc_sdk_min_checkpoint_op_id_; + } else { + VLOG(1) << "Tablet peer checkpoint is expired with the current time: " + << ToSeconds(CoarseMonoClock::Now().time_since_epoch()) << " expiration time: " + << ToSeconds(cdc_sdk_min_checkpoint_op_id_expiration_.time_since_epoch()) + << " checkpoint op_id: " << cdc_sdk_min_checkpoint_op_id_; + min_checkpoint = OpId::Max(); + } + return min_checkpoint; + } + OpId GetRetainOpId() { std::lock_guard lock(mutex_); return cdc_sdk_min_checkpoint_op_id_; @@ -1523,13 +1542,6 @@ class TransactionParticipant::Impl } } - OpId GetLatestCheckPoint() REQUIRES(mutex_) { - return CoarseMonoClock::Now() < cdc_sdk_min_checkpoint_op_id_expiration_ && - cdc_sdk_min_checkpoint_op_id_ != OpId::Invalid() - ? cdc_sdk_min_checkpoint_op_id_ - : OpId::Max(); - } - TransactionStatusResolver& AddStatusResolver() override EXCLUDES(status_resolvers_mutex_) { std::lock_guard lock(status_resolvers_mutex_); status_resolvers_.emplace_back( @@ -1819,5 +1831,9 @@ CoarseTimePoint TransactionParticipant::GetCheckpointExpirationTime() const { return impl_->GetCheckpointExpirationTime(); } +OpId TransactionParticipant::GetLatestCheckPoint() const { + return impl_->GetLatestCheckPointUnlocked(); +} + } // namespace tablet } // namespace yb diff --git a/src/yb/tablet/transaction_participant.h b/src/yb/tablet/transaction_participant.h index 793f8366c9da..51e9859731ff 100644 --- a/src/yb/tablet/transaction_participant.h +++ b/src/yb/tablet/transaction_participant.h @@ -212,6 +212,8 @@ class TransactionParticipant : public TransactionStatusManager { CoarseTimePoint GetCheckpointExpirationTime() const; + OpId GetLatestCheckPoint() const; + const TabletId& tablet_id() const override; size_t TEST_GetNumRunningTransactions() const;