Skip to content

Commit

Permalink
[yugabyte#13693] CDCSDK: Stale entry in CDC Cache causes Steam Expira…
Browse files Browse the repository at this point in the history
…tion.

Summary:
Consider a cluster with 3 tservers (TS1, TS2, TS3) and a table with a single tablet. Today we maintain and track the stream active time in the cache. During starting time tablet LEADER is TS1, so there is a Cache entry for the tablet, to track its active time.  After some time TS2 becomes the tablet LEADER, so an entry will be created in TS2's cache to track the active time of the tablet. now after cdc_intent_retention_ms expiration time,  TS1 becomes a LEADER, but its existing cache entry is not in sync, so if we call GetChanges stream will expire.

To handle this RPC request will be sent by LEADER as part //UpdatePeersAndMetrics// thread, to all the FOLLOWER to update their //last_active_time// in the  CDC Service Cache, so that LEADER and FOLLOWERS are in sync.

Test Plan:
Jenkins: skip
Running all the c and java testcases

Reviewers: abharadwaj, aagrawal, vkushwaha, skumar, srangavajjula

Reviewed By: skumar

Subscribers: ycdcxcluster

Differential Revision: https://phabricator.dev.yugabyte.com/D18882
  • Loading branch information
sureshdash2022-yb committed Aug 19, 2022
1 parent 9b9525a commit 2787d62
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 9 deletions.
48 changes: 45 additions & 3 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -581,14 +581,31 @@ class CDCServiceImpl::Impl {
CoarseMonoClock::Now() >
it->cdc_state_checkpoint.last_active_time +
MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_cdc_intent_retention_ms))) {
LOG(ERROR) << "Stream ID: " << producer_tablet.stream_id
<< " expired for Tablet ID: " << producer_tablet.tablet_id
<< " with active time :"
<< it->cdc_state_checkpoint.last_active_time.time_since_epoch();
return STATUS_FORMAT(
InternalError, "stream ID $0 is expired for Tablet ID $1", producer_tablet.stream_id,
producer_tablet.tablet_id);
}
VLOG(1) << "Tablet :" << producer_tablet.ToString()
<< " found in CDCSerive Cache with active time: "
<< ": " << it->cdc_state_checkpoint.last_active_time.time_since_epoch();
}
return Status::OK();
}

Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet) {
SharedLock<rw_spinlock> l(mutex_);
auto it = tablet_checkpoints_.find(producer_tablet);
if (it != tablet_checkpoints_.end()) {
return it->cdc_state_checkpoint;
}
return STATUS_FORMAT(
InternalError, "Tablet info: $0 not found in cache.", producer_tablet.ToString());
}

void UpdateActiveTime(const ProducerTabletInfo& producer_tablet) {
SharedLock<rw_spinlock> l(mutex_);
auto it = tablet_checkpoints_.find(producer_tablet);
Expand Down Expand Up @@ -1208,6 +1225,11 @@ Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCService
return all_tablets;
}

Result<TabletCheckpoint> CDCServiceImpl::TEST_GetTabletInfoFromCache(
const ProducerTabletInfo& producer_tablet) {
return impl_->TEST_GetTabletInfoFromCache(producer_tablet);
}

void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
GetChangesResponsePB* resp,
RpcContext context) {
Expand Down Expand Up @@ -1440,6 +1462,7 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
std::vector<client::internal::RemoteTabletServer *> servers;
RETURN_NOT_OK(GetTServers(tablet_id, &servers));

auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id));
for (const auto &server : servers) {
if (server->IsLocal()) {
// We modify our log directly. Avoid calling itself through the proxy.
Expand All @@ -1455,6 +1478,12 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
cdc_checkpoint_min.cdc_sdk_op_id.ToPB(update_index_req.add_cdc_sdk_consumed_ops());
update_index_req.add_cdc_sdk_ops_expiration_ms(
cdc_checkpoint_min.cdc_sdk_op_id_expiration.ToMilliseconds());
// Don't update active time for the TABLET LEADER. Only update in FOLLOWERS.
if (server->permanent_uuid() != ts_leader->permanent_uuid()) {
for (auto& stream_id : cdc_checkpoint_min.active_stream_list) {
update_index_req.add_stream_ids(stream_id);
}
}

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
Expand Down Expand Up @@ -1795,13 +1824,17 @@ Result<TabletOpIdMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
// Check stream associated with the tablet is active or not.
// Don't consider those inactive stream for the min_checkpoint calculation.
CoarseTimePoint latest_active_time = CoarseTimePoint ::min();
if (record.source_type == CDCSDK) {
// if current tsever is the tablet LEADER, send the FOLLOWER tablets to
// update their active_time in their CDCService Cache.
std::shared_ptr<tablet::TabletPeer> tablet_peer;
Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
if (s.ok() && record.source_type == CDCSDK && IsTabletPeerLeader(tablet_peer)) {
auto status = impl_->CheckStreamActive(producer_tablet);
if (!status.ok()) {
// Inactive stream read from cdc_state table are not considered for the minimum
// cdc_sdk_op_id calculation except if tablet is associated with a single stream, This is
// required to update the cdc_sdk_op_id_expiration in the tablet_min_checkpoint_map for the
// corresponding tablet, so that the tablet PEERS will be updated with
// required to update the cdc_sdk_op_id_expiration in the tablet_min_checkpoint_map for
// the corresponding tablet, so that the tablet PEERS will be updated with
// cdc_sdk_min_checkpoint_op_id_expiration_ as EXPIRED.
if (tablet_min_checkpoint_map.find(tablet_id) == tablet_min_checkpoint_map.end()) {
auto& tablet_info = tablet_min_checkpoint_map[tablet_id];
Expand All @@ -1810,6 +1843,7 @@ Result<TabletOpIdMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
}
continue;
}
tablet_min_checkpoint_map[tablet_id].active_stream_list.insert(stream_id);
latest_active_time = impl_->GetLatestActiveTime(producer_tablet, *result);
}

Expand Down Expand Up @@ -2223,6 +2257,14 @@ void CDCServiceImpl::UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequ
cdc_sdk_op,
cdc_sdk_op_id_expiration);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

if (req->stream_ids_size() > 0) {
for (int stream_idx = 0; stream_idx < req->stream_ids_size(); stream_idx++) {
ProducerTabletInfo producer_tablet = {
"" /* UUID */, req->stream_ids(stream_idx), req->tablet_ids(i)};
impl_->UpdateActiveTime(producer_tablet);
}
}
}
context.RespondSuccess();
}
Expand Down
3 changes: 3 additions & 0 deletions ent/src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ struct TabletCDCCheckpointInfo {
OpId cdc_sdk_op_id = OpId::Invalid();
MonoDelta cdc_sdk_op_id_expiration = MonoDelta::kZero;
CoarseTimePoint cdc_sdk_most_active_time = CoarseTimePoint::min();
std::unordered_set<CDCStreamId> active_stream_list;
};

using TabletOpIdMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
Expand Down Expand Up @@ -118,6 +119,8 @@ class CDCServiceImpl : public CDCServiceIf {
GetCheckpointResponsePB* resp,
rpc::RpcContext rpc) override;

Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet);

// Update peers in other tablet servers about the latest minimum applied cdc index for a specific
// tablet.
void UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req,
Expand Down
Loading

0 comments on commit 2787d62

Please sign in to comment.