Skip to content

Commit

Permalink
[#6305] Adaptive Heartbeat Reporting
Browse files Browse the repository at this point in the history
Summary:
Currently, we send all tablets that changed on the TServer to the Master within a single heartbeat.
This can get extremely large and processing it on the server can exceed the Heartbeat timeout on
large clusters, causing server restarts and further instability.  Heartbeat timeouts should be
reserved for more severe problems, such as resource issues and software deadlocks.  To make
heartbeat processing more flexible:

1: The TS negotiates a limit on the max number of tablets included in a single heartbeat.  With a
fixed size, we can now bound the amount of processing in a single RPC.

2: The Master honors the heartbeat deadline.  After every batch of tablet heartbeat processing, the
Master checks whether it is close to the deadline and, if so, exits early.  The heartbeat response
only includes the processed tablets, so the TS knows what information to send on the next heartbeat.

Test Plan:
MultiHeartbeat/CreateMultiHBTableStressTest.CreateAndDeleteBigTable/1
MultiHeartbeat/CreateMultiHBTableStressTest.RestartServersAfterCreation/1
CreateSmallHBTableStressTest.TestRestartMasterDuringFullHeartbeat
CreateTableStressTest.TestHeartbeatDeadline
TsTabletManagerTest.TestTabletReportLimit
TsTabletManagerTest.TestTabletReports
ClientTest.Capability

Reviewers: timur, amitanand, rahuldesirazu, bogdan

Reviewed By: bogdan

Subscribers: kannan, sergei, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D9974
  • Loading branch information
nspiegelberg committed Dec 12, 2020
1 parent 978dd14 commit 8bac374
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 154 deletions.
5 changes: 5 additions & 0 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ DECLARE_int32(max_backoff_ms_exponent);
METRIC_DECLARE_counter(rpcs_queue_overflow);

DEFINE_CAPABILITY(ClientTest, 0x1523c5ae);
DECLARE_CAPABILITY(TabletReportLimit);

using namespace std::literals; // NOLINT
using namespace std::placeholders;
Expand Down Expand Up @@ -2083,6 +2084,10 @@ TEST_F(ClientTest, Capability) {

// Check that fake capability is not reported.
ASSERT_FALSE(replica->HasCapability(kFakeCapability));

// This capability is defined on the TServer, passed to the Master on registration,
// then propagated to the YBClient. Ensure that this runtime pipeline holds.
ASSERT_TRUE(replica->HasCapability(CAPABILITY_TabletReportLimit));
}
}

Expand Down
187 changes: 173 additions & 14 deletions src/yb/integration-tests/create-table-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "yb/rpc/rpc_test_util.h"
#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"
#include "yb/util/hdr_histogram.h"
#include "yb/util/metrics.h"
#include "yb/util/spinlock_profiling.h"
Expand All @@ -81,6 +82,11 @@ DECLARE_bool(log_preallocate_segments);
DECLARE_bool(TEST_enable_remote_bootstrap);
DECLARE_int32(tserver_unresponsive_timeout_ms);
DECLARE_int32(max_create_tablets_per_ts);
// Flags defined elsewhere that the heartbeat tests in this file override.
DECLARE_int32(tablet_report_limit);
DECLARE_uint64(TEST_inject_latency_during_tablet_report_ms);
DECLARE_int32(heartbeat_rpc_timeout_ms);
DECLARE_int32(catalog_manager_report_batch_size);

DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test");
DEFINE_int32(benchmark_runtime_secs, 5, "Number of seconds to run the benchmark");
Expand Down Expand Up @@ -253,9 +259,49 @@ TEST_F(CreateTableStressTest, GetTableLocationsBenchmark) {
hist->histogram()->DumpHumanReadable(&LOG(INFO));
}

TEST_F(CreateTableStressTest, CreateAndDeleteBigTable) {
// Parameterized variant of CreateTableStressTest. When the bool parameter is true, the
// flags are tuned so that the test table is too large for a tablet server to report in a
// single heartbeat ("MultiHB"); when false, the base fixture runs with its defaults.
class CreateMultiHBTableStressTest : public CreateTableStressTest,
                                     public testing::WithParamInterface<bool /* is_multiHb */> {
  void SetUp() override {
    // Flags must be in place before the base fixture sets itself up.
    if (GetParam()) {
      ForceMultiHeartbeatReports();
    }
    CreateTableStressTest::SetUp();
  }

  // Tunes the flags so all 3 TS have to split their tablet report updates into several chunks.
  static void ForceMultiHeartbeatReports() {
    constexpr int kTabletCount = 300;

    // 90 tablets per report * 3 TS = 270 < 300 tablets.
    FLAGS_tablet_report_limit = 90;
    FLAGS_num_test_tablets = kTabletCount;
    FLAGS_max_create_tablets_per_ts = kTabletCount;

    // 1000 ms deadline / 20 ms wait per batch of one tablet: at most ~50 tablets (fewer in
    // practice) are processed before the Master hits its deadline.
    FLAGS_TEST_inject_latency_during_tablet_report_ms = 20;
    FLAGS_heartbeat_rpc_timeout_ms = 1000;
    FLAGS_catalog_manager_report_batch_size = 1;
  }
};
INSTANTIATE_TEST_CASE_P(MultiHeartbeat, CreateMultiHBTableStressTest, ::testing::Bool());

// Stand-in for the itest helper of the same name, which requires an External Mini Cluster.
//
// Issues a ListTablets RPC through `ts_proxy` (bounded by `timeout`) and overwrites
// `*tablet_ids` with the ids of every tablet whose reported state is tablet::RUNNING.
// Returns the RPC status on failure; in that case `*tablet_ids` is left untouched.
Status ListRunningTabletIds(std::shared_ptr<tserver::TabletServerServiceProxy> ts_proxy,
                            const MonoDelta& timeout,
                            std::vector<string>* tablet_ids) {
  RpcController controller;
  controller.set_timeout(timeout);

  tserver::ListTabletsRequestPB request;
  tserver::ListTabletsResponsePB response;
  RETURN_NOT_OK(ts_proxy->ListTablets(request, &response, &controller));

  tablet_ids->clear();
  for (const auto& entry : response.status_and_schema()) {
    const auto& status = entry.tablet_status();
    if (status.state() != tablet::RUNNING) {
      continue;
    }
    tablet_ids->push_back(status.tablet_id());
  }
  return Status::OK();
}

TEST_P(CreateMultiHBTableStressTest, CreateAndDeleteBigTable) {
DontVerifyClusterBeforeNextTearDown();
if (!AllowSlowTests()) {
if (IsSanitizer()) {
LOG(INFO) << "Skipping slow test";
return;
}
Expand All @@ -271,30 +317,41 @@ TEST_F(CreateTableStressTest, CreateAndDeleteBigTable) {
std::cout << "CatalogManager state:\n";
cluster_->mini_master()->master()->catalog_manager()->DumpState(&std::cerr);

// Store all relevant tablets for this big table we've created.
std::vector<string> big_table_tablets;
for (const auto & loc : resp.tablet_locations()) {
big_table_tablets.push_back(loc.tablet_id());
}
std::sort(big_table_tablets.begin(), big_table_tablets.end());

LOG(INFO) << "Deleting table...";
ASSERT_OK(client_->DeleteTable(table_name));

// The actual removal of the tablets is asynchronous, so we loop for a bit
// waiting for them to get removed.
LOG(INFO) << "Waiting for tablets to be removed";
vector<string> tablet_ids;
LOG(INFO) << "Waiting for tablets to be removed on TS#1";
std::vector<string> big_tablet_left, tablet_ids;
auto ts_proxy = cluster_->mini_tablet_server(0)->server()->proxy();
for (int i = 0; i < 1000; i++) {
ASSERT_OK(itest::ListRunningTabletIds(ts_map_.begin()->second.get(),
MonoDelta::FromSeconds(10),
&tablet_ids));
if (tablet_ids.empty()) break;
ASSERT_OK(ListRunningTabletIds(ts_proxy, 10s, &tablet_ids));
std::sort(tablet_ids.begin(), tablet_ids.end());
big_tablet_left.clear();
std::set_intersection(big_table_tablets.begin(), big_table_tablets.end(),
tablet_ids.begin(), tablet_ids.end(),
big_tablet_left.begin());
if (big_tablet_left.empty()) return;
SleepFor(MonoDelta::FromMilliseconds(100));
}
ASSERT_TRUE(tablet_ids.empty()) << "Tablets remained: " << tablet_ids;
ASSERT_TRUE(big_tablet_left.empty()) << "Tablets remaining: " << big_tablet_left.size()
<< " : " << big_tablet_left;
}

TEST_F(CreateTableStressTest, RestartMasterDuringCreation) {
if (!AllowSlowTests()) {
TEST_P(CreateMultiHBTableStressTest, RestartServersAfterCreation) {
DontVerifyClusterBeforeNextTearDown();
if (IsSanitizer()) {
LOG(INFO) << "Skipping slow test";
DontVerifyClusterBeforeNextTearDown();
return;
}

YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));

Expand All @@ -307,6 +364,9 @@ TEST_F(CreateTableStressTest, RestartMasterDuringCreation) {
LOG(INFO) << "Master restarted.";
}

// Restart TS#2, which forces a full tablet report on TS #2 and incremental updates on the others.
ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());

master::GetTableLocationsResponsePB resp;
Status s = WaitForRunningTabletCount(cluster_->mini_master(), table_name,
FLAGS_num_test_tablets, &resp);
Expand All @@ -316,8 +376,107 @@ TEST_F(CreateTableStressTest, RestartMasterDuringCreation) {
}
}

TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
// Fixture that sets the tablet report limit to 2, so that a full report for the test
// table takes many heartbeats to complete.
class CreateSmallHBTableStressTest : public CreateTableStressTest {
  void SetUp() override {
    // 40 tablets over 3 servers is ~13 tablets per server; at 2 tablets per report a
    // server needs at least 7 reports to finish a heartbeat.
    constexpr int kTabletsPerReport = 2;
    constexpr int kTabletCount = 40;

    FLAGS_tablet_report_limit = kTabletsPerReport;
    FLAGS_num_test_tablets = kTabletCount;
    FLAGS_max_create_tablets_per_ts = kTabletCount;

    CreateTableStressTest::SetUp();
  }
};
// Verifies that tablet servers start their full tablet report over when the Master is
// restarted while a multi-heartbeat full report is still in progress, instead of only
// sending the remainder of the interrupted report.
TEST_F(CreateSmallHBTableStressTest, TestRestartMasterDuringFullHeartbeat) {
DontVerifyClusterBeforeNextTearDown();
// Bail out early when IsSanitizer() reports true; the test is treated as too slow there.
if (IsSanitizer()) {
LOG(INFO) << "Skipping slow test";
return;
}
YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));

// The latency is injected only after the table exists, so table creation itself is not slowed.
// 100 ms wait / tablet >= 1.3 sec to receive a full report
FLAGS_TEST_inject_latency_during_tablet_report_ms = 100;
FLAGS_catalog_manager_report_batch_size = 1;

// Restart Master #1. Triggers Full Report from all TServers.
ASSERT_OK(cluster_->mini_master()->Restart());
ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests());

// Wait until the Master is ~25% complete with getting the heartbeats from the TS.
master::GetTableLocationsResponsePB resp;
ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
FLAGS_num_test_tablets / 4, &resp));
// Fewer than half of the tablets may be known at this point, so that the second restart
// below really happens in the middle of the full report.
ASSERT_LT(resp.tablet_locations_size(), FLAGS_num_test_tablets / 2);
LOG(INFO) << "Resetting Master after seeing table count: " << resp.tablet_locations_size();

// Restart Master #2. Re-triggers a Full Report from all TServers, even though they were in the
// middle of sending a full report to the old master.
ASSERT_OK(cluster_->mini_master()->Restart());
ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests());

// Speed up the test now...
FLAGS_TEST_inject_latency_during_tablet_report_ms = 0;

// The TS should send a full report. If they just sent the remainder from their original
// Full Report, this test will fail.
Status s = WaitForRunningTabletCount(cluster_->mini_master(), table_name,
FLAGS_num_test_tablets, &resp);
if (!s.ok()) {
// Dump the catalog manager state to stderr for debugging before CHECK_OK fails on the status.
cluster_->mini_master()->master()->catalog_manager()->DumpState(&std::cerr);
CHECK_OK(s);
}
}

// Verifies that the Master honors the heartbeat RPC deadline: a tablet report that takes
// longer than the deadline to process is only partially processed, and the response is
// flagged as truncated.
TEST_F(CreateTableStressTest, TestHeartbeatDeadline) {
DontVerifyClusterBeforeNextTearDown();

// 500ms deadline / 50 ms wait ~= 10 Tablets processed before Master hits deadline
FLAGS_catalog_manager_report_batch_size = 1;
FLAGS_TEST_inject_latency_during_tablet_report_ms = 50;
FLAGS_heartbeat_rpc_timeout_ms = 500;
FLAGS_num_test_tablets = 60;

// Create a Table with 60 tablets, so ~20 per TS.
YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));
master::GetTableLocationsResponsePB resp;
ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
FLAGS_num_test_tablets, &resp));

// Grab TS#1 and Generate a Full Report for it.
auto ts_server = cluster_->mini_tablet_server(0)->server();
master::TSHeartbeatRequestPB hb_req;
hb_req.mutable_common()->mutable_ts_instance()->CopyFrom(ts_server->instance_pb());
ts_server->tablet_manager()->StartFullTabletReport(hb_req.mutable_tablet_report());
// The report must hold more tablets than fit into the deadline (timeout / injected latency),
// otherwise truncation would not be exercised...
ASSERT_GT(hb_req.tablet_report().updated_tablets_size(),
FLAGS_heartbeat_rpc_timeout_ms / FLAGS_TEST_inject_latency_during_tablet_report_ms);
// ...while still respecting the configured per-report tablet limit.
ASSERT_EQ(ts_server->tablet_manager()->GetReportLimit(), FLAGS_tablet_report_limit);
ASSERT_LE(hb_req.tablet_report().updated_tablets_size(), FLAGS_tablet_report_limit);

// Grab Master and Process this Tablet Report.
// This should go over the deadline and get truncated.
master::TSHeartbeatResponsePB hb_resp;
// NOTE(review): the report generated as a full report is sent marked as incremental with a
// fixed sequence number -- presumably so the Master accepts this hand-built request; confirm.
hb_req.mutable_tablet_report()->set_is_incremental(true);
hb_req.mutable_tablet_report()->set_sequence_number(1);
Status heartbeat_status;
// Regression testbed often has stalls at this timing granularity. Allow a couple hiccups.
for (int tries = 0; tries < 3; ++tries) {
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
heartbeat_status = master_proxy_->TSHeartbeat(hb_req, &hb_resp, &rpc);
if (heartbeat_status.ok()) break;
// Only timeouts are retried; any other failure fails the test right away.
ASSERT_TRUE(heartbeat_status.IsTimedOut());
}
ASSERT_OK(heartbeat_status);
ASSERT_TRUE(hb_resp.tablet_report().processing_truncated());
// No more tablets may have been processed than fit into the deadline.
ASSERT_LE(hb_resp.tablet_report().tablets_size(),
FLAGS_heartbeat_rpc_timeout_ms / FLAGS_TEST_inject_latency_during_tablet_report_ms);
}


TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
DontVerifyClusterBeforeNextTearDown();
if (!AllowSlowTests()) {
LOG(INFO) << "Skipping slow test";
return;
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/ts_tablet_manager-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {

// Ensure that our tablet reports are consistent.
TabletReportPB report;
tablet_manager->GenerateIncrementalTabletReport(&report);
tablet_manager->GenerateTabletReport(&report);
ASSERT_EQ(1, report.updated_tablets_size()) << "Wrong report size:\n" << report.DebugString();
ReportedTabletPB reported_tablet = report.updated_tablets(0);
ASSERT_TRUE(reported_tablet.has_committed_consensus_state());
Expand Down
Loading

0 comments on commit 8bac374

Please sign in to comment.