Skip to content

Commit

Permalink
[catalog_manager] introduce --min_num_replicas flag
Browse files Browse the repository at this point in the history
This patch introduces a flag for kudu-master to enforce the minimum
number of replicas for newly created tables in a cluster.  Unless
overridden, --min_num_replicas is set to 1 by default.

For example, setting --min_num_replicas=3 enforces every new table to
have at least 3 replicas for each of its tablets, so there cannot be
a data loss when a single tablet server in the cluster fails
irrecoverably.

I also added validators for the related flags to keep things more
consistent and report misconfiguration as early as possible.

This patch contains a few test scenarios to cover the newly introduced
functionality.

Change-Id: I86191fcdc1b4ed6670f33ba7176d28dbd1df541f
Reviewed-on: http://gerrit.cloudera.org:8080/17684
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Attila Bukor <[email protected]>
  • Loading branch information
alexeyserbin committed Jul 16, 2021
1 parent 35b5664 commit 6cb1548
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 57 deletions.
140 changes: 117 additions & 23 deletions src/kudu/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ DECLARE_int32(log_inject_latency_ms_stddev);
DECLARE_int32(master_inject_latency_on_tablet_lookups_ms);
DECLARE_int32(max_column_comment_length);
DECLARE_int32(max_create_tablets_per_ts);
DECLARE_int32(max_num_replicas);
DECLARE_int32(max_table_comment_length);
DECLARE_int32(min_num_replicas);
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_int32(scanner_batch_size_rows);
DECLARE_int32(scanner_gc_check_interval_us);
Expand Down Expand Up @@ -5349,29 +5351,6 @@ TEST_F(ClientTest, TestCreateTableWithTooManyTablets) {
"maximum permitted at creation time (3)");
}

// Tests for too many replicas, too few replicas, even replica count, etc.
TEST_F(ClientTest, TestCreateTableWithBadNumReplicas) {
const vector<pair<int, string>> cases = {
{3, "not enough live tablet servers to create a table with the requested "
"replication factor 3; 1 tablet servers are alive"},
{2, "illegal replication factor 2 (replication factor must be odd)"},
{-1, "illegal replication factor -1 (replication factor must be positive)"},
{11, "illegal replication factor 11 (max replication factor is 7)"}
};

for (const auto& c : cases) {
SCOPED_TRACE(Substitute("num_replicas=$0", c.first));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
Status s = table_creator->table_name("foobar")
.schema(&schema_)
.set_range_partition_columns({ "key" })
.num_replicas(c.first)
.Create();
EXPECT_TRUE(s.IsInvalidArgument());
ASSERT_STR_CONTAINS(s.ToString(), c.second);
}
}

TEST_F(ClientTest, TestCreateTableWithInvalidEncodings) {
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
KuduSchema schema;
Expand Down Expand Up @@ -8527,5 +8506,120 @@ TEST_F(ClientTest, WriteWhileRestartingMultipleTabletServers) {
}
}

class ReplicationFactorLimitsTest : public ClientTest {
public:
static constexpr const char* const kTableName = "replication_limits";

void SetUp() override {
// Reduce the TS<->Master heartbeat interval to speed up testing.
FLAGS_heartbeat_interval_ms = 10;

// Set RF-related flags.
FLAGS_min_num_replicas = 3;
FLAGS_max_num_replicas = 5;

KuduTest::SetUp();

// Start minicluster and wait for tablet servers to connect to master.
InternalMiniClusterOptions options;
options.num_tablet_servers = 7;
cluster_.reset(new InternalMiniCluster(env_, std::move(options)));
ASSERT_OK(cluster_->StartSync());
ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
}
};

TEST_F(ReplicationFactorLimitsTest, MinReplicationFactor) {
// Creating table with number of replicas equal to --min_num_replicas should
// succeed.
{
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.add_hash_partitions({ "key" }, 2)
.num_replicas(3)
.Create());
}

// An attempt to create a table with replication factor less than
// the specified by --min_num_replicas should fail.
for (auto rf : { -1, 0, 1, 2 }) {
SCOPED_TRACE(Substitute("replication factor $0", rf));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
const auto s = table_creator->table_name(kTableName)
.schema(&schema_)
.add_hash_partitions({ "key" }, 2)
.num_replicas(rf)
.Create();
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "illegal replication factor");
ASSERT_STR_CONTAINS(s.ToString(), "minimum allowed replication factor is 3");
}

// Test a couple of other cases: even number of replicas when in [min, max]
// range and an attempt to create a table with number of replicas more than
// the number of tablet servers currently alive in the cluster.
{
FLAGS_min_num_replicas = 1;
const vector<pair<int, string>> cases = {
{2, "illegal replication factor 2: replication factor must be odd"},
{3, "not enough live tablet servers to create a table with the requested "
"replication factor 3; 1 tablet servers are alive"},
};

for (auto i = 1; i < cluster_->num_tablet_servers(); ++i) {
cluster_->mini_tablet_server(i)->Shutdown();
}
// Restart masters so only the alive tablet servers: that's a faster way
// to update tablet servers' liveliness status in the master's registry.
for (auto i = 0; i < cluster_->num_masters(); ++i) {
cluster_->mini_master(i)->Shutdown();
ASSERT_OK(cluster_->mini_master(i)->Restart());
}

SleepFor(MonoDelta::FromMilliseconds(3 * FLAGS_heartbeat_interval_ms));

for (const auto& c : cases) {
SCOPED_TRACE(Substitute("num_replicas=$0", c.first));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
Status s = table_creator->table_name("foobar")
.schema(&schema_)
.set_range_partition_columns({ "key" })
.num_replicas(c.first)
.Create();
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), c.second);
}
}
}

TEST_F(ReplicationFactorLimitsTest, MaxReplicationFactor) {
// Creating table with number of replicas equal to --max_num_replicas should
// succeed.
{
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.add_hash_partitions({ "key" }, 2)
.num_replicas(5)
.Create());
}

// An attempt to create a table with replication factor greater than
// the specified by --max_num_replicas should fail.
for (auto rf : { 6, 7 }) {
SCOPED_TRACE(Substitute("replication factor $0", rf));
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
const auto s = table_creator->table_name(kTableName)
.schema(&schema_)
.add_hash_partitions({ "key" }, 2)
.num_replicas(rf)
.Create();
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "illegal replication factor");
ASSERT_STR_CONTAINS(s.ToString(), "maximum allowed replication factor is 5");
}
}

} // namespace client
} // namespace kudu
140 changes: 107 additions & 33 deletions src/kudu/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,23 @@ TAG_FLAG(unresponsive_ts_rpc_timeout_ms, advanced);
DEFINE_int32(default_num_replicas, 3,
"Default number of replicas for tables that do not have the num_replicas set.");
TAG_FLAG(default_num_replicas, advanced);
TAG_FLAG(default_num_replicas, runtime);

DEFINE_int32(max_num_replicas, 7,
"Maximum number of replicas that may be specified for a table.");
// Tag as unsafe since we have done very limited testing of higher than 5 replicas.
TAG_FLAG(max_num_replicas, unsafe);
TAG_FLAG(max_num_replicas, runtime);

DEFINE_int32(min_num_replicas, 1,
"Minimum number of replicas that may be specified when creating "
"a table: this is to enforce the minimum replication factor for "
"tables created in a Kudu cluster. For example, setting this flag "
"to 3 enforces every new table to have at least 3 replicas for "
"each of its tablets, so there cannot be a data loss when a "
"single tablet server fails irrecoverably.");
TAG_FLAG(min_num_replicas, advanced);
TAG_FLAG(min_num_replicas, runtime);

DEFINE_int32(max_num_columns, 300,
"Maximum number of columns that may be in a table.");
Expand Down Expand Up @@ -210,6 +222,7 @@ TAG_FLAG(max_identifier_length, unsafe);
DEFINE_bool(allow_unsafe_replication_factor, false,
"Allow creating tables with even replication factor.");
TAG_FLAG(allow_unsafe_replication_factor, unsafe);
TAG_FLAG(allow_unsafe_replication_factor, runtime);

DEFINE_int32(catalog_manager_bg_task_wait_ms, 1000,
"Amount of time the catalog manager background task thread waits "
Expand Down Expand Up @@ -360,24 +373,9 @@ TAG_FLAG(table_write_limit_ratio, experimental);

DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int64(tsk_rotation_seconds);

METRIC_DEFINE_entity(table);

DECLARE_string(ranger_config_path);

// Validates that if auto-rebalancing is enabled, the cluster uses 3-4-3 replication
// (the --raft_prepare_replacement_before_eviction flag must be set to true).
static bool Validate343SchemeEnabledForAutoRebalancing() {
if (FLAGS_auto_rebalancing_enabled &&
!FLAGS_raft_prepare_replacement_before_eviction) {
LOG(ERROR) << "If enabling auto-rebalancing, Kudu must be configured"
" with --raft_prepare_replacement_before_eviction.";
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(auto_rebalancing_flags,
Validate343SchemeEnabledForAutoRebalancing);
METRIC_DEFINE_entity(table);

using base::subtle::NoBarrier_CompareAndSwap;
using base::subtle::NoBarrier_Load;
Expand Down Expand Up @@ -422,10 +420,9 @@ using std::unordered_set;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace master {
namespace {

static bool ValidateTableWriteLimitRatio(const char* flagname, double value) {
bool ValidateTableWriteLimitRatio(const char* flagname, double value) {
if (value > 1.0) {
LOG(ERROR) << Substitute("$0 must be less than or equal to 1.0, value $1 is invalid.",
flagname, value);
Expand All @@ -439,7 +436,7 @@ static bool ValidateTableWriteLimitRatio(const char* flagname, double value) {
}
DEFINE_validator(table_write_limit_ratio, &ValidateTableWriteLimitRatio);

static bool ValidateTableLimit(const char* flag, int64_t limit) {
bool ValidateTableLimit(const char* flag, int64_t limit) {
if (limit != -1 && limit < 0) {
LOG(ERROR) << Substitute("$0 must be greater than or equal to -1, "
"$1 is invalid", flag, limit);
Expand All @@ -449,10 +446,85 @@ static bool ValidateTableLimit(const char* flag, int64_t limit) {
}
DEFINE_validator(table_disk_size_limit, &ValidateTableLimit);
DEFINE_validator(table_row_count_limit, &ValidateTableLimit);

bool ValidateMinNumReplicas(const char* flagname, int value) {
if (value < 1) {
LOG(ERROR) << Substitute(
"$0: invalid value for flag $1; must be at least 1", value, flagname);
return false;
}
return true;
}
DEFINE_validator(min_num_replicas, &ValidateMinNumReplicas);

// Validate that if the auto-rebalancing is enabled, the cluster uses the 3-4-3
// replication scheme: the --raft_prepare_replacement_before_eviction flag
// must be set to 'true'.
bool Validate343SchemeEnabledForAutoRebalancing() {
if (FLAGS_auto_rebalancing_enabled &&
!FLAGS_raft_prepare_replacement_before_eviction) {
LOG(ERROR) << "if enabling auto-rebalancing, Kudu must be configured "
"with --raft_prepare_replacement_before_eviction";
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(auto_rebalancing_flags,
Validate343SchemeEnabledForAutoRebalancing);

// Check for the replication factor flags' sanity.
bool ValidateReplicationFactorFlags() {
if (FLAGS_min_num_replicas > FLAGS_max_num_replicas) {
LOG(ERROR) << Substitute(
"--min_num_replicas ($0) must not be greater than "
"--max_num_replicas ($1)",
FLAGS_min_num_replicas, FLAGS_max_num_replicas);
return false;
}
if (FLAGS_default_num_replicas > FLAGS_max_num_replicas) {
LOG(ERROR) << Substitute(
"--default_num_replicas ($0) must not be greater than "
"--max_num_replicas ($1)",
FLAGS_default_num_replicas, FLAGS_max_num_replicas);
return false;
}
if (FLAGS_default_num_replicas % 2 == 0 &&
!FLAGS_allow_unsafe_replication_factor) {
LOG(ERROR) << Substitute(
"--default_num_replicas ($0) must not be an even number since "
"--allow_unsafe_replication_factor is not set",
FLAGS_max_num_replicas);
return false;
}
if (FLAGS_min_num_replicas % 2 == 0 &&
!FLAGS_allow_unsafe_replication_factor) {
LOG(ERROR) << Substitute(
"--min_num_replicas ($0) must not be an even number since "
"--allow_unsafe_replication_factor is not set",
FLAGS_min_num_replicas);
return false;
}
if (FLAGS_max_num_replicas % 2 == 0 &&
!FLAGS_allow_unsafe_replication_factor) {
LOG(ERROR) << Substitute(
"--max_num_replicas ($0) must not be an even number since "
"--allow_unsafe_replication_factor is not set",
FLAGS_max_num_replicas);
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(replication_factor_flags,
ValidateReplicationFactorFlags);
} // anonymous namespace

////////////////////////////////////////////////////////////
// Table Loader
////////////////////////////////////////////////////////////

namespace kudu {
namespace master {

class TableLoader : public TableVisitor {
public:
explicit TableLoader(CatalogManager *catalog_manager)
Expand Down Expand Up @@ -1808,26 +1880,28 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
}

const auto num_replicas = req.num_replicas();
// Reject create table with even replication factors, unless master flag
// allow_unsafe_replication_factor is on.
if (num_replicas % 2 == 0 && !FLAGS_allow_unsafe_replication_factor) {
return SetupError(Status::InvalidArgument(
Substitute("illegal replication factor $0 (replication factor must be odd)", num_replicas)),
resp, MasterErrorPB::EVEN_REPLICATION_FACTOR);
}

if (num_replicas > FLAGS_max_num_replicas) {
return SetupError(Status::InvalidArgument(
Substitute("illegal replication factor $0 (max replication factor is $1)",
num_replicas, FLAGS_max_num_replicas)),
Substitute("illegal replication factor $0: maximum allowed replication "
"factor is $1 (controlled by --max_num_replicas)",
num_replicas, FLAGS_max_num_replicas)),
resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
}
if (num_replicas <= 0) {
if (num_replicas < FLAGS_min_num_replicas) {
return SetupError(Status::InvalidArgument(
Substitute("illegal replication factor $0 (replication factor must be positive)",
num_replicas, FLAGS_max_num_replicas)),
Substitute("illegal replication factor $0: minimum allowed replication "
"factor is $1 (controlled by --min_num_replicas)",
num_replicas, FLAGS_min_num_replicas)),
resp, MasterErrorPB::ILLEGAL_REPLICATION_FACTOR);
}
// Reject create table with even replication factors, unless master flag
// allow_unsafe_replication_factor is on.
if (num_replicas % 2 == 0 && !FLAGS_allow_unsafe_replication_factor) {
return SetupError(Status::InvalidArgument(
Substitute("illegal replication factor $0: replication factor must be odd",
num_replicas)),
resp, MasterErrorPB::EVEN_REPLICATION_FACTOR);
}

// Verify that the number of replicas isn't larger than the number of live tablet
// servers.
Expand Down
3 changes: 2 additions & 1 deletion src/kudu/master/master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ message MasterErrorPB {
// The number of replicas requested is even.
EVEN_REPLICATION_FACTOR = 10;

// The number of replicas requested is illegal (eg non-positive).
// The number of replicas requested is illegal, i.e. either non-positive
// or not allowed per system policies.
ILLEGAL_REPLICATION_FACTOR = 11;

// The callee detected that its replica management scheme is incompatible
Expand Down

0 comments on commit 6cb1548

Please sign in to comment.