From 6cb15486cdbeacb722e69b81d563b6f05d840650 Mon Sep 17 00:00:00 2001 From: Alexey Serbin Date: Tue, 13 Jul 2021 13:57:12 -0700 Subject: [PATCH] [catalog_manager] introduce --min_num_replicas flag This patch introduces a flag for kudu-master to enforce the minimum number of replicas for newly created tables in a cluster. Unless overridden, --min_num_replicas is set to 1 by default. For example, setting --min_num_replicas=3 enforces every new table to have at least 3 replicas for each of its tablets, so there cannot be a data loss when a single tablet server in the cluster fails irrecoverably. I also added validators for the related flags to keep things more consistent and report misconfiguration as early as possible. This patch contains a few test scenarios to cover the newly introduced functionality. Change-Id: I86191fcdc1b4ed6670f33ba7176d28dbd1df541f Reviewed-on: http://gerrit.cloudera.org:8080/17684 Tested-by: Alexey Serbin Reviewed-by: Mahesh Reddy Reviewed-by: Attila Bukor --- src/kudu/client/client-test.cc | 140 ++++++++++++++++++++++++----- src/kudu/master/catalog_manager.cc | 140 ++++++++++++++++++++++------- src/kudu/master/master.proto | 3 +- 3 files changed, 226 insertions(+), 57 deletions(-) diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 51bf76b21b..a0f3649636 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -156,7 +156,9 @@ DECLARE_int32(log_inject_latency_ms_stddev); DECLARE_int32(master_inject_latency_on_tablet_lookups_ms); DECLARE_int32(max_column_comment_length); DECLARE_int32(max_create_tablets_per_ts); +DECLARE_int32(max_num_replicas); DECLARE_int32(max_table_comment_length); +DECLARE_int32(min_num_replicas); DECLARE_int32(raft_heartbeat_interval_ms); DECLARE_int32(scanner_batch_size_rows); DECLARE_int32(scanner_gc_check_interval_us); @@ -5349,29 +5351,6 @@ TEST_F(ClientTest, TestCreateTableWithTooManyTablets) { "maximum permitted at creation time (3)"); } -// Tests for too many replicas, too few replicas, even replica count, etc. -TEST_F(ClientTest, TestCreateTableWithBadNumReplicas) { - const vector> cases = { - {3, "not enough live tablet servers to create a table with the requested " - "replication factor 3; 1 tablet servers are alive"}, - {2, "illegal replication factor 2 (replication factor must be odd)"}, - {-1, "illegal replication factor -1 (replication factor must be positive)"}, - {11, "illegal replication factor 11 (max replication factor is 7)"} - }; - - for (const auto& c : cases) { - SCOPED_TRACE(Substitute("num_replicas=$0", c.first)); - unique_ptr table_creator(client_->NewTableCreator()); - Status s = table_creator->table_name("foobar") - .schema(&schema_) - .set_range_partition_columns({ "key" }) - .num_replicas(c.first) - .Create(); - EXPECT_TRUE(s.IsInvalidArgument()); - ASSERT_STR_CONTAINS(s.ToString(), c.second); - } -} - TEST_F(ClientTest, TestCreateTableWithInvalidEncodings) { unique_ptr table_creator(client_->NewTableCreator()); KuduSchema schema; @@ -8527,5 +8506,120 @@ TEST_F(ClientTest, WriteWhileRestartingMultipleTabletServers) { } } +class ReplicationFactorLimitsTest : public ClientTest { + public: + static constexpr const char* const kTableName = "replication_limits"; + + void SetUp() override { + // Reduce the TS<->Master heartbeat interval to speed up testing. + FLAGS_heartbeat_interval_ms = 10; + + // Set RF-related flags. + FLAGS_min_num_replicas = 3; + FLAGS_max_num_replicas = 5; + + KuduTest::SetUp(); + + // Start minicluster and wait for tablet servers to connect to master. + InternalMiniClusterOptions options; + options.num_tablet_servers = 7; + cluster_.reset(new InternalMiniCluster(env_, std::move(options))); + ASSERT_OK(cluster_->StartSync()); + ASSERT_OK(cluster_->CreateClient(nullptr, &client_)); + } +}; + +TEST_F(ReplicationFactorLimitsTest, MinReplicationFactor) { + // Creating table with number of replicas equal to --min_num_replicas should + // succeed. + { + unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(table_creator->table_name(kTableName) + .schema(&schema_) + .add_hash_partitions({ "key" }, 2) + .num_replicas(3) + .Create()); + } + + // An attempt to create a table with replication factor less than + // the specified by --min_num_replicas should fail. + for (auto rf : { -1, 0, 1, 2 }) { + SCOPED_TRACE(Substitute("replication factor $0", rf)); + unique_ptr table_creator(client_->NewTableCreator()); + const auto s = table_creator->table_name(kTableName) + .schema(&schema_) + .add_hash_partitions({ "key" }, 2) + .num_replicas(rf) + .Create(); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "illegal replication factor"); + ASSERT_STR_CONTAINS(s.ToString(), "minimum allowed replication factor is 3"); + } + + // Test a couple of other cases: even number of replicas when in [min, max] + // range and an attempt to create a table with number of replicas more than + // the number of tablet servers currently alive in the cluster. + { + FLAGS_min_num_replicas = 1; + const vector> cases = { + {2, "illegal replication factor 2: replication factor must be odd"}, + {3, "not enough live tablet servers to create a table with the requested " + "replication factor 3; 1 tablet servers are alive"}, + }; + + for (auto i = 1; i < cluster_->num_tablet_servers(); ++i) { + cluster_->mini_tablet_server(i)->Shutdown(); + } + // Restart masters so only the alive tablet servers: that's a faster way + // to update tablet servers' liveliness status in the master's registry. + for (auto i = 0; i < cluster_->num_masters(); ++i) { + cluster_->mini_master(i)->Shutdown(); + ASSERT_OK(cluster_->mini_master(i)->Restart()); + } + + SleepFor(MonoDelta::FromMilliseconds(3 * FLAGS_heartbeat_interval_ms)); + + for (const auto& c : cases) { + SCOPED_TRACE(Substitute("num_replicas=$0", c.first)); + unique_ptr table_creator(client_->NewTableCreator()); + Status s = table_creator->table_name("foobar") + .schema(&schema_) + .set_range_partition_columns({ "key" }) + .num_replicas(c.first) + .Create(); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), c.second); + } + } +} + +TEST_F(ReplicationFactorLimitsTest, MaxReplicationFactor) { + // Creating table with number of replicas equal to --max_num_replicas should + // succeed. + { + unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(table_creator->table_name(kTableName) + .schema(&schema_) + .add_hash_partitions({ "key" }, 2) + .num_replicas(5) + .Create()); + } + + // An attempt to create a table with replication factor greater than + // the specified by --max_num_replicas should fail. + for (auto rf : { 6, 7 }) { + SCOPED_TRACE(Substitute("replication factor $0", rf)); + unique_ptr table_creator(client_->NewTableCreator()); + const auto s = table_creator->table_name(kTableName) + .schema(&schema_) + .add_hash_partitions({ "key" }, 2) + .num_replicas(rf) + .Create(); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "illegal replication factor"); + ASSERT_STR_CONTAINS(s.ToString(), "maximum allowed replication factor is 5"); + } +} + } // namespace client } // namespace kudu diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index a922d8d94a..174a483e31 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -173,11 +173,23 @@ TAG_FLAG(unresponsive_ts_rpc_timeout_ms, advanced); DEFINE_int32(default_num_replicas, 3, "Default number of replicas for tables that do not have the num_replicas set."); TAG_FLAG(default_num_replicas, advanced); +TAG_FLAG(default_num_replicas, runtime); DEFINE_int32(max_num_replicas, 7, "Maximum number of replicas that may be specified for a table."); // Tag as unsafe since we have done very limited testing of higher than 5 replicas. TAG_FLAG(max_num_replicas, unsafe); +TAG_FLAG(max_num_replicas, runtime); + +DEFINE_int32(min_num_replicas, 1, + "Minimum number of replicas that may be specified when creating " + "a table: this is to enforce the minimum replication factor for " + "tables created in a Kudu cluster. For example, setting this flag " + "to 3 enforces every new table to have at least 3 replicas for " + "each of its tablets, so there cannot be a data loss when a " + "single tablet server fails irrecoverably."); +TAG_FLAG(min_num_replicas, advanced); +TAG_FLAG(min_num_replicas, runtime); DEFINE_int32(max_num_columns, 300, "Maximum number of columns that may be in a table."); @@ -210,6 +222,7 @@ TAG_FLAG(max_identifier_length, unsafe); DEFINE_bool(allow_unsafe_replication_factor, false, "Allow creating tables with even replication factor."); TAG_FLAG(allow_unsafe_replication_factor, unsafe); +TAG_FLAG(allow_unsafe_replication_factor, runtime); DEFINE_int32(catalog_manager_bg_task_wait_ms, 1000, "Amount of time the catalog manager background task thread waits " @@ -360,24 +373,9 @@ TAG_FLAG(table_write_limit_ratio, experimental); DECLARE_bool(raft_prepare_replacement_before_eviction); DECLARE_int64(tsk_rotation_seconds); - -METRIC_DEFINE_entity(table); - DECLARE_string(ranger_config_path); -// Validates that if auto-rebalancing is enabled, the cluster uses 3-4-3 replication -// (the --raft_prepare_replacement_before_eviction flag must be set to true). -static bool Validate343SchemeEnabledForAutoRebalancing() { - if (FLAGS_auto_rebalancing_enabled && - !FLAGS_raft_prepare_replacement_before_eviction) { - LOG(ERROR) << "If enabling auto-rebalancing, Kudu must be configured" - " with --raft_prepare_replacement_before_eviction."; - return false; - } - return true; -} -GROUP_FLAG_VALIDATOR(auto_rebalancing_flags, - Validate343SchemeEnabledForAutoRebalancing); +METRIC_DEFINE_entity(table); using base::subtle::NoBarrier_CompareAndSwap; using base::subtle::NoBarrier_Load; @@ -422,10 +420,9 @@ using std::unordered_set; using std::vector; using strings::Substitute; -namespace kudu { -namespace master { +namespace { -static bool ValidateTableWriteLimitRatio(const char* flagname, double value) { +bool ValidateTableWriteLimitRatio(const char* flagname, double value) { if (value > 1.0) { LOG(ERROR) << Substitute("$0 must be less than or equal to 1.0, value $1 is invalid.", flagname, value); @@ -439,7 +436,7 @@ static bool ValidateTableWriteLimitRatio(const char* flagname, double value) { } DEFINE_validator(table_write_limit_ratio, &ValidateTableWriteLimitRatio); -static bool ValidateTableLimit(const char* flag, int64_t limit) { +bool ValidateTableLimit(const char* flag, int64_t limit) { if (limit != -1 && limit < 0) { LOG(ERROR) << Substitute("$0 must be greater than or equal to -1, " "$1 is invalid", flag, limit); @@ -449,10 +446,85 @@ static bool ValidateTableLimit(const char* flag, int64_t limit) { } DEFINE_validator(table_disk_size_limit, &ValidateTableLimit); DEFINE_validator(table_row_count_limit, &ValidateTableLimit); + +bool ValidateMinNumReplicas(const char* flagname, int value) { + if (value < 1) { + LOG(ERROR) << Substitute( + "$0: invalid value for flag $1; must be at least 1", value, flagname); + return false; + } + return true; +} +DEFINE_validator(min_num_replicas, &ValidateMinNumReplicas); + +// Validate that if the auto-rebalancing is enabled, the cluster uses the 3-4-3 +// replication scheme: the --raft_prepare_replacement_before_eviction flag +// must be set to 'true'. +bool Validate343SchemeEnabledForAutoRebalancing() { + if (FLAGS_auto_rebalancing_enabled && + !FLAGS_raft_prepare_replacement_before_eviction) { + LOG(ERROR) << "if enabling auto-rebalancing, Kudu must be configured " + "with --raft_prepare_replacement_before_eviction"; + return false; + } + return true; +} +GROUP_FLAG_VALIDATOR(auto_rebalancing_flags, + Validate343SchemeEnabledForAutoRebalancing); + +// Check for the replication factor flags' sanity. +bool ValidateReplicationFactorFlags() { + if (FLAGS_min_num_replicas > FLAGS_max_num_replicas) { + LOG(ERROR) << Substitute( + "--min_num_replicas ($0) must not be greater than " + "--max_num_replicas ($1)", + FLAGS_min_num_replicas, FLAGS_max_num_replicas); + return false; + } + if (FLAGS_default_num_replicas > FLAGS_max_num_replicas) { + LOG(ERROR) << Substitute( + "--default_num_replicas ($0) must not be greater than " + "--max_num_replicas ($1)", + FLAGS_default_num_replicas, FLAGS_max_num_replicas); + return false; + } + if (FLAGS_default_num_replicas % 2 == 0 && + !FLAGS_allow_unsafe_replication_factor) { + LOG(ERROR) << Substitute( + "--default_num_replicas ($0) must not be an even number since " + "--allow_unsafe_replication_factor is not set", + FLAGS_max_num_replicas); + return false; + } + if (FLAGS_min_num_replicas % 2 == 0 && + !FLAGS_allow_unsafe_replication_factor) { + LOG(ERROR) << Substitute( + "--min_num_replicas ($0) must not be an even number since " + "--allow_unsafe_replication_factor is not set", + FLAGS_min_num_replicas); + return false; + } + if (FLAGS_max_num_replicas % 2 == 0 && + !FLAGS_allow_unsafe_replication_factor) { + LOG(ERROR) << Substitute( + "--max_num_replicas ($0) must not be an even number since " + "--allow_unsafe_replication_factor is not set", + FLAGS_max_num_replicas); + return false; + } + return true; +} +GROUP_FLAG_VALIDATOR(replication_factor_flags, + ValidateReplicationFactorFlags); +} // anonymous namespace + //////////////////////////////////////////////////////////// // Table Loader //////////////////////////////////////////////////////////// +namespace kudu { +namespace master { + class TableLoader : public TableVisitor { public: explicit TableLoader(CatalogManager *catalog_manager) @@ -1808,26 +1880,28 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req, } const auto num_replicas = req.num_replicas(); - // Reject create table with even replication factors, unless master flag - // allow_unsafe_replication_factor is on. - if (num_replicas % 2 == 0 && !FLAGS_allow_unsafe_replication_factor) { - return SetupError(Status::InvalidArgument( - Substitute("illegal replication factor $0 (replication factor must be odd)", num_replicas)), - resp, MasterErrorPB::EVEN_REPLICATION_FACTOR); - } - if (num_replicas > FLAGS_max_num_replicas) { return SetupError(Status::InvalidArgument( - Substitute("illegal replication factor $0 (max replication factor is $1)", - num_replicas, FLAGS_max_num_replicas)), + Substitute("illegal replication factor $0: maximum allowed replication " + "factor is $1 (controlled by --max_num_replicas)", + num_replicas, FLAGS_max_num_replicas)), resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH); } - if (num_replicas <= 0) { + if (num_replicas < FLAGS_min_num_replicas) { return SetupError(Status::InvalidArgument( - Substitute("illegal replication factor $0 (replication factor must be positive)", - num_replicas, FLAGS_max_num_replicas)), + Substitute("illegal replication factor $0: minimum allowed replication " + "factor is $1 (controlled by --min_num_replicas)", + num_replicas, FLAGS_min_num_replicas)), resp, MasterErrorPB::ILLEGAL_REPLICATION_FACTOR); } + // Reject create table with even replication factors, unless master flag + // allow_unsafe_replication_factor is on. + if (num_replicas % 2 == 0 && !FLAGS_allow_unsafe_replication_factor) { + return SetupError(Status::InvalidArgument( + Substitute("illegal replication factor $0: replication factor must be odd", + num_replicas)), + resp, MasterErrorPB::EVEN_REPLICATION_FACTOR); + } // Verify that the number of replicas isn't larger than the number of live tablet // servers. diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index 850cba942d..b5011ca519 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -73,7 +73,8 @@ message MasterErrorPB { // The number of replicas requested is even. EVEN_REPLICATION_FACTOR = 10; - // The number of replicas requested is illegal (eg non-positive). + // The number of replicas requested is illegal, i.e. either non-positive + // or not allowed per system policies. ILLEGAL_REPLICATION_FACTOR = 11; // The callee detected that its replica management scheme is incompatible