From 2b8a52bb7c65e97138a3683b2fd0822c556ad353 Mon Sep 17 00:00:00 2001 From: Adithya Bharadwaj Date: Wed, 31 Aug 2022 00:09:28 +0530 Subject: [PATCH] [#13693] CDCSDK: Add last_active_time to cdc_state table Summary: When CDC Clients are not running for a long period of time, we don't want to retain intents as it may cause a resource utilization issue in a YugabyteDB cluster. We have a GFLAG cdc_intent_retention_ms (default value of 4 hours) that determines when can we mark a stream as inactive. To determine if a stream is active, we track the timestamp of the GetChanges API call for the stream/tablet pair. We introduce last_active_time in the cdc_state table and store this value in the table against each stream/tablet pair. To improve performance, we also maintain an in-memory cache to ensure we don't write this into cdc_state table every time. The TabletPeer tracks the 'min' OpId and the 'oldest' active time among all the 'active' streams. If all the streams are inactive, the min OpId is returned as OpId::max() to ensure all the intents are garbage collected. This information is also sent to follower tablets along with min OpId so that they can also decide when to clear the intents. Test Plan: Jenkins: skip Existing cdcsdk test cases Reviewers: sdash, srangavajjula, mbautin, skumar Reviewed By: skumar Subscribers: bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D19201 --- ent/src/yb/cdc/cdc_service.cc | 385 +++++++++++------- ent/src/yb/cdc/cdc_service.h | 33 +- .../yb/integration-tests/cdcsdk_ysql-test.cc | 260 +++++++++--- ent/src/yb/master/catalog_manager_ent.cc | 8 + src/yb/client/table_handle.cc | 21 + src/yb/client/table_handle.h | 7 + src/yb/tablet/tablet_peer.cc | 8 +- src/yb/tablet/tablet_peer.h | 2 +- 8 files changed, 493 insertions(+), 231 deletions(-) diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index 507ec47cff41..722401ae4bee 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -72,6 +72,7 @@ #include "yb/util/shared_lock.h" #include "yb/util/status_format.h" #include "yb/util/status_log.h" +#include "yb/util/stol_utils.h" #include "yb/util/trace.h" #include "yb/yql/cql/ql/util/statement_result.h" @@ -324,23 +325,25 @@ class CDCServiceImpl::Impl { .tablet_id = tablet_id }; CoarseTimePoint time; + int64_t active_time; if (producer_entries_modified) { producer_entries_modified->push_back(producer_tablet); time = CoarseMonoClock::Now(); + active_time = GetCurrentTimeMicros(); } else { time = CoarseTimePoint::min(); + active_time = 0; } std::lock_guard l(mutex_); if (!producer_entries_modified && tablet_checkpoints_.count(producer_tablet)) { return; } - tablet_checkpoints_.emplace(TabletCheckpointInfo { - .producer_tablet_info = producer_tablet, - .cdc_state_checkpoint = {op_id, time, time}, - .sent_checkpoint = {op_id, time, time}, - .mem_tracker = nullptr, - }); + tablet_checkpoints_.emplace(TabletCheckpointInfo{ + .producer_tablet_info = producer_tablet, + .cdc_state_checkpoint = {op_id, time, active_time}, + .sent_checkpoint = {op_id, time, active_time}, + .mem_tracker = nullptr}); } void EraseTablets(const std::vector& producer_entries_modified, @@ -354,6 +357,35 @@ class CDCServiceImpl::Impl { } } + boost::optional GetLastActiveTime(const ProducerTabletInfo& producer_tablet) { + SharedLock lock(mutex_); + auto it = tablet_checkpoints_.find(producer_tablet); + if (it != tablet_checkpoints_.end()) { + // Use last_active_time from cache only if it is current. 
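+      // "Current" means the cache entry was refreshed within
+      // FLAGS_cdc_state_checkpoint_update_interval_ms; a stale or zero-valued entry falls
+      // through and the caller re-reads the value from the cdc_state table instead.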
+ if (it->cdc_state_checkpoint.last_active_time > 0) { + if (!it->cdc_state_checkpoint.ExpiredAt( + FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms, CoarseMonoClock::Now())) { + VLOG(2) << "Found recent entry in cache with active time: " + << it->cdc_state_checkpoint.last_active_time + << ", for tablet: " << producer_tablet.tablet_id + << ", and stream: " << producer_tablet.stream_id; + return it->cdc_state_checkpoint.last_active_time; + } else { + VLOG(2) << "Found stale entry in cache with active time: " + << it->cdc_state_checkpoint.last_active_time + << ", for tablet: " << producer_tablet.tablet_id + << ", and stream: " << producer_tablet.stream_id + << ". We will read from the cdc_state table"; + } + } + } else { + VLOG(1) << "Did not find entry in 'tablet_checkpoints_' cache for tablet: " + << producer_tablet.tablet_id << ", stream: " << producer_tablet.stream_id; + } + + return boost::none; + } + boost::optional GetLastCheckpoint(const ProducerTabletInfo& producer_tablet) { SharedLock lock(mutex_); auto it = tablet_checkpoints_.find(producer_tablet); @@ -373,16 +405,17 @@ class CDCServiceImpl::Impl { const OpId& commit_op_id) { VLOG(1) << "Going to update the checkpoint with " << commit_op_id; auto now = CoarseMonoClock::Now(); + auto active_time = GetCurrentTimeMicros(); TabletCheckpoint sent_checkpoint = { .op_id = sent_op_id, .last_update_time = now, - .last_active_time = now, + .last_active_time = active_time, }; TabletCheckpoint commit_checkpoint = { .op_id = commit_op_id, .last_update_time = now, - .last_active_time = now, + .last_active_time = active_time, }; std::lock_guard l(mutex_); @@ -537,89 +570,6 @@ class CDCServiceImpl::Impl { return tablet_checkpoints_; } - CoarseTimePoint GetLatestActiveTime( - const ProducerTabletInfo& producer_tablet, const OpId& checkpoint) { - auto now = CoarseMonoClock::Now(); - TabletCheckpoint sent_checkpoint = { - .op_id = OpId::Max(), - .last_update_time = now, - .last_active_time = now, - }; - TabletCheckpoint commit_checkpoint = { - .op_id = checkpoint, - .last_update_time = now, - .last_active_time = now, - }; - - SharedLock lock(mutex_); - auto it = tablet_checkpoints_.find(producer_tablet); - // This happens when node is restarted, but Getchange is called for some other stream, not for - // this stream, so there is no cache entry for the searched producer_tablet. In which case we - // create an entry for this producer_tablet. 
- if (it == tablet_checkpoints_.end()) { - tablet_checkpoints_.emplace(TabletCheckpointInfo{ - .producer_tablet_info = producer_tablet, - .cdc_state_checkpoint = commit_checkpoint, - .sent_checkpoint = sent_checkpoint, - .mem_tracker = nullptr, - }); - } - it = tablet_checkpoints_.find(producer_tablet); - return it->cdc_state_checkpoint.last_active_time; - } - - void UpdateFollowerCache(const ProducerTabletInfo& producer_tablet, const OpId& checkpoint) { - auto now = CoarseMonoClock::Now(); - SharedLock lock(mutex_); - auto it = tablet_checkpoints_.find(producer_tablet); - if (it == tablet_checkpoints_.end()) { - TabletCheckpoint sent_checkpoint = { - .op_id = OpId::Max(), - .last_update_time = now, - .last_active_time = now, - }; - TabletCheckpoint commit_checkpoint = { - .op_id = checkpoint, - .last_update_time = now, - .last_active_time = now, - }; - tablet_checkpoints_.emplace(TabletCheckpointInfo{ - .producer_tablet_info = producer_tablet, - .cdc_state_checkpoint = commit_checkpoint, - .sent_checkpoint = sent_checkpoint, - .mem_tracker = nullptr, - }); - } else { - it->cdc_state_checkpoint.last_active_time = now; - } - } - - Status CheckStreamActive(const ProducerTabletInfo& producer_tablet) { - SharedLock l(mutex_); - auto it = tablet_checkpoints_.find(producer_tablet); - - if (it != tablet_checkpoints_.end()) { - if (it->cdc_state_checkpoint.last_active_time.time_since_epoch().count() != 0 && - CoarseMonoClock::Now() > - it->cdc_state_checkpoint.last_active_time + - MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_cdc_intent_retention_ms))) { - LOG(ERROR) << "Stream ID: " << producer_tablet.stream_id - << " expired for Tablet ID: " << producer_tablet.tablet_id - << " with active time :" - << ToSeconds((it->cdc_state_checkpoint.last_active_time.time_since_epoch())) - << " current time: " << ToSeconds(CoarseMonoClock::Now().time_since_epoch()); - return STATUS_FORMAT( - InternalError, "stream ID $0 is expired for Tablet ID $1", producer_tablet.stream_id, - producer_tablet.tablet_id); - } - VLOG(1) << "Tablet :" << producer_tablet.ToString() - << " found in CDCSerive Cache with active time: " - << ": " << it->cdc_state_checkpoint.last_active_time.time_since_epoch() - << " current time: " << ToSeconds(CoarseMonoClock::Now().time_since_epoch()); - } - return Status::OK(); - } - Result TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet) { SharedLock l(mutex_); auto it = tablet_checkpoints_.find(producer_tablet); @@ -634,7 +584,11 @@ class CDCServiceImpl::Impl { SharedLock l(mutex_); auto it = tablet_checkpoints_.find(producer_tablet); if (it != tablet_checkpoints_.end()) { - it->cdc_state_checkpoint.last_active_time = CoarseMonoClock::Now(); + auto active_time = GetCurrentTimeMicros(); + VLOG(2) << "Updating active time for tablet: " << producer_tablet.tablet_id + << ", stream: " << producer_tablet.stream_id << ", as: " << active_time + << ", previous value: " << it->cdc_state_checkpoint.last_active_time; + it->cdc_state_checkpoint.last_active_time = active_time; } } @@ -647,6 +601,12 @@ class CDCServiceImpl::Impl { } } + void ClearCaches() { + std::lock_guard l(mutex_); + tablet_checkpoints_.clear(); + cdc_state_metadata_.clear(); + } + std::unique_ptr async_client_init_; // this will be used for the std::call_once call while caching the client @@ -869,6 +829,8 @@ void CDCServiceImpl::CreateEntryInCdcStateTable( QLAddStringRangeValue(cdc_state_table_write_req, stream_id); cdc_state_table->AddStringColumnValue(cdc_state_table_write_req, master::kCdcCheckpoint, 
op_id.ToString()); + auto column_id = cdc_state_table->ColumnId(master::kCdcData); + cdc_state_table->AddMapColumnValue(cdc_state_table_write_req, column_id, "active_time", "0"); ops->push_back(std::move(cdc_state_table_op)); impl_->AddTabletCheckpoint(op_id, stream_id, tablet_id, producer_entries_modified); @@ -1141,8 +1103,9 @@ Result CDCServiceImpl::SetCDCCheckpoint( session->SetDeadline(deadline); RETURN_NOT_OK_SET_CODE( - UpdateCheckpoint( - producer_tablet, checkpoint, checkpoint, session, GetCurrentTimeMicros(), true), + UpdateCheckpointAndActiveTime( + producer_tablet, checkpoint, checkpoint, session, GetCurrentTimeMicros(), + CDCRequestSource::CDCSDK, true), CDCError(CDCErrorPB::INTERNAL_ERROR)); if (req.has_initial_checkpoint() || set_latest_entry) { @@ -1333,7 +1296,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, StreamMetadata record = **res; if (record.source_type == CDCSDK) { - auto result = impl_->CheckStreamActive(producer_tablet); + auto result = CheckStreamActive(producer_tablet, session); RPC_STATUS_RETURN_ERROR(result, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); impl_->UpdateActiveTime(producer_tablet); } @@ -1472,9 +1435,9 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, if (record.checkpoint_type == IMPLICIT) { if (UpdateCheckpointRequired(record, cdc_sdk_op_id)) { RPC_STATUS_RETURN_ERROR( - UpdateCheckpoint( + UpdateCheckpointAndActiveTime( producer_tablet, OpId::FromPB(resp->checkpoint().op_id()), op_id, session, - last_record_hybrid_time), + last_record_hybrid_time, record.source_type), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); } @@ -1493,7 +1456,6 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex( std::vector servers; RETURN_NOT_OK(GetTServers(tablet_id, &servers)); - auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id)); for (const auto &server : servers) { if (server->IsLocal()) { // We modify our log directly. Avoid calling itself through the proxy. @@ -1509,18 +1471,20 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex( cdc_checkpoint_min.cdc_sdk_op_id.ToPB(update_index_req.add_cdc_sdk_consumed_ops()); update_index_req.add_cdc_sdk_ops_expiration_ms( cdc_checkpoint_min.cdc_sdk_op_id_expiration.ToMilliseconds()); - // Don't update active time for the TABLET LEADER. Only update in FOLLOWERS. - if (server->permanent_uuid() != ts_leader->permanent_uuid()) { - for (auto& stream_id : cdc_checkpoint_min.active_stream_list) { - update_index_req.add_stream_ids(stream_id); - } - } rpc::RpcController rpc; rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms)); - RETURN_NOT_OK(proxy->UpdateCdcReplicatedIndex(update_index_req, &update_index_resp, &rpc)); - if (update_index_resp.has_error()) { - return StatusFromPB(update_index_resp.error().status()); + auto result = proxy->UpdateCdcReplicatedIndex(update_index_req, &update_index_resp, &rpc); + // If UpdateCdcReplicatedIndex failed for one of the tablet peers, don't stop to update + // the minimum checkpoint to other FOLLOWERs. 
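+    // The failure is only logged as a warning: the loop moves on to the remaining peers and
+    // the function still returns Status::OK(), so a single unreachable peer cannot stall
+    // intent cleanup elsewhere.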
+ if (!result.ok() || update_index_resp.has_error()) { + std::stringstream msg; + msg << "Failed to update cdc replicated index for tablet: " << tablet_id + << " in remote peer: " << server->ToString(); + if (update_index_resp.has_error()) { + msg << ":" << StatusFromPB(update_index_resp.error().status()); + } + LOG(WARNING) << msg.str(); } } return Status::OK(); @@ -1722,9 +1686,9 @@ void SetMinCDCSDKCheckpoint(const OpId& checkpoint, OpId* cdc_sdk_op_id) { } } -void PopulateTabletMinCheckpoint( +void PopulateTabletMinCheckpointAndLatestActiveTime( const string& tablet_id, const OpId& checkpoint, CDCRequestSource cdc_source_type, - const CoarseTimePoint& last_active_time, TabletOpIdMap* tablet_min_checkpoint_index) { + const int64_t& last_active_time, TabletIdCDCCheckpointMap* tablet_min_checkpoint_index) { auto& tablet_info = (*tablet_min_checkpoint_index)[tablet_id]; tablet_info.cdc_op_id = min(tablet_info.cdc_op_id, checkpoint); @@ -1738,8 +1702,8 @@ void PopulateTabletMinCheckpoint( // if (cdc_source_type == CDCSDK) { SetMinCDCSDKCheckpoint(checkpoint, &tablet_info.cdc_sdk_op_id); - tablet_info.cdc_sdk_most_active_time = - max(tablet_info.cdc_sdk_most_active_time, last_active_time); + tablet_info.cdc_sdk_latest_active_time = + max(tablet_info.cdc_sdk_latest_active_time, last_active_time); } } @@ -1750,7 +1714,7 @@ Status CDCServiceImpl::SetInitialCheckPoint( << " and the latest entry OpID is " << tablet_peer->log()->GetLatestEntryOpId(); auto result = PopulateTabletCheckPointInfo(tablet_id); RETURN_NOT_OK_SET_CODE(result, CDCError(CDCErrorPB::INTERNAL_ERROR)); - TabletOpIdMap& tablet_min_checkpoint_map = *result; + TabletIdCDCCheckpointMap& tablet_min_checkpoint_map = *result; auto& tablet_op_id = tablet_min_checkpoint_map[tablet_id]; SetMinCDCSDKCheckpoint(checkpoint, &tablet_op_id.cdc_sdk_op_id); tablet_op_id.cdc_sdk_op_id_expiration = @@ -1772,9 +1736,9 @@ Status CDCServiceImpl::SetInitialCheckPoint( return UpdatePeersCdcMinReplicatedIndex(tablet_id, tablet_op_id); } -Result CDCServiceImpl::PopulateTabletCheckPointInfo( +Result CDCServiceImpl::PopulateTabletCheckPointInfo( const TabletId& input_tablet_id, TabletIdStreamIdSet* tablet_stream_to_be_deleted) { - TabletOpIdMap tablet_min_checkpoint_map; + TabletIdCDCCheckpointMap tablet_min_checkpoint_map; auto cdc_state_table_result = GetCdcStateTable(); if (!cdc_state_table_result.ok()) { @@ -1794,22 +1758,37 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( }; options.columns = std::vector{ master::kCdcTabletId, master::kCdcStreamId, master::kCdcCheckpoint, - master::kCdcLastReplicationTime}; + master::kCdcLastReplicationTime, master::kCdcData}; for (const auto& row : client::TableRange(**cdc_state_table_result, options)) { count++; auto tablet_id = row.column(master::kCdcTabletIdIdx).string_value(); auto stream_id = row.column(master::kCdcStreamIdIdx).string_value(); auto checkpoint = row.column(master::kCdcCheckpointIdx).string_value(); + + // Find the minimum checkpoint op_id per tablet. This minimum op_id + // will be passed to LEADER and it's peers for log cache eviction and clean the consumed intents + // in a regular interval. 
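+    // When a specific tablet was requested (e.g. from SetInitialCheckPoint), rows belonging
+    // to any other tablet are skipped.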
+ if (!input_tablet_id.empty() && input_tablet_id != tablet_id) { + continue; + } + std::string last_replicated_time_str; const auto& timestamp_ql_value = row.column(3); if (!timestamp_ql_value.IsNull()) { last_replicated_time_str = timestamp_ql_value.timestamp_value().ToFormattedString(); } + int64_t last_active_time_cdc_state_table; + if (!row.column(4).IsNull()) { + last_active_time_cdc_state_table = VERIFY_RESULT( + CheckedStoInt(row.column(4).map_value().values(0).string_value())); + } + VLOG(1) << "stream_id: " << stream_id << ", tablet_id: " << tablet_id << ", checkpoint: " << checkpoint - << ", last replicated time: " << last_replicated_time_str; + << ", last replicated time: " << last_replicated_time_str + << ", last active time: " << last_active_time_cdc_state_table; // Add the {tablet_id, stream_id} pair to the set if its checkpoint is OpId::Max(). if (tablet_stream_to_be_deleted && checkpoint == OpId::Max().ToString()) { @@ -1824,7 +1803,9 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( // is deleted. To update the corresponding tablet PEERs, give an entry in // tablet_min_checkpoint_map which will update cdc_sdk_min_checkpoint_op_id to // OpId::Max()(i.e no need to retain the intents.) - if (tablet_min_checkpoint_map.find(tablet_id) == tablet_min_checkpoint_map.end()) { + if (!tablet_min_checkpoint_map.contains(tablet_id) && + get_stream_metadata.status().IsNotFound()) { + VLOG(2) << "We could not get the metadata for the stream: " << stream_id; auto& tablet_info = tablet_min_checkpoint_map[tablet_id]; tablet_info.cdc_op_id = OpId::Max(); tablet_info.cdc_sdk_op_id = OpId::Max(); @@ -1833,51 +1814,47 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( } StreamMetadata& record = **get_stream_metadata; - auto result = OpId::FromString(checkpoint); - if (!result.ok()) { - LOG(WARNING) << "Read invalid op id " << row.column(1).string_value() - << " for tablet " << tablet_id << ": " << result.status(); - continue; - } - - // Find the minimum checkpoint op_id per tablet. This minimum op_id - // will be passed to LEADER and it's peers for log cache eviction and clean the consumed intents - // in a regular interval. - if (!input_tablet_id.empty() && input_tablet_id != tablet_id) { + auto op_id_result = OpId::FromString(checkpoint); + if (!op_id_result.ok()) { + LOG(WARNING) << "Read invalid op id " << row.column(1).string_value() << " for tablet " + << tablet_id << ": " << op_id_result.status(); continue; } + const auto& op_id = *op_id_result; // Check that requested tablet_id is part of the CDC stream. ProducerTabletInfo producer_tablet = {"" /* UUID */, stream_id, tablet_id}; // Check stream associated with the tablet is active or not. // Don't consider those inactive stream for the min_checkpoint calculation. - CoarseTimePoint latest_active_time = CoarseTimePoint ::min(); - // if current tsever is the tablet LEADER, send the FOLLOWER tablets to - // update their active_time in their CDCService Cache. 
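+    // FOLLOWER caches are no longer refreshed with per-stream active times over RPC; the
+    // computed intent-retention expiration is instead shipped to the peers through
+    // UpdatePeersCdcMinReplicatedIndex (cdc_sdk_ops_expiration_ms).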
+ int64_t latest_active_time = 0; if (record.source_type == CDCSDK) { - auto status = impl_->CheckStreamActive(producer_tablet); + auto session = client()->NewSession(); + auto status = CheckStreamActive(producer_tablet, session, last_active_time_cdc_state_table); if (!status.ok()) { - // Inactive stream read from cdc_state table are not considered for the minimum - // cdc_sdk_op_id calculation except if tablet is associated with a single stream, This is - // required to update the cdc_sdk_op_id_expiration in the tablet_min_checkpoint_map for - // the corresponding tablet, so that the tablet PEERS will be updated with - // cdc_sdk_min_checkpoint_op_id_expiration_ as EXPIRED. + // It is possible that all streams associated with a tablet have expired, in which case we + // have to create a default entry in 'tablet_min_checkpoint_map' corresponding to the + // tablet. This way the fact that all the streams have expired will be communicated to the + // tablet_peer as well, through the method: "UpdateTabletPeersWithMinReplicatedIndex". If + // 'tablet_min_checkpoint_map' already had an entry corresponding to the tablet, then + // either we already saw an inactive stream assocaited with the tablet and created the + // default entry or we saw an active stream and the map has a legitimate entry, in both + // cases repopulating the map is not needed. if (tablet_min_checkpoint_map.find(tablet_id) == tablet_min_checkpoint_map.end()) { + VLOG(2) << "Stream: " << stream_id << ", is expired for tablet: " << tablet_id + << ", hence we are adding default entries to tablet_min_checkpoint_map"; auto& tablet_info = tablet_min_checkpoint_map[tablet_id]; - tablet_info.cdc_op_id = *result; - tablet_info.cdc_sdk_op_id = *result; + tablet_info.cdc_sdk_op_id = OpId::Max(); } continue; } - tablet_min_checkpoint_map[tablet_id].active_stream_list.insert(stream_id); - latest_active_time = impl_->GetLatestActiveTime(producer_tablet, *result); + latest_active_time = last_active_time_cdc_state_table; } // Ignoring those non-bootstarped CDCSDK stream - if (*result != OpId::Invalid()) { - PopulateTabletMinCheckpoint( - tablet_id, *result, record.source_type, latest_active_time, &tablet_min_checkpoint_map); + if (op_id != OpId::Invalid()) { + PopulateTabletMinCheckpointAndLatestActiveTime( + tablet_id, op_id, record.source_type, latest_active_time, &tablet_min_checkpoint_map); } } @@ -1893,7 +1870,7 @@ Result CDCServiceImpl::PopulateTabletCheckPointInfo( } void CDCServiceImpl::UpdateTabletPeersWithMinReplicatedIndex( - TabletOpIdMap* tablet_min_checkpoint_map) { + TabletIdCDCCheckpointMap* tablet_min_checkpoint_map) { auto enable_update_local_peer_min_index = GetAtomicFlag(&FLAGS_enable_update_local_peer_min_index); @@ -1927,7 +1904,7 @@ void CDCServiceImpl::UpdateTabletPeersWithMinReplicatedIndex( continue; } - auto result = tablet_peer->GetCDCSDKIntentRetainTime(tablet_info.cdc_sdk_most_active_time); + auto result = tablet_peer->GetCDCSDKIntentRetainTime(tablet_info.cdc_sdk_latest_active_time); if (!result.ok()) { LOG(WARNING) << "Unable to get the intent retain time for tablet peer " << tablet_peer->permanent_uuid() << " and tablet " << tablet_peer->tablet_id() @@ -2002,7 +1979,7 @@ void CDCServiceImpl::UpdatePeersAndMetrics() { LOG(WARNING) << "Failed to populate tablets checkpoint info: " << result.status(); continue; } - TabletOpIdMap& tablet_checkpoint_map = *result; + TabletIdCDCCheckpointMap& tablet_checkpoint_map = *result; VLOG(3) << "List of tablets with checkpoint info read from cdc_state table: " << 
tablet_checkpoint_map.size(); UpdateTabletPeersWithMinReplicatedIndex(&tablet_checkpoint_map); @@ -2287,14 +2264,6 @@ void CDCServiceImpl::UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequ cdc_sdk_op, cdc_sdk_op_id_expiration); RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); - - if (req->stream_ids_size() > 0) { - for (int stream_idx = 0; stream_idx < req->stream_ids_size(); stream_idx++) { - ProducerTabletInfo producer_tablet = { - "" /* UUID */, req->stream_ids(stream_idx), req->tablet_ids(i)}; - impl_->UpdateFollowerCache(producer_tablet, cdc_sdk_op); - } - } } context.RespondSuccess(); } @@ -2882,7 +2851,94 @@ void CDCServiceImpl::Shutdown() { update_peers_and_metrics_thread_->join(); } impl_->async_client_init_ = nullptr; + impl_->ClearCaches(); + } +} + +Status CDCServiceImpl::CheckStreamActive( + const ProducerTabletInfo& producer_tablet, const client::YBSessionPtr& session, + const int64_t& last_active_time_passed) { + auto last_active_time = (last_active_time_passed == 0) + ? VERIFY_RESULT(GetLastActiveTime(producer_tablet, session)) + : last_active_time_passed; + + auto now = GetCurrentTimeMicros(); + if (now < last_active_time + 1000 * (GetAtomicFlag(&FLAGS_cdc_intent_retention_ms))) { + VLOG(1) << "Tablet: " << producer_tablet.ToString() + << " found in CDCState table/ cache with active time: " << last_active_time + << " current time:" << now << ", for stream: " << producer_tablet.stream_id; + return Status::OK(); + } + + last_active_time = VERIFY_RESULT(GetLastActiveTime(producer_tablet, session, true)); + if (now < last_active_time + 1000 * (GetAtomicFlag(&FLAGS_cdc_intent_retention_ms))) { + VLOG(1) << "Tablet: " << producer_tablet.ToString() + << " found in CDCState table with active time: " << last_active_time + << " current time:" << now << ", for stream: " << producer_tablet.stream_id; + return Status::OK(); } + + VLOG(1) << "Stream: " << producer_tablet.stream_id + << ", is expired for tablet: " << producer_tablet.tablet_id + << ", active time in CDCState table: " << last_active_time << ", current time: " << now; + return STATUS_FORMAT( + InternalError, "Stream ID $0 is expired for Tablet ID $1", producer_tablet.stream_id, + producer_tablet.tablet_id); +} + +Result CDCServiceImpl::GetLastActiveTime( + const ProducerTabletInfo& producer_tablet, const client::YBSessionPtr& session, + bool ignore_cache) { + DCHECK(!producer_tablet.stream_id.empty() && !producer_tablet.tablet_id.empty()); + + if (!ignore_cache) { + auto result = impl_->GetLastActiveTime(producer_tablet); + if (result) { + return *result; + } + } + + auto cdc_state_table_result = GetCdcStateTable(); + RETURN_NOT_OK(cdc_state_table_result); + + const auto readop = (*cdc_state_table_result)->NewReadOp(); + auto* const readreq = readop->mutable_request(); + QLAddStringHashValue(readreq, producer_tablet.tablet_id); + + auto cond = readreq->mutable_where_expr()->mutable_condition(); + cond->set_op(QLOperator::QL_OP_AND); + QLAddStringCondition( + cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, + producer_tablet.stream_id); + readreq->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcTabletIdIdx); + readreq->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcStreamIdIdx); + (*cdc_state_table_result)->AddColumns({master::kCdcData}, readreq); + + // TODO(async_flush): https://github.com/yugabyte/yugabyte-db/issues/12173 + RETURN_NOT_OK(session->TEST_ReadSync(readop)); + auto row_block = 
ql::RowsResult(readop.get()).GetRowBlock(); + + if (row_block->row_count() != 1) { + // This could happen when conncurently as this function is running the stram is deleted, in + // which case we return last active_time as "0". + return 0; + } + if (!row_block->row(0).column(0).IsNull()) { + DCHECK_EQ(row_block->row(0).column(0).type(), InternalType::kMapValue); + if (row_block->row(0).column(0).map_value().values().size() == 1) { + auto last_active_time_string = + row_block->row(0).column(0).map_value().values().Get(0).string_value(); + auto last_active_time = VERIFY_RESULT(CheckedStoInt(last_active_time_string)); + + VLOG(2) << "Found entry in cdc_state table with active time: " << last_active_time + << ", for tablet: " << producer_tablet.tablet_id + << ", and stream: " << producer_tablet.stream_id; + + return last_active_time; + } + } + + return GetCurrentTimeMicros(); } Result CDCServiceImpl::GetLastCheckpoint( @@ -2972,12 +3028,13 @@ void CDCServiceImpl::UpdateCDCTabletMetrics( } } -Status CDCServiceImpl::UpdateCheckpoint( +Status CDCServiceImpl::UpdateCheckpointAndActiveTime( const ProducerTabletInfo& producer_tablet, const OpId& sent_op_id, const OpId& commit_op_id, const client::YBSessionPtr& session, uint64_t last_record_hybrid_time, + const CDCRequestSource& request_source, const bool force_update) { bool update_cdc_state = impl_->UpdateCheckpoint(producer_tablet, sent_op_id, commit_op_id); if (update_cdc_state || force_update) { @@ -2993,8 +3050,20 @@ Status CDCServiceImpl::UpdateCheckpoint( // caught up, so the current time. uint64_t last_replication_time_micros = last_record_hybrid_time != 0 ? HybridTime(last_record_hybrid_time).GetPhysicalValueMicros() : GetCurrentTimeMicros(); - cdc_state->AddTimestampColumnValue(req, master::kCdcLastReplicationTime, - last_replication_time_micros); + cdc_state->AddTimestampColumnValue( + req, master::kCdcLastReplicationTime, last_replication_time_micros); + + if (request_source == CDCSDK) { + auto last_active_time = GetCurrentTimeMicros(); + auto column_id = cdc_state->ColumnId(master::kCdcData); + cdc_state->AddMapColumnValue(req, column_id, "active_time", ToString(last_active_time)); + + VLOG(2) << "Updating cdc state table with: checkpoint: " << commit_op_id.ToString() + << ", last active time: " << last_active_time + << ", for tablet: " << producer_tablet.tablet_id + << ", and stream: " << producer_tablet.stream_id; + } + // Only perform the update if we have a row in cdc_state to prevent a race condition where // a stream is deleted and then this logic inserts entries in cdc_state from that deleted // stream. diff --git a/ent/src/yb/cdc/cdc_service.h b/ent/src/yb/cdc/cdc_service.h index 5ae972b06a40..8c6c24790b3f 100644 --- a/ent/src/yb/cdc/cdc_service.h +++ b/ent/src/yb/cdc/cdc_service.h @@ -67,7 +67,7 @@ struct TabletCheckpoint { // Timestamp at which the op ID was last updated. CoarseTimePoint last_update_time; // Timestamp at which stream polling happen. 
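+  // Stored as microseconds since epoch (GetCurrentTimeMicros()), matching the "active_time"
+  // map entry persisted in the kCdcData column of the cdc_state table.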
- CoarseTimePoint last_active_time; + int64_t last_active_time; bool ExpiredAt(std::chrono::milliseconds duration, std::chrono::time_point now) { return !IsInitialized(last_update_time) || (now - last_update_time) >= duration; @@ -80,11 +80,10 @@ struct TabletCDCCheckpointInfo { OpId cdc_op_id = OpId::Max(); OpId cdc_sdk_op_id = OpId::Invalid(); MonoDelta cdc_sdk_op_id_expiration = MonoDelta::kZero; - CoarseTimePoint cdc_sdk_most_active_time = CoarseTimePoint::min(); - std::unordered_set active_stream_list; + int64_t cdc_sdk_latest_active_time = 0; }; -using TabletOpIdMap = std::unordered_map; +using TabletIdCDCCheckpointMap = std::unordered_map; using TabletIdStreamIdSet = std::set>; class CDCServiceImpl : public CDCServiceIf { @@ -188,6 +187,14 @@ class CDCServiceImpl : public CDCServiceIf { template bool CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc); + Status CheckStreamActive( + const ProducerTabletInfo& producer_tablet, const client::YBSessionPtr& session, + const int64_t& last_active_time_passed = 0); + + Result GetLastActiveTime( + const ProducerTabletInfo& producer_tablet, const client::YBSessionPtr& session, + bool ignore_cache = false); + Result GetLastCheckpoint(const ProducerTabletInfo& producer_tablet, const client::YBSessionPtr& session); @@ -198,12 +205,14 @@ class CDCServiceImpl : public CDCServiceIf { Result GetCdcStreamId(const ProducerTabletInfo& producer_tablet, const std::shared_ptr& session); - Status UpdateCheckpoint(const ProducerTabletInfo& producer_tablet, - const OpId& sent_op_id, - const OpId& commit_op_id, - const client::YBSessionPtr& session, - uint64_t last_record_hybrid_time, - bool force_update = false); + Status UpdateCheckpointAndActiveTime( + const ProducerTabletInfo& producer_tablet, + const OpId& sent_op_id, + const OpId& commit_op_id, + const client::YBSessionPtr& session, + uint64_t last_record_hybrid_time, + const CDCRequestSource& request_source = CDCRequestSource::CDCSDK, + bool force_update = false); Result> GetTablets( const CDCStreamId& stream_id); @@ -235,7 +244,7 @@ class CDCServiceImpl : public CDCServiceIf { rpc::RpcContext* context, const std::shared_ptr& peer); - void UpdateTabletPeersWithMinReplicatedIndex(TabletOpIdMap* tablet_min_checkpoint_map); + void UpdateTabletPeersWithMinReplicatedIndex(TabletIdCDCCheckpointMap* tablet_min_checkpoint_map); Result TabletLeaderLatestEntryOpId(const TabletId& tablet_id); @@ -324,7 +333,7 @@ class CDCServiceImpl : public CDCServiceIf { CreateCDCStreamResponsePB* resp, CoarseTimePoint deadline); - Result PopulateTabletCheckPointInfo( + Result PopulateTabletCheckPointInfo( const TabletId& input_tablet_id = "", TabletIdStreamIdSet* tablet_stream_to_be_deleted = nullptr); diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc index 46d1dc2c6070..dfc8cc898c99 100644 --- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -67,6 +67,7 @@ #include "yb/util/monotime.h" #include "yb/util/random_util.h" #include "yb/util/result.h" +#include "yb/util/stol_utils.h" #include "yb/util/test_macros.h" #include "yb/util/thread.h" @@ -855,10 +856,10 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { replica.ts_info().permanent_uuid()) { if (replica.role() == PeerRole::LEADER) { *leader_index = i; - LOG(INFO) << "Found first leader index: " << i; + LOG(INFO) << "Found leader index: " << i; } else if (replica.role() == PeerRole::FOLLOWER) { *follower_index = i; - LOG(INFO) << "Found 
first follower index: " << i; + LOG(INFO) << "Found follower index: " << i; } } } @@ -866,7 +867,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { } void CompareExpirationTime( const TabletId& tablet_id, const CoarseTimePoint& prev_leader_expiry_time, - size_t current_leader_idx) { + size_t current_leader_idx, bool strictly_greater_than = false) { ASSERT_OK(WaitFor( [&]() { CoarseTimePoint current_expiry_time; @@ -877,47 +878,59 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { break; } } - if (current_expiry_time >= prev_leader_expiry_time) { - LOG(INFO) << "The expiration time for the current LEADER is: " - << current_expiry_time.time_since_epoch().count() - << ", and the previous LEADER expiration time should be: " - << prev_leader_expiry_time.time_since_epoch().count(); - return true; + if (strictly_greater_than) { + if (current_expiry_time > prev_leader_expiry_time) { + LOG(INFO) << "The expiration time for the current LEADER is: " + << current_expiry_time.time_since_epoch().count() + << ", and the previous LEADER expiration time should be: " + << prev_leader_expiry_time.time_since_epoch().count(); + return true; + } + } else { + if (current_expiry_time >= prev_leader_expiry_time) { + LOG(INFO) << "The expiration time for the current LEADER is: " + << current_expiry_time.time_since_epoch().count() + << ", and the previous LEADER expiration time should be: " + << prev_leader_expiry_time.time_since_epoch().count(); + return true; + } } } return false; }, - MonoDelta::FromSeconds(60), "Waiting for active time is updated in FOLLOWERs")); + MonoDelta::FromSeconds(60), "Waiting for active time to be updated")); } - void CompareCacheActiveTime( - const CDCStreamId& stream_id, const TabletId& tablet_id, - const CoarseTimePoint& prev_leader_active_time, size_t current_leader_index) { - ASSERT_OK(WaitFor( - [&]() -> Result { - while (true) { - const auto& first_tserver = - test_cluster()->mini_tablet_server(current_leader_index)->server(); - auto cdc_service = - dynamic_cast(first_tserver->rpc_server() - ->TEST_service_pool("yb.cdc.CDCService") - ->TEST_get_service() - .get()); - auto tablet_info = cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablet_id}); - if (!tablet_info.ok()) { - return false; - } - if (tablet_info->last_active_time >= prev_leader_active_time) { - LOG(INFO) << "Current LEADER active time in Cache: " - << tablet_info->last_active_time.time_since_epoch().count() - << " previoud LEADER actibe time in Cache: " - << prev_leader_active_time.time_since_epoch().count(); - return true; - } - } - return false; - }, - MonoDelta::FromSeconds(60), "Waiting for active time is updated in FOLLOWERs.")); + Result GetLastActiveTimeFromCdcStateTable( + const CDCStreamId& stream_id, const TabletId& tablet_id, client::YBClient* client) { + auto session = client->NewSession(); + client::TableHandle table; + const client::YBTableName cdc_state_table( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + RETURN_NOT_OK(table.Open(cdc_state_table, client)); + + auto read_op = table.NewReadOp(); + auto* read_req = read_op->mutable_request(); + QLAddStringHashValue(read_req, tablet_id); + auto cond = read_req->mutable_where_expr()->mutable_condition(); + cond->set_op(QLOperator::QL_OP_AND); + QLAddStringCondition( + cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, stream_id); + table.AddColumns({master::kCdcData}, read_req); + // TODO(async_flush): https://github.com/yugabyte/yugabyte-db/issues/12173 + 
RETURN_NOT_OK(session->TEST_ReadSync(read_op)); + + auto row_block = ql::RowsResult(read_op.get()).GetRowBlock(); + if (row_block->row_count() != 1) { + return STATUS( + InvalidArgument, "Did not find a row in the cdc_state table for the tablet and stream."); + } + + const auto& last_active_time_string = + row_block->row(0).column(0).map_value().values().Get(0).string_value(); + + auto last_active_time = VERIFY_RESULT(CheckedStoInt(last_active_time_string)); + return last_active_time; } }; @@ -3395,21 +3408,14 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderReElect) LOG(INFO) << "The correct expiry time after the final GetChanges call: " << correct_expiry_time.time_since_epoch().count(); - const auto& tserver = test_cluster()->mini_tablet_server(second_leader_index)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); - auto tablet_info = ASSERT_RESULT( - cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablets[0].tablet_id()})); - auto correct_last_active_time = tablet_info.last_active_time; // we need to ensure the initial leader get's back leadership ASSERT_OK(ChangeLeaderOfTablet(first_follower_index, tablets[0].tablet_id())); + LOG(INFO) << "Changed leadership back to the first leader TServer"; // Call the test RPC to get last active time of the current leader (original), and it should // be lower than the previously recorded last_active_time. - // SleepFor(MonoDelta::FromSeconds(2)); CompareExpirationTime(tablets2[0].tablet_id(), correct_expiry_time, first_leader_index); - CompareCacheActiveTime( - stream_id, tablets[0].tablet_id(), correct_last_active_time, first_leader_index); + LOG(INFO) << "Succesfully compared expiry times"; } TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderRestart)) { @@ -3518,13 +3524,6 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderRestart) << second_leader_index << " : " << correct_expiry_time.time_since_epoch().count(); - const auto& tserver = test_cluster()->mini_tablet_server(second_leader_index)->server(); - auto cdc_service = dynamic_cast( - tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); - auto tablet_info = ASSERT_RESULT( - cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablets[0].tablet_id()})); - auto correct_last_active_time = tablet_info.last_active_time; - // We need to ensure the initial leader get's back leadership. ASSERT_OK(ChangeLeaderOfTablet(first_leader_index, tablets[0].tablet_id())); @@ -3543,8 +3542,157 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderRestart) // Call the test RPC to get last active time of the current leader (original), and it will // be lower than the previously recorded last_active_time. 
CompareExpirationTime(tablets2[0].tablet_id(), correct_expiry_time, first_leader_index); - CompareCacheActiveTime( - stream_id, tablets[0].tablet_id(), correct_last_active_time, first_leader_index); +} + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKActiveTimeCacheInSyncWithCDCStateTable)) { + FLAGS_update_min_cdc_indices_interval_secs = 1; + FLAGS_update_metrics_interval_ms = 1000; + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; + const int num_tservers = 3; + ASSERT_OK(SetUpWithParams(num_tservers, 1, false)); + + const uint32_t num_tablets = 1; + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets)); + + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + + CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + + auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(resp.has_error()); + + // Insert some records in transaction. + ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + GetChangesResponsePB change_resp; + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + ASSERT_GE(record_size, 100); + LOG(INFO) << "Total records read by GetChanges call on stream_id_1: " << record_size; + + size_t first_leader_index = -1; + size_t first_follower_index = -1; + GetTabletLeaderAndAnyFollowerIndex(tablets, &first_leader_index, &first_follower_index); + + GetChangesResponsePB change_resp_1 = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint())); + LOG(INFO) << "Number of records after first transaction: " << change_resp_1.records().size(); + + const auto& first_leader_tserver = + test_cluster()->mini_tablet_server(first_leader_index)->server(); + auto cdc_service = dynamic_cast(first_leader_tserver->rpc_server() + ->TEST_service_pool("yb.cdc.CDCService") + ->TEST_get_service() + .get()); + auto tablet_info = ASSERT_RESULT( + cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablets[0].tablet_id()})); + auto first_last_active_time = tablet_info.last_active_time; + auto last_active_time_from_table = ASSERT_RESULT( + GetLastActiveTimeFromCdcStateTable(stream_id, tablets[0].tablet_id(), test_client())); + // Now check the active time in CDCSTate table, it should be greater than or equal to the + // last_active_time from the cache. + ASSERT_GE(last_active_time_from_table, first_last_active_time); + LOG(INFO) << "The active time is equal in both the cache and cdc_state table"; + + const size_t& second_leader_index = first_follower_index; + ASSERT_OK(ChangeLeaderOfTablet(second_leader_index, tablets[0].tablet_id())); + + // Insert some records in transaction after first leader stepdown. + ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Call GetChanges so that the last active time is updated on the new leader. 
+ auto result = GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint()); + + const auto& second_leader_tserver = + test_cluster()->mini_tablet_server(second_leader_index)->server(); + cdc_service = dynamic_cast(second_leader_tserver->rpc_server() + ->TEST_service_pool("yb.cdc.CDCService") + ->TEST_get_service() + .get()); + tablet_info = ASSERT_RESULT( + cdc_service->TEST_GetTabletInfoFromCache({"", stream_id, tablets[0].tablet_id()})); + auto second_last_active_time = tablet_info.last_active_time; + + last_active_time_from_table = ASSERT_RESULT( + GetLastActiveTimeFromCdcStateTable(stream_id, tablets[0].tablet_id(), test_client())); + ASSERT_GE(last_active_time_from_table, second_last_active_time); + LOG(INFO) << "The active time is equal in both the cache and cdc_state table"; +} + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWhenAFollowerIsUnavailable)) { + FLAGS_update_min_cdc_indices_interval_secs = 1; + FLAGS_update_metrics_interval_ms = 500; + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; + const int num_tservers = 5; + ASSERT_OK(SetUpWithParams(num_tservers, 1, false)); + + const uint32_t num_tablets = 1; + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + + CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(resp.has_error()); + + size_t first_leader_index = 0; + size_t first_follower_index = 0; + GetTabletLeaderAndAnyFollowerIndex(tablets, &first_leader_index, &first_follower_index); + + SleepFor(MonoDelta::FromSeconds(2)); + + // Insert some records in transaction. + ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + GetChangesResponsePB change_resp; + change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); + uint32_t record_size = change_resp.cdc_sdk_proto_records_size(); + ASSERT_GE(record_size, 100); + LOG(INFO) << "Total records read by GetChanges call on stream_id_1: " << record_size; + SleepFor(MonoDelta::FromSeconds(10)); + + // Insert some records in transaction after leader shutdown. + ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); + ASSERT_OK(test_client()->FlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + auto result = GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint()); + + CoarseTimePoint first_expiry_time; + for (auto const& peer : test_cluster()->GetTabletPeers(first_leader_index)) { + if (peer->tablet_id() == tablets[0].tablet_id()) { + first_expiry_time = peer->cdc_sdk_min_checkpoint_op_id_expiration(); + } + } + LOG(INFO) << "The expiry time after the first GetChanges call: " + << first_expiry_time.time_since_epoch().count(); + + // Shutdown tserver having tablet FOLLOWER. + test_cluster()->mini_tablet_server(first_follower_index)->Shutdown(); + LOG(INFO) << "TServer hosting tablet follower shutdown"; + // Call GetChanges so that the last active time is updated on the new leader. 
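+  // The GetChanges result is not asserted; the call only refreshes the stream's active time,
+  // which the UpdatePeersAndMetrics loop folds into the expiration compared below.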
+ result = GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint()); + + // Call the test RPC to get last active time of the current leader (original), and it must + // be greater than or equal to the previously recorded last_active_time. + CompareExpirationTime(tablets[0].tablet_id(), first_expiry_time, first_leader_index, true); } } // namespace enterprise diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index d6716abe44b4..9b98237b53dd 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -3510,6 +3510,14 @@ Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req, cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString()); cdc_table.AddTimestampColumnValue( req, master::kCdcLastReplicationTime, GetCurrentTimeMicros()); + + if (id_type_option_value == cdc::kNamespaceId) { + // For cdcsdk cases, we also need to persist last_active_time in the 'cdc_state' table. We + // will store this info in the map in the 'kCdcData' column. + auto column_id = cdc_table.ColumnId(master::kCdcData); + cdc_table.AddMapColumnValue(req, column_id, "active_time", "0"); + } + session->Apply(op); } // TODO(async_flush): https://github.com/yugabyte/yugabyte-db/issues/12173 diff --git a/src/yb/client/table_handle.cc b/src/yb/client/table_handle.cc index bfd7b28894d8..c15a23418ad0 100644 --- a/src/yb/client/table_handle.cc +++ b/src/yb/client/table_handle.cc @@ -171,6 +171,27 @@ void TableHandle::AddCondition(QLConditionPB* const condition, const QLOperator condition->add_operands()->mutable_condition()->set_op(op); } +QLMapValuePB* TableHandle::AddMapColumnValue( + QLWriteRequestPB* req, const int32_t& column_id, const string& entry_key, + const string& entry_value) const { + auto column_value = req->add_column_values(); + column_value->set_column_id(column_id); + QLMapValuePB* map_value = (column_value->mutable_expr()->mutable_value()->mutable_map_value()); + QLValuePB* elem = map_value->add_keys(); + elem->set_string_value(entry_key); + elem = map_value->add_values(); + elem->set_string_value(entry_value); + return map_value; +} + +void TableHandle::AddMapEntryToColumn( + QLMapValuePB* map_value_pb, const string& entry_key, const string& entry_value) const { + QLValuePB* elem = map_value_pb->add_keys(); + elem->set_string_value(entry_key); + elem = map_value_pb->add_values(); + elem->set_string_value(entry_value); +} + void TableHandle::AddColumns(const std::vector& columns, QLReadRequestPB* req) const { QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc(); for (const auto& column : columns) { diff --git a/src/yb/client/table_handle.h b/src/yb/client/table_handle.h index 31171645272f..7f778e8b77e1 100644 --- a/src/yb/client/table_handle.h +++ b/src/yb/client/table_handle.h @@ -111,6 +111,13 @@ class TableHandle { // E.g. Add under "... AND ". 
void AddCondition(QLConditionPB *const condition, const QLOperator op) const; + QLMapValuePB* AddMapColumnValue( + QLWriteRequestPB* req, const int32_t& column_id, const string& entry_key, + const string& entry_value) const; + + void AddMapEntryToColumn( + QLMapValuePB* map_value_pb, const string& entry_key, const string& entry_value) const; + void AddColumns(const std::vector& columns, QLReadRequestPB* req) const; const YBTablePtr& table() const { diff --git a/src/yb/tablet/tablet_peer.cc b/src/yb/tablet/tablet_peer.cc index b45f686637c0..adff64412ed6 100644 --- a/src/yb/tablet/tablet_peer.cc +++ b/src/yb/tablet/tablet_peer.cc @@ -1077,14 +1077,13 @@ Status TabletPeer::SetCDCSDKRetainOpIdAndTime( return Status::OK(); } -Result TabletPeer::GetCDCSDKIntentRetainTime( - const CoarseTimePoint& cdc_sdk_latest_active_time) { +Result TabletPeer::GetCDCSDKIntentRetainTime(const int64_t& cdc_sdk_latest_active_time) { MonoDelta cdc_sdk_intent_retention = MonoDelta::kZero; // If cdc_sdk_latest_update_time is not updated to default CoarseTimePoint::min() value, // It's mean that, no need to retain the intents. This can happen in below case:- // a. Only XCluster streams are defined for the tablet. // b. CDCSDK stream for the tablet is expired. - if (cdc_sdk_latest_active_time == CoarseTimePoint::min()) { + if (cdc_sdk_latest_active_time == 0) { return cdc_sdk_intent_retention; } @@ -1095,7 +1094,8 @@ Result TabletPeer::GetCDCSDKIntentRetainTime( // all the FOLLOWERs as their cdc_sdk_min_checkpoint_op_id_expiration_. MonoDelta max_retain_time = MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_cdc_intent_retention_ms)); - MonoDelta lastest_active_time(CoarseMonoClock::Now() - cdc_sdk_latest_active_time); + auto lastest_active_time = + MonoDelta::FromMicroseconds(GetCurrentTimeMicros() - cdc_sdk_latest_active_time); if (max_retain_time >= lastest_active_time) { cdc_sdk_intent_retention = max_retain_time - lastest_active_time; } diff --git a/src/yb/tablet/tablet_peer.h b/src/yb/tablet/tablet_peer.h index 01e755394a35..f3dc39647e53 100644 --- a/src/yb/tablet/tablet_peer.h +++ b/src/yb/tablet/tablet_peer.h @@ -390,7 +390,7 @@ class TabletPeer : public std::enable_shared_from_this, Status SetCDCSDKRetainOpIdAndTime( const OpId& cdc_sdk_op_id, const MonoDelta& cdc_sdk_op_id_expiration); - Result GetCDCSDKIntentRetainTime(const CoarseTimePoint& cdc_sdk_latest_active_time); + Result GetCDCSDKIntentRetainTime(const int64_t& cdc_sdk_latest_active_time); OpId GetLatestCheckPoint();
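Note: the retention logic this patch adds reduces to one comparison between a stream's last_active_time (microseconds since epoch) and the cdc_intent_retention_ms flag. Below is a minimal standalone sketch of that arithmetic, mirroring CheckStreamActive() and TabletPeer::GetCDCSDKIntentRetainTime(); the flag constant and function names (kCdcIntentRetentionMs, IsStreamExpired, IntentRetainTimeMs) are illustrative stand-ins, not symbols from the patch.

    #include <algorithm>
    #include <cstdint>

    // Stand-in for the GFLAG cdc_intent_retention_ms (default: 4 hours).
    constexpr int64_t kCdcIntentRetentionMs = 4 * 60 * 60 * 1000;

    // Mirrors CheckStreamActive(): a stream/tablet pair is expired once more than
    // cdc_intent_retention_ms has elapsed since its last GetChanges call.
    bool IsStreamExpired(int64_t last_active_time_micros, int64_t now_micros) {
      return now_micros >= last_active_time_micros + 1000 * kCdcIntentRetentionMs;
    }

    // Mirrors TabletPeer::GetCDCSDKIntentRetainTime(): how much longer intents must be kept.
    // A last_active_time of 0 means no active CDCSDK stream, so nothing is retained.
    int64_t IntentRetainTimeMs(int64_t last_active_time_micros, int64_t now_micros) {
      if (last_active_time_micros == 0) {
        return 0;
      }
      const int64_t elapsed_ms = (now_micros - last_active_time_micros) / 1000;
      return std::max<int64_t>(int64_t{0}, kCdcIntentRetentionMs - elapsed_ms);
    }

In the patch this remaining-retention value (as a MonoDelta) is what UpdatePeersCdcMinReplicatedIndex forwards to the follower peers as cdc_sdk_ops_expiration_ms, so followers can expire intents without being told each stream's active time.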