Skip to content

Commit

Permalink
[#5251] Only Start Metrics Thread After Creating First Replication St…
Browse files Browse the repository at this point in the history
…ream

Summary: Create a flag on master heartbeat response for cdc_enabled. Set to true the first time we create the cdc_state table to tell the tservers to start the metrics update thread.

Test Plan: Added a sleep to the beginning of the existing metrics tests and ensured that collecting metrics is disabled.

Reviewers: bogdan, nicolas

Reviewed By: nicolas

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D9195
  • Loading branch information
rahuldesirazu committed Jan 28, 2021
1 parent 9328454 commit 7852d54
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
27 changes: 22 additions & 5 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ DEFINE_int32(update_min_cdc_indices_interval_secs, 60,
DEFINE_int32(update_metrics_interval_ms, 1000,
"How often to update xDC cluster metrics.");

DEFINE_bool(enable_collect_cdc_metrics, false, "Enable collecting cdc metrics.");
DEFINE_bool(enable_collect_cdc_metrics, true, "Enable collecting cdc metrics.");

DECLARE_bool(enable_log_retention_by_op_idx);

Expand Down Expand Up @@ -317,6 +317,9 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
Status s = CheckTabletValidForStream(producer_tablet);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

// Since GetChanges is called for a valid stream, mark cdc as enabled.
cdc_enabled_.store(true, std::memory_order_release);

std::shared_ptr<tablet::TabletPeer> tablet_peer;
s = tablet_manager_->GetTabletPeer(req->tablet_id(), &tablet_peer);
auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm;
Expand Down Expand Up @@ -553,6 +556,18 @@ void CDCServiceImpl::UpdateLagMetrics() {
}
}

// Decides whether the lag metrics are due for a refresh.
//
// Returns false whenever FLAGS_enable_collect_cdc_metrics is off. Otherwise returns true if
// time_since_update_metrics (the time of the previous metrics update) is still
// MonoTime::kUninitialized, or if at least FLAGS_update_metrics_interval_ms milliseconds have
// elapsed since that time.
//
// Note: this function does not consult cdc_enabled_ itself.
bool CDCServiceImpl::ShouldUpdateLagMetrics(MonoTime time_since_update_metrics) {
  // Metrics collection is switched off entirely.
  if (!GetAtomicFlag(&FLAGS_enable_collect_cdc_metrics)) {
    return false;
  }
  // No update has been recorded yet, so one is due right away.
  if (time_since_update_metrics == MonoTime::kUninitialized) {
    return true;
  }
  // Otherwise an update is due once the configured interval has passed.
  const auto min_interval =
      MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_update_metrics_interval_ms));
  return MonoTime::Now() - time_since_update_metrics >= min_interval;
}

// Reports the current value of cdc_enabled_, i.e. whether CDC has been marked enabled on this
// server. The acquire load pairs with the release store performed in GetChanges.
bool CDCServiceImpl::CDCEnabled() {
  const bool enabled = cdc_enabled_.load(std::memory_order_acquire);
  return enabled;
}

MicrosTime CDCServiceImpl::GetLastReplicatedTime(
const std::shared_ptr<tablet::TabletPeer>& tablet_peer) {
yb::tablet::RemoveIntentsData data;
Expand All @@ -576,10 +591,12 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {
};

do {
if (ANNOTATE_UNPROTECTED_READ(FLAGS_enable_collect_cdc_metrics) &&
(time_since_update_metrics == MonoTime::kUninitialized ||
MonoTime::Now() - time_since_update_metrics >=
MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_update_metrics_interval_ms)))) {
if (!cdc_enabled_.load(std::memory_order_acquire)) {
// Have not yet received any GetChanges requests, so skip background thread work.
continue;
}
// Always update lag metrics, default every 1s.
if (ShouldUpdateLagMetrics(time_since_update_metrics)) {
UpdateLagMetrics();
time_since_update_metrics = MonoTime::Now();
}
Expand Down
9 changes: 9 additions & 0 deletions ent/src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class CDCServiceImpl : public CDCServiceIf {
return server_metrics_;
}

// Returns true if this server has received a GetChanges call.
bool CDCEnabled();

private:
template <class ReqType, class RespType>
bool CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc);
Expand Down Expand Up @@ -177,6 +180,8 @@ class CDCServiceImpl : public CDCServiceIf {

MicrosTime GetLastReplicatedTime(const std::shared_ptr<tablet::TabletPeer>& tablet_peer);

bool ShouldUpdateLagMetrics(MonoTime time_since_update_metrics);

yb::rpc::Rpcs rpcs_;

tserver::TSTabletManager* tablet_manager_;
Expand Down Expand Up @@ -268,6 +273,10 @@ class CDCServiceImpl : public CDCServiceIf {
// True when this service is stopped. Used to inform
// get_minimum_checkpoints_and_update_peers_thread_ that it should exit.
std::atomic<bool> cdc_service_stopped_{false};

// True when this service has received a GetChanges request on a valid replication stream.
std::atomic<bool> cdc_enabled_{false};

};

} // namespace cdc
Expand Down
17 changes: 17 additions & 0 deletions ent/src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ class CDCServiceTest : public YBMiniClusterTestBase<MiniCluster>,
TableHandle table_;
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTest, ::testing::Bool());

void CDCServiceTest::CreateTable(int num_tablets, TableHandle* table) {
ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(),
kTableName.namespace_type()));
Expand Down Expand Up @@ -742,6 +744,9 @@ class CDCServiceTestMultipleServersOneTablet : public CDCServiceTest {
virtual int tablet_count() override { return 1; }
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMultipleServersOneTablet,
::testing::Bool());

TEST_P(CDCServiceTestMultipleServersOneTablet, TestUpdateLagMetrics) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
Expand Down Expand Up @@ -905,6 +910,8 @@ class CDCServiceTestMultipleServers : public CDCServiceTest {
virtual int tablet_count() override { return 4; }
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMultipleServers, ::testing::Bool());

TEST_P(CDCServiceTestMultipleServers, TestListTablets) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
Expand Down Expand Up @@ -1333,6 +1340,8 @@ class CDCServiceTestMaxRentionTime : public CDCServiceTest {
const int kMaxSecondsToRetain = 30;
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMaxRentionTime, ::testing::Bool());

TEST_P(CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
Expand Down Expand Up @@ -1399,6 +1408,9 @@ class CDCServiceTestDurableMinReplicatedIndex : public CDCServiceTest {
}
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestDurableMinReplicatedIndex,
::testing::Bool());

TEST_P(CDCServiceTestDurableMinReplicatedIndex, TestLogCDCMinReplicatedIndexIsDurable) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
Expand Down Expand Up @@ -1524,6 +1536,8 @@ class CDCLogAndMetaIndex : public CDCServiceTest {
}
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCLogAndMetaIndex, ::testing::Bool());

TEST_P(CDCLogAndMetaIndex, TestLogAndMetaCdcIndex) {
constexpr int kNStreams = 5;

Expand Down Expand Up @@ -1582,6 +1596,8 @@ class CDCLogAndMetaIndexReset : public CDCLogAndMetaIndex {
}
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCLogAndMetaIndexReset, ::testing::Bool());

// Test that when all the streams for a specific tablet have been deleted, the log and meta
// cdc min replicated index is reset to max int64.
TEST_P(CDCLogAndMetaIndexReset, TestLogAndMetaCdcIndexAreReset) {
Expand Down Expand Up @@ -1683,6 +1699,7 @@ class CDCServiceTestThreeServers : public CDCServiceTest {
void GetFirstTabletIdAndLeaderPeer(TabletId* tablet_id, int* leader_idx, int timeout_secs);
};

INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestThreeServers, ::testing::Bool());

// Sometimes leadership takes a while. Keep retrying until timeout_secs seconds have elapsed.
void CDCServiceTestThreeServers::GetFirstTabletIdAndLeaderPeer(TabletId* tablet_id,
Expand Down

0 comments on commit 7852d54

Please sign in to comment.