Skip to content

Commit

Permalink
[BACKPORT 2.14][#13693] CDCSDK:Add last_active_time to cdc_state table
Browse files Browse the repository at this point in the history
Summary:
Original commit: 2b8a52b/D19201
When CDC Clients are not running for a long period of time, we don't want to retain intents as it may cause a resource utilization issue in a YugabyteDB cluster. We have a GFLAG cdc_intent_retention_ms (default value of 4 hours) that determines when can we mark a stream as inactive.

To determine if a stream is active, we track the timestamp of the GetChanges API call for the stream/tablet pair. We introduce last_active_time in the cdc_state table and store this value in the table against each stream/tablet pair. To improve performance, we also maintain an in-memory cache to ensure we don't write this into cdc_state table every time.

The TabletPeer tracks the 'min' OpId and the 'oldest' active time among all the 'active' streams. If all the streams are inactive, the min OpId is returned as OpId::max() to ensure all the intents are garbage collected.

This information is also sent to follower tablets along with min OpId so that they can also decide when to clear the intents.

Test Plan:
Jenkins: urgent
Existing cdcsdk test cases

Reviewers: srangavajjula, skumar, sdash

Reviewed By: skumar, sdash

Subscribers: bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D19346
  • Loading branch information
Adithya Bharadwaj committed Sep 7, 2022
1 parent 3f717c8 commit 827a369
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 242 deletions.
417 changes: 248 additions & 169 deletions ent/src/yb/cdc/cdc_service.cc

Large diffs are not rendered by default.

33 changes: 21 additions & 12 deletions ent/src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct TabletCheckpoint {
// Timestamp at which the op ID was last updated.
CoarseTimePoint last_update_time;
// Timestamp at which stream polling happen.
CoarseTimePoint last_active_time;
int64_t last_active_time;

bool ExpiredAt(std::chrono::milliseconds duration, std::chrono::time_point<CoarseMonoClock> now) {
return !IsInitialized(last_update_time) || (now - last_update_time) >= duration;
Expand All @@ -85,11 +85,10 @@ struct TabletCDCCheckpointInfo {
OpId cdc_op_id = OpId::Max();
OpId cdc_sdk_op_id = OpId::Invalid();
MonoDelta cdc_sdk_op_id_expiration = MonoDelta::kZero;
CoarseTimePoint cdc_sdk_most_active_time = CoarseTimePoint::min();
std::unordered_set<CDCStreamId> active_stream_list;
int64_t cdc_sdk_latest_active_time = 0;
};

using TabletOpIdMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
using TabletIdCDCCheckpointMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
using TabletIdStreamIdSet = std::set<pair<TabletId, CDCStreamId>>;

class CDCServiceImpl : public CDCServiceIf {
Expand Down Expand Up @@ -188,6 +187,14 @@ class CDCServiceImpl : public CDCServiceIf {
template <class ReqType, class RespType>
bool CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc);

Status CheckStreamActive(
const ProducerTabletInfo& producer_tablet, const client::YBSessionPtr& session,
const int64_t& last_active_time_passed = 0);

Result<int64_t> GetLastActiveTime(
const ProducerTabletInfo& producer_tablet, const client::YBSessionPtr& session,
bool ignore_cache = false);

Result<OpId> GetLastCheckpoint(const ProducerTabletInfo& producer_tablet,
const client::YBSessionPtr& session);

Expand All @@ -198,12 +205,14 @@ class CDCServiceImpl : public CDCServiceIf {
Result<std::string> GetCdcStreamId(const ProducerTabletInfo& producer_tablet,
const std::shared_ptr<client::YBSession>& session);

Status UpdateCheckpoint(const ProducerTabletInfo& producer_tablet,
const OpId& sent_op_id,
const OpId& commit_op_id,
const client::YBSessionPtr& session,
uint64_t last_record_hybrid_time,
bool force_update = false);
Status UpdateCheckpointAndActiveTime(
const ProducerTabletInfo& producer_tablet,
const OpId& sent_op_id,
const OpId& commit_op_id,
const client::YBSessionPtr& session,
uint64_t last_record_hybrid_time,
const CDCRequestSource& request_source = CDCRequestSource::CDCSDK,
bool force_update = false);

Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> GetTablets(
const CDCStreamId& stream_id);
Expand Down Expand Up @@ -235,7 +244,7 @@ class CDCServiceImpl : public CDCServiceIf {
rpc::RpcContext* context,
const std::shared_ptr<tablet::TabletPeer>& peer);

void UpdateTabletPeersWithMinReplicatedIndex(TabletOpIdMap* tablet_min_checkpoint_map);
void UpdateTabletPeersWithMinReplicatedIndex(TabletIdCDCCheckpointMap* tablet_min_checkpoint_map);

Result<OpId> TabletLeaderLatestEntryOpId(const TabletId& tablet_id);

Expand Down Expand Up @@ -324,7 +333,7 @@ class CDCServiceImpl : public CDCServiceIf {
CreateCDCStreamResponsePB* resp,
CoarseTimePoint deadline);

Result<TabletOpIdMap> PopulateTabletCheckPointInfo(
Result<TabletIdCDCCheckpointMap> PopulateTabletCheckPointInfo(
const TabletId& input_tablet_id = "",
TabletIdStreamIdSet* tablet_stream_to_be_deleted = nullptr);

Expand Down
260 changes: 204 additions & 56 deletions ent/src/yb/integration-tests/cdcsdk_ysql-test.cc

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3410,6 +3410,14 @@ Status CatalogManager::CreateCDCStream(const CreateCDCStreamRequestPB* req,
cdc_table.AddStringColumnValue(req, master::kCdcCheckpoint, OpId().ToString());
cdc_table.AddTimestampColumnValue(
req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());

if (id_type_option_value == cdc::kNamespaceId) {
// For cdcsdk cases, we also need to persist last_active_time in the 'cdc_state' table. We
// will store this info in the map in the 'kCdcData' column.
auto column_id = cdc_table.ColumnId(master::kCdcData);
cdc_table.AddMapColumnValue(req, column_id, "active_time", "0");
}

session->Apply(op);
}
// TODO(async_flush): https://github.com/yugabyte/yugabyte-db/issues/12173
Expand Down
21 changes: 21 additions & 0 deletions src/yb/client/table_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,27 @@ void TableHandle::AddCondition(QLConditionPB* const condition, const QLOperator
condition->add_operands()->mutable_condition()->set_op(op);
}

QLMapValuePB* TableHandle::AddMapColumnValue(
QLWriteRequestPB* req, const int32_t& column_id, const string& entry_key,
const string& entry_value) const {
auto column_value = req->add_column_values();
column_value->set_column_id(column_id);
QLMapValuePB* map_value = (column_value->mutable_expr()->mutable_value()->mutable_map_value());
QLValuePB* elem = map_value->add_keys();
elem->set_string_value(entry_key);
elem = map_value->add_values();
elem->set_string_value(entry_value);
return map_value;
}

void TableHandle::AddMapEntryToColumn(
QLMapValuePB* map_value_pb, const string& entry_key, const string& entry_value) const {
QLValuePB* elem = map_value_pb->add_keys();
elem->set_string_value(entry_key);
elem = map_value_pb->add_values();
elem->set_string_value(entry_value);
}

void TableHandle::AddColumns(const std::vector<std::string>& columns, QLReadRequestPB* req) const {
QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc();
for (const auto& column : columns) {
Expand Down
7 changes: 7 additions & 0 deletions src/yb/client/table_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ class TableHandle {
// E.g. Add <EXISTS> under "... AND <EXISTS>".
void AddCondition(QLConditionPB *const condition, const QLOperator op) const;

QLMapValuePB* AddMapColumnValue(
QLWriteRequestPB* req, const int32_t& column_id, const string& entry_key,
const string& entry_value) const;

void AddMapEntryToColumn(
QLMapValuePB* map_value_pb, const string& entry_key, const string& entry_value) const;

void AddColumns(const std::vector<std::string>& columns, QLReadRequestPB* req) const;

const YBTablePtr& table() const {
Expand Down
8 changes: 4 additions & 4 deletions src/yb/tablet/tablet_peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1054,14 +1054,13 @@ Status TabletPeer::SetCDCSDKRetainOpIdAndTime(
return Status::OK();
}

Result<MonoDelta> TabletPeer::GetCDCSDKIntentRetainTime(
const CoarseTimePoint& cdc_sdk_latest_active_time) {
Result<MonoDelta> TabletPeer::GetCDCSDKIntentRetainTime(const int64_t& cdc_sdk_latest_active_time) {
MonoDelta cdc_sdk_intent_retention = MonoDelta::kZero;
// If cdc_sdk_latest_update_time is not updated to default CoarseTimePoint::min() value,
// It's mean that, no need to retain the intents. This can happen in below case:-
// a. Only XCluster streams are defined for the tablet.
// b. CDCSDK stream for the tablet is expired.
if (cdc_sdk_latest_active_time == CoarseTimePoint::min()) {
if (cdc_sdk_latest_active_time == 0) {
return cdc_sdk_intent_retention;
}

Expand All @@ -1072,7 +1071,8 @@ Result<MonoDelta> TabletPeer::GetCDCSDKIntentRetainTime(
// all the FOLLOWERs as their cdc_sdk_min_checkpoint_op_id_expiration_.
MonoDelta max_retain_time =
MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_cdc_intent_retention_ms));
MonoDelta lastest_active_time(CoarseMonoClock::Now() - cdc_sdk_latest_active_time);
auto lastest_active_time =
MonoDelta::FromMicroseconds(GetCurrentTimeMicros() - cdc_sdk_latest_active_time);
if (max_retain_time >= lastest_active_time) {
cdc_sdk_intent_retention = max_retain_time - lastest_active_time;
}
Expand Down
2 changes: 1 addition & 1 deletion src/yb/tablet/tablet_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ class TabletPeer : public consensus::ConsensusContext,
Status SetCDCSDKRetainOpIdAndTime(
const OpId& cdc_sdk_op_id, const MonoDelta& cdc_sdk_op_id_expiration);

Result<MonoDelta> GetCDCSDKIntentRetainTime(const CoarseTimePoint& cdc_sdk_latest_active_time);
Result<MonoDelta> GetCDCSDKIntentRetainTime(const int64_t& cdc_sdk_latest_active_time);

OpId GetLatestCheckPoint();

Expand Down

0 comments on commit 827a369

Please sign in to comment.