Skip to content

Commit

Permalink
[BACKPORT 2.14][#13653] CDCSDK: Deleting stream IDs lead to stale ent…
Browse files Browse the repository at this point in the history
…ries in the cdc_state table causing tserver crash

Summary:
"Original commit:
 - 2787d62/D18882
  -  86a78b7/D18986"
During the analysis we found that in case a stream_id is deleted, the metadata related to it is not getting cleared from the cdc_state table - and now even if a new stream is created, the previous (deleted) stream is causing interference to the functioning which is ultimately leading to a tserver crash.

To fix this we will ignore those deleted stream metadata entries as part //setCDCCheckpoint//, and will remove those entries when //UpdatePeersAndMetrics// thread is enabled again.

[#13693] CDCSDK: Stale entry in CDC Cache causes Steam Expiration.

Consider a cluster with 3 tservers (TS1, TS2, TS3) and a table with a single tablet. Today we maintain and track the stream active time in the cache. During starting time tablet LEADER is TS1, so there is a Cache entry for the tablet, to track its active time.  After some time TS2 becomes the tablet LEADER, so an entry will be created in TS2's cache to track the active time of the tablet. now after cdc_intent_retention_ms expiration time,  TS1 becomes a LEADER, but its existing cache entry is not in sync, so if we call GetChanges stream will expire.

To handle this RPC request will be sent by LEADER as part //UpdatePeersAndMetrics// thread, to all the FOLLOWER to update their //last_active_time// in the  CDC Service Cache, so that LEADER and FOLLOWERS are in sync.

Test Plan:
Running all the c and java testcases

Jenkins: urgent
Running all the c and java testcases

Reviewers: srangavajjula, vkushwaha, abharadwaj, aagrawal, skumar

Reviewed By: skumar

Subscribers: ycdcxcluster

Differential Revision: https://phabricator.dev.yugabyte.com/D19056
  • Loading branch information
sureshdash2022-yb committed Aug 20, 2022
1 parent 5404cf3 commit 5fbaab5
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 2 deletions.
46 changes: 44 additions & 2 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,14 +552,31 @@ class CDCServiceImpl::Impl {
CoarseMonoClock::Now() >
it->cdc_state_checkpoint.last_active_time +
MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_cdc_intent_retention_ms))) {
LOG(ERROR) << "Stream ID: " << producer_tablet.stream_id
<< " expired for Tablet ID: " << producer_tablet.tablet_id
<< " with active time :"
<< it->cdc_state_checkpoint.last_active_time.time_since_epoch();
return STATUS_FORMAT(
InternalError, "stream ID $0 is expired for Tablet ID $1", producer_tablet.stream_id,
producer_tablet.tablet_id);
}
VLOG(1) << "Tablet :" << producer_tablet.ToString()
<< " found in CDCSerive Cache with active time: "
<< ": " << it->cdc_state_checkpoint.last_active_time.time_since_epoch();
}
return Status::OK();
}

Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet) {
SharedLock<rw_spinlock> l(mutex_);
auto it = tablet_checkpoints_.find(producer_tablet);
if (it != tablet_checkpoints_.end()) {
return it->cdc_state_checkpoint;
}
return STATUS_FORMAT(
InternalError, "Tablet info: $0 not found in cache.", producer_tablet.ToString());
}

void UpdateActiveTime(const ProducerTabletInfo& producer_tablet) {
SharedLock<rw_spinlock> l(mutex_);
auto it = tablet_checkpoints_.find(producer_tablet);
Expand Down Expand Up @@ -1177,6 +1194,11 @@ Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCService
return all_tablets;
}

Result<TabletCheckpoint> CDCServiceImpl::TEST_GetTabletInfoFromCache(
const ProducerTabletInfo& producer_tablet) {
return impl_->TEST_GetTabletInfoFromCache(producer_tablet);
}

void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
GetChangesResponsePB* resp,
RpcContext context) {
Expand Down Expand Up @@ -1402,6 +1424,7 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
std::vector<client::internal::RemoteTabletServer *> servers;
RETURN_NOT_OK(GetTServers(tablet_id, &servers));

auto ts_leader = VERIFY_RESULT(GetLeaderTServer(tablet_id));
for (const auto &server : servers) {
if (server->IsLocal()) {
// We modify our log directly. Avoid calling itself through the proxy.
Expand All @@ -1417,6 +1440,12 @@ Status CDCServiceImpl::UpdatePeersCdcMinReplicatedIndex(
cdc_checkpoint_min.cdc_sdk_op_id.ToPB(update_index_req.add_cdc_sdk_consumed_ops());
update_index_req.add_cdc_sdk_ops_expiration_ms(
cdc_checkpoint_min.cdc_sdk_op_id_expiration.ToMilliseconds());
// Don't update active time for the TABLET LEADER. Only update in FOLLOWERS.
if (server->permanent_uuid() != ts_leader->permanent_uuid()) {
for (auto& stream_id : cdc_checkpoint_min.active_stream_list) {
update_index_req.add_stream_ids(stream_id);
}
}

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
Expand Down Expand Up @@ -1702,7 +1731,7 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
<< ", last replicated time: " << last_replicated_time_str;

// Add the {tablet_id, stream_id} pair to the set if its checkpoint is OpId::Max().
if (checkpoint == OpId::Max().ToString()) {
if (tablet_stream_to_be_deleted && checkpoint == OpId::Max().ToString()) {
tablet_stream_to_be_deleted->insert({tablet_id, stream_id});
}

Expand Down Expand Up @@ -1743,7 +1772,11 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
// Check stream associated with the tablet is active or not.
// Don't consider those inactive stream for the min_checkpoint calculation.
CoarseTimePoint latest_active_time = CoarseTimePoint ::min();
if (record.source_type == CDCSDK) {
// if current tsever is the tablet LEADER, send the FOLLOWER tablets to
// update their active_time in their CDCService Cache.
std::shared_ptr<tablet::TabletPeer> tablet_peer;
Status s = tablet_manager_->GetTabletPeer(tablet_id, &tablet_peer);
if (s.ok() && record.source_type == CDCSDK && IsTabletPeerLeader(tablet_peer)) {
auto status = impl_->CheckStreamActive(producer_tablet);
if (!status.ok()) {
// Inactive stream read from cdc_state table are not considered for the minimum
Expand All @@ -1758,6 +1791,7 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
}
continue;
}
tablet_min_checkpoint_map[tablet_id].active_stream_list.insert(stream_id);
latest_active_time = impl_->GetLatestActiveTime(producer_tablet, *result);
}

Expand Down Expand Up @@ -2171,6 +2205,14 @@ Result<std::shared_ptr<client::TableHandle>> CDCServiceImpl::GetCdcStateTable()
cdc_sdk_op,
cdc_sdk_op_id_expiration);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

if (req->stream_ids_size() > 0) {
for (int stream_idx = 0; stream_idx < req->stream_ids_size(); stream_idx++) {
ProducerTabletInfo producer_tablet = {
"" /* UUID */, req->stream_ids(stream_idx), req->tablet_ids(i)};
impl_->UpdateActiveTime(producer_tablet);
}
}
}
context.RespondSuccess();
}
Expand Down
3 changes: 3 additions & 0 deletions ent/src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ struct TabletCDCCheckpointInfo {
OpId cdc_sdk_op_id = OpId::Invalid();
MonoDelta cdc_sdk_op_id_expiration = MonoDelta::kZero;
CoarseTimePoint cdc_sdk_most_active_time = CoarseTimePoint::min();
std::unordered_set<CDCStreamId> active_stream_list;
};

using TabletOpIdMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
Expand Down Expand Up @@ -118,6 +119,8 @@ class CDCServiceImpl : public CDCServiceIf {
GetCheckpointResponsePB* resp,
rpc::RpcContext rpc) override;

Result<TabletCheckpoint> TEST_GetTabletInfoFromCache(const ProducerTabletInfo& producer_tablet);

// Update peers in other tablet servers about the latest minimum applied cdc index for a specific
// tablet.
void UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req,
Expand Down
Loading

0 comments on commit 5fbaab5

Please sign in to comment.