From 960e599ad59a0f99498c1c230b2700a774872424 Mon Sep 17 00:00:00 2001
From: Andrew Wong
Date: Fri, 23 Sep 2022 09:56:40 -0700
Subject: [PATCH] config: make node ID optional

---
 src/v/cluster/cluster_discovery.cc | 6 +--
 src/v/cluster/cluster_utils.cc | 9 +++-
 src/v/cluster/commands.h | 1 +
 src/v/cluster/config_frontend.cc | 2 +-
 src/v/cluster/config_manager.cc | 2 +-
 src/v/cluster/controller_backend.cc | 2 +-
 src/v/cluster/feature_manager.cc | 6 +--
 src/v/cluster/members_frontend.cc | 2 +-
 src/v/cluster/tests/cluster_test_fixture.h | 40 +++++++++++------
 src/v/cluster/tests/cluster_tests.cc | 44 +++++++++++++++++++
 src/v/cluster/topic_table_probe.cc | 2 +-
 src/v/config/node_config.cc | 6 ++-
 src/v/config/node_config.h | 7 ++-
 src/v/coproc/reconciliation_backend.cc | 2 +-
 .../coproc/tests/partition_movement_tests.cc | 3 +-
 .../kafka/server/handlers/describe_configs.cc | 4 +-
 src/v/kafka/server/handlers/metadata.cc | 4 +-
 src/v/redpanda/admin_server.cc | 4 +-
 src/v/redpanda/application.cc | 2 +-
 src/v/redpanda/tests/fixture.h | 24 +++++++---
 20 files changed, 129 insertions(+), 43 deletions(-)

diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc
index fb18ed7b8cb86..ac632f87e9796 100644
--- a/src/v/cluster/cluster_discovery.cc
+++ b/src/v/cluster/cluster_discovery.cc
@@ -32,9 +32,9 @@ cluster_discovery::cluster_discovery(
 
 ss::future<model::node_id> cluster_discovery::determine_node_id() {
     const auto& configured_node_id = _node_config.node_id();
-    if (configured_node_id != -1) {
-        clusterlog.info("Using configured node ID {}", configured_node_id);
-        co_return configured_node_id;
+    if (configured_node_id != std::nullopt) {
+        clusterlog.info("Using configured node ID {}", *configured_node_id);
+        co_return *configured_node_id;
     }
     // TODO: once is_cluster_founder() refers to all seeds, verify that all the
     // seeds' seed_servers lists match and assign node IDs based on the
diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc
index f984773c2bcd8..4f8391af1011e 100644
--- a/src/v/cluster/cluster_utils.cc
+++ b/src/v/cluster/cluster_utils.cc
@@ -192,8 +192,15 @@ model::broker make_self_broker(const config::node_config& node_cfg) {
     // As for memory, if disk_gb is zero this is handled like a legacy broker.
     uint32_t disk_gb = space_info.capacity >> 30;
 
+    // If this node hasn't been configured with a node ID, use -1 to indicate
+    // that we don't know it yet. This shouldn't be used during the normal
+    // operation of a broker; instead, it indicates a broker that needs to be
+    // assigned a node ID when it first starts up.
+    model::node_id node_id = node_cfg.node_id() == std::nullopt
+                               ? 
model::node_id(-1) + : *node_cfg.node_id(); return model::broker( - model::node_id(node_cfg.node_id), + node_id, kafka_addr, rpc_addr, node_cfg.rack, diff --git a/src/v/cluster/commands.h b/src/v/cluster/commands.h index c019412ac23d7..a5d2a4f3674d0 100644 --- a/src/v/cluster/commands.h +++ b/src/v/cluster/commands.h @@ -11,6 +11,7 @@ #pragma once #include "bytes/iobuf_parser.h" +#include "cluster/logger.h" #include "cluster/simple_batch_builder.h" #include "cluster/types.h" #include "model/metadata.h" diff --git a/src/v/cluster/config_frontend.cc b/src/v/cluster/config_frontend.cc index de941291a4557..4ec8e0c7bcc44 100644 --- a/src/v/cluster/config_frontend.cc +++ b/src/v/cluster/config_frontend.cc @@ -28,7 +28,7 @@ config_frontend::config_frontend( , _leaders(leaders) , _features(features) , _as(as) - , _self(config::node().node_id()) {} + , _self(*config::node().node_id()) {} /** * RPC wrapper on do_patch, to dispatch to the controller leader diff --git a/src/v/cluster/config_manager.cc b/src/v/cluster/config_manager.cc index 306a5d000bed1..99e2824644c9c 100644 --- a/src/v/cluster/config_manager.cc +++ b/src/v/cluster/config_manager.cc @@ -55,7 +55,7 @@ config_manager::config_manager( ss::sharded& pl, ss::sharded& ft, ss::sharded& as) - : _self(config::node().node_id()) + : _self(*config::node().node_id()) , _frontend(cf) , _connection_cache(cc) , _leaders(pl) diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 45356548302a4..43831a9a401a4 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -258,7 +258,7 @@ controller_backend::controller_backend( , _partition_leaders_table(leaders) , _topics_frontend(frontend) , _storage(storage) - , _self(model::node_id(config::node().node_id)) + , _self(*config::node().node_id()) , _data_directory(config::node().data_directory().as_sstring()) , _housekeeping_timer_interval( config::shard_local_cfg().controller_backend_housekeeping_interval_ms()) diff --git a/src/v/cluster/feature_manager.cc b/src/v/cluster/feature_manager.cc index 81c396c5bce68..a119635aacd9f 100644 --- a/src/v/cluster/feature_manager.cc +++ b/src/v/cluster/feature_manager.cc @@ -45,7 +45,7 @@ feature_manager::feature_manager( , _connection_cache(connection_cache) , _raft0_group(raft0_group) , _barrier_state( - config::node().node_id(), + *config::node().node_id(), members.local(), as.local(), _gate, @@ -109,7 +109,7 @@ ss::future<> feature_manager::start() { vlog( clusterlog.debug, "Controller leader notification term {}", term); - _am_controller_leader = leader_id == config::node().node_id(); + _am_controller_leader = leader_id == *config::node().node_id(); // This hook avoids the need for the controller leader to receive // its own health report to generate a call to update_node_version. 
@@ -128,7 +128,7 @@ ss::future<> feature_manager::start() {
               leader_id.value(),
               features::feature_table::get_latest_logical_version());
             update_node_version(
-              config::node().node_id,
+              *config::node().node_id(),
               features::feature_table::get_latest_logical_version());
         }
     });
diff --git a/src/v/cluster/members_frontend.cc b/src/v/cluster/members_frontend.cc
index 5e3ec545d825d..be3714b7cb818 100644
--- a/src/v/cluster/members_frontend.cc
+++ b/src/v/cluster/members_frontend.cc
@@ -34,7 +34,7 @@ members_frontend::members_frontend(
   ss::sharded& leaders,
   ss::sharded& feature_table,
   ss::sharded& as)
-  : _self(config::node().node_id())
+  : _self(*config::node().node_id())
   , _node_op_timeout(
       config::shard_local_cfg().node_management_operation_timeout_ms)
   , _stm(stm)
diff --git a/src/v/cluster/tests/cluster_test_fixture.h b/src/v/cluster/tests/cluster_test_fixture.h
index 7c0fd22a71747..6ce9643307796 100644
--- a/src/v/cluster/tests/cluster_test_fixture.h
+++ b/src/v/cluster/tests/cluster_test_fixture.h
@@ -11,6 +11,7 @@
 #pragma once
 
 #include "cluster/fwd.h"
+#include "cluster/logger.h"
 #include "cluster/tests/utils.h"
 #include "config/seed_server.h"
 #include "model/metadata.h"
@@ -72,7 +73,9 @@ class cluster_test_fixture {
       int16_t proxy_port,
       int16_t schema_reg_port,
       int16_t coproc_supervisor_port,
-      std::vector<config::seed_server> seeds) {
+      std::vector<config::seed_server> seeds,
+      configure_node_id use_node_id = configure_node_id::yes) {
+        cluster::clusterlog.info("AWONG emplacing");
         _instances.emplace(
           node_id,
           std::make_unique<redpanda_thread_fixture>(
@@ -85,7 +88,8 @@ class cluster_test_fixture {
             seeds,
             ssx::sformat("{}.{}", _base_dir, node_id()),
             _sgroups,
-            false));
+            false,
+            use_node_id));
     }
 
     application* get_node_application(model::node_id id) {
@@ -107,27 +111,37 @@ class cluster_test_fixture {
 
     application* create_node_application(
       model::node_id node_id,
-      int kafka_port = 9092,
-      int rpc_port = 11000,
-      int proxy_port = 8082,
-      int schema_reg_port = 8081,
-      int coproc_supervisor_port = 43189) {
+      int kafka_port_base = 9092,
+      int rpc_port_base = 11000,
+      int proxy_port_base = 8082,
+      int schema_reg_port_base = 8081,
+      int coproc_supervisor_port_base = 43189,
+      configure_node_id use_node_id = configure_node_id::yes) {
+        cluster::clusterlog.info("AWONG creating");
        std::vector<config::seed_server> seeds = {};
        if (node_id != 0) {
            seeds.push_back(
              {.addr = net::unresolved_address("127.0.0.1", 11000)});
        }
+        cluster::clusterlog.info("AWONG adding");
        add_node(
          node_id,
-          kafka_port + node_id(),
-          rpc_port + node_id(),
-          proxy_port + node_id(),
-          schema_reg_port + node_id(),
-          coproc_supervisor_port + node_id(),
-          std::move(seeds));
+          kafka_port_base + node_id(),
+          rpc_port_base + node_id(),
+          proxy_port_base + node_id(),
+          schema_reg_port_base + node_id(),
+          coproc_supervisor_port_base + node_id(),
+          std::move(seeds),
+          use_node_id);
        return get_node_application(node_id);
    }
 
+    application* create_node_application(
+      model::node_id node_id, configure_node_id use_node_id) {
+        return create_node_application(
+          node_id, 9092, 11000, 8082, 8081, 43189, use_node_id);
+    }
+
     void remove_node_application(model::node_id node_id) {
         _instances.erase(node_id);
     }
diff --git a/src/v/cluster/tests/cluster_tests.cc b/src/v/cluster/tests/cluster_tests.cc
index 731f2796da2c1..e3ff44e9b90ed 100644
--- a/src/v/cluster/tests/cluster_tests.cc
+++ b/src/v/cluster/tests/cluster_tests.cc
@@ -16,6 +16,7 @@
 #include "test_utils/fixture.h"
 
 using namespace std::chrono_literals; // NOLINT
+static ss::logger logger("cluster_tests");
 
 FIXTURE_TEST(test_join_single_node, cluster_test_fixture) {
     model::node_id id{0};
@@ -45,3 +46,46 @@ FIXTURE_TEST(test_three_node_cluster, cluster_test_fixture) {
 
     wait_for_all_members(3s).get();
 }
+
+FIXTURE_TEST(test_auto_assign_node_id, cluster_test_fixture) {
+    create_node_application(model::node_id{0}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(0, *config::node().node_id());
+
+    create_node_application(model::node_id{1}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(1, *config::node().node_id());
+
+    create_node_application(model::node_id{2}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(2, *config::node().node_id());
+
+    wait_for_all_members(3s).get();
+}
+
+FIXTURE_TEST(test_auto_assign_non_seeds, cluster_test_fixture) {
+    create_node_application(model::node_id{0});
+    BOOST_REQUIRE_EQUAL(0, *config::node().node_id());
+
+    create_node_application(model::node_id{1}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(1, *config::node().node_id());
+
+    create_node_application(model::node_id{2}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(2, *config::node().node_id());
+
+    wait_for_all_members(3s).get();
+}
+
+FIXTURE_TEST(test_auto_assign_with_explicit_node_id, cluster_test_fixture) {
+    create_node_application(model::node_id{0});
+    BOOST_REQUIRE_EQUAL(0, *config::node().node_id());
+
+    // Explicitly assign node ID 2. Node ID assignment should assign around it.
+    create_node_application(model::node_id{2});
+    BOOST_REQUIRE_EQUAL(2, *config::node().node_id());
+
+    create_node_application(model::node_id{1}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(1, *config::node().node_id());
+
+    create_node_application(model::node_id{3}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(3, *config::node().node_id());
+
+    wait_for_all_members(3s).get();
+}
diff --git a/src/v/cluster/topic_table_probe.cc b/src/v/cluster/topic_table_probe.cc
index 279ed516fbe2e..ac3169cb66759 100644
--- a/src/v/cluster/topic_table_probe.cc
+++ b/src/v/cluster/topic_table_probe.cc
@@ -19,7 +19,7 @@ namespace cluster {
 
 topic_table_probe::topic_table_probe(const topic_table& topic_table)
   : _topic_table(topic_table)
-  , _node_id(config::node().node_id()) {
+  , _node_id(*config::node().node_id()) {
     setup_metrics();
 }
diff --git a/src/v/config/node_config.cc b/src/v/config/node_config.cc
index 11f26c3642def..9784ac2431736 100644
--- a/src/v/config/node_config.cc
+++ b/src/v/config/node_config.cc
@@ -29,8 +29,10 @@ node_config::node_config() noexcept
   , node_id(
       *this,
       "node_id",
-      "Unique id identifying a node in the cluster",
-      {.required = required::yes, .visibility = visibility::user})
+      "Unique id identifying a node in the cluster. If missing, a unique id "
+      "will be assigned for this node when it joins the cluster",
+      {.visibility = visibility::user},
+      std::nullopt)
   , rack(
       *this,
       "rack",
diff --git a/src/v/config/node_config.h b/src/v/config/node_config.h
index c85eaf2fc36b1..faf6b2d395989 100644
--- a/src/v/config/node_config.h
+++ b/src/v/config/node_config.h
@@ -26,7 +26,12 @@ struct node_config final : public config_store {
 public:
     property<bool> developer_mode;
     property<data_directory_path> data_directory;
-    property<model::node_id> node_id;
+
+    // NOTE: during the normal runtime of a cluster, it is safe to assume that
+    // the value of the node ID has been determined, and that there is a value
+    // set for this property.
+    property<std::optional<model::node_id>> node_id;
+
     property<std::optional<model::rack_id>> rack;
     property<std::vector<seed_server>> seed_servers;
 
diff --git a/src/v/coproc/reconciliation_backend.cc b/src/v/coproc/reconciliation_backend.cc
index d6f8c34f6808d..473960bc083d9 100644
--- a/src/v/coproc/reconciliation_backend.cc
+++ b/src/v/coproc/reconciliation_backend.cc
@@ -79,7 +79,7 @@ reconciliation_backend::reconciliation_backend(
   ss::sharded& coproc_pm,
   ss::sharded& pacemaker,
   ss::sharded& sdb) noexcept
-  : _self(model::node_id(config::node().node_id))
+  : _self(model::node_id(*config::node().node_id()))
   , _data_directory(config::node().data_directory().as_sstring())
   , _topics(topics)
   , _shard_table(shard_table)
diff --git a/src/v/coproc/tests/partition_movement_tests.cc b/src/v/coproc/tests/partition_movement_tests.cc
index 0e0a880cdc4d9..cdb36b80a0a04 100644
--- a/src/v/coproc/tests/partition_movement_tests.cc
+++ b/src/v/coproc/tests/partition_movement_tests.cc
@@ -58,7 +58,8 @@ FIXTURE_TEST(test_move_source_topic, coproc_test_fixture) {
     const ss::shard_id next_shard = (*shard + 1) % ss::smp::count;
     info("Current target shard {} and next shard {}", *shard, next_shard);
     model::broker_shard bs{
-      .node_id = model::node_id(config::node().node_id), .shard = next_shard};
+      .node_id = model::node_id(*config::node().node_id()),
+      .shard = next_shard};
 
     /// Move the input onto the new desired target
     auto& topics_fe = root_fixture()->app.controller->get_topics_frontend();
diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc
index ce74c64e85a09..c6c900c2f6913 100644
--- a/src/v/kafka/server/handlers/describe_configs.cc
+++ b/src/v/kafka/server/handlers/describe_configs.cc
@@ -294,12 +294,12 @@ static void report_broker_config(
       result.resource_name.data() + result.resource_name.size(), // NOLINT
       broker_id);
     if (res.ec == std::errc()) {
-        if (broker_id != config::node().node_id()) {
+        if (broker_id != *config::node().node_id()) {
             result.error_code = error_code::invalid_request;
             result.error_message = ssx::sformat(
               "Unexpected broker id {} expected {}",
               broker_id,
-              config::node().node_id());
+              *config::node().node_id());
             return;
         }
     } else {
diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc
index f6dc64e04da51..8c0a0e76de22e 100644
--- a/src/v/kafka/server/handlers/metadata.cc
+++ b/src/v/kafka/server/handlers/metadata.cc
@@ -83,7 +83,7 @@ std::optional get_leader_term(
     const auto previous = md_cache.get_previous_leader_id(tp_ns, p_id);
     leader_term->leader = previous;
 
-    if (previous == config::node().node_id()) {
+    if (previous == *config::node().node_id()) {
         auto idx = fast_prng_source() % replicas.size();
         leader_term->leader = replicas[idx];
     }
@@ -333,7 +333,7 @@ guess_peer_listener(request_context& ctx, cluster::broker_ptr broker) {
             // is not yet consistent with what's in members_table,
             // because a node configuration update didn't propagate
             // via raft0 yet
-            if (broker->id() == config::node().node_id()) {
+            if (broker->id() == *config::node().node_id()) {
                 return l;
             }
         }
diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc
index 00271a3aac50e..93671ba221773 100644
--- a/src/v/redpanda/admin_server.cc
+++ b/src/v/redpanda/admin_server.cc
@@ -434,7 +434,7 @@ ss::future admin_server::redirect_to_leader(
           ss::httpd::reply::status_type::service_unavailable);
     }
 
-    if (leader_id_opt.value() == config::node().node_id()) {
+    if (leader_id_opt.value() == *config::node().node_id()) {
         vlog(
           logger.info,
           "Can't redirect to leader from leader node ({})",
@@ -555,7 +555,7 @@ bool need_redirect_to_leader(
           ss::httpd::reply::status_type::service_unavailable);
     }
 
-    return leader_id_opt.value() != config::node().node_id();
+    return leader_id_opt.value() != *config::node().node_id();
 }
 
 model::node_id parse_broker_id(const ss::httpd::request& req) {
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index 924eabd477848..a652572cfe66a 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -1553,7 +1553,7 @@ void application::start_kafka(::stop_signal& app_signal) {
     vlog(_log.info, "Waiting for cluster membership");
     controller->get_members_table()
       .local()
-      .await_membership(config::node().node_id(), app_signal.abort_source())
+      .await_membership(*config::node().node_id(), app_signal.abort_source())
       .get();
     _kafka_server.invoke_on_all(&net::server::start).get();
     vlog(
diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h
index fd8896271d881..d93ce6da4ea95 100644
--- a/src/v/redpanda/tests/fixture.h
+++ b/src/v/redpanda/tests/fixture.h
@@ -12,6 +12,7 @@
 #pragma once
 #include "cluster/cluster_utils.h"
 #include "cluster/controller.h"
+#include "cluster/logger.h"
 #include "cluster/members_table.h"
 #include "cluster/metadata_cache.h"
 #include "cluster/partition_leaders_table.h"
@@ -48,6 +49,8 @@
 
 #include 
 
+using configure_node_id = ss::bool_class<struct configure_node_id_tag>;
+
 class redpanda_thread_fixture {
 public:
     static constexpr const char* rack_name = "i-am-rack";
@@ -62,25 +65,30 @@ class redpanda_thread_fixture {
       std::vector<config::seed_server> seed_servers,
       ss::sstring base_dir,
       std::optional sch_groups,
-      bool remove_on_shutdown)
+      bool remove_on_shutdown,
+      configure_node_id use_node_id = configure_node_id::yes)
       : app(ssx::sformat("redpanda-{}", node_id()))
       , proxy_port(proxy_port)
       , schema_reg_port(schema_reg_port)
       , data_dir(std::move(base_dir))
       , remove_on_shutdown(remove_on_shutdown)
       , app_signal(std::make_unique<::stop_signal>()) {
+        cluster::clusterlog.info("AWONG configuring");
         configure(
           node_id,
           kafka_port,
           rpc_port,
           coproc_supervisor_port,
-          std::move(seed_servers));
+          std::move(seed_servers),
+          use_node_id);
+        cluster::clusterlog.info("AWONG initializing");
         app.initialize(
           proxy_config(proxy_port),
           proxy_client_config(kafka_port),
           schema_reg_config(schema_reg_port),
           proxy_client_config(kafka_port),
           sch_groups);
 
+        cluster::clusterlog.info("AWONG checking env");
         app.check_environment();
         app.wire_up_and_start(*app_signal);
@@ -156,14 +164,16 @@ class redpanda_thread_fixture {
       int32_t kafka_port,
       int32_t rpc_port,
       int32_t coproc_supervisor_port,
-      std::vector<config::seed_server> seed_servers) {
+      std::vector<config::seed_server> seed_servers,
+      configure_node_id use_node_id) {
         auto base_path = std::filesystem::path(data_dir);
         ss::smp::invoke_on_all([node_id,
                                 kafka_port,
                                 rpc_port,
                                 coproc_supervisor_port,
                                 seed_servers = std::move(seed_servers),
-                                base_path]() mutable {
+                                base_path,
+                                use_node_id]() mutable {
             auto& config = config::shard_local_cfg();
 
             config.get("enable_pid_file").set_value(false);
@@ -176,9 +186,11 @@ class redpanda_thread_fixture {
             node_config.get("admin").set_value(
               std::vector());
             node_config.get("developer_mode").set_value(true);
-            node_config.get("node_id").set_value(node_id);
+            node_config.get("node_id").set_value(
+              use_node_id ? std::make_optional(node_id)
+                          : std::optional<model::node_id>(std::nullopt));
             node_config.get("rack").set_value(
-              std::optional(model::rack_id(rack_name)));
+              std::make_optional(model::rack_id(rack_name)));
             node_config.get("seed_servers").set_value(seed_servers);
             node_config.get("rpc_server")
               .set_value(net::unresolved_address("127.0.0.1", rpc_port));
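
Reviewer note (not part of the patch): with this change, config::node().node_id() yields a std::optional<model::node_id> rather than a raw ID, so call sites either dereference it (after startup, when an ID is guaranteed to have been assigned) or branch on it (during discovery and when building the self broker). The sketch below illustrates that calling pattern under the assumptions declared in node_config.h above; the helper name effective_node_id is illustrative only and does not exist in the tree.

    #include <optional>

    #include "config/node_config.h"
    #include "model/metadata.h"

    // Hypothetical helper mirroring make_self_broker() and determine_node_id():
    // before the cluster assigns an ID the property is std::nullopt, so fall
    // back to the -1 placeholder; once startup completes, dereferencing is safe.
    inline model::node_id effective_node_id(const config::node_config& cfg) {
        const std::optional<model::node_id>& id = cfg.node_id();
        if (id.has_value()) {
            return *id; // explicitly set in the node config (redpanda.yaml)
        }
        return model::node_id(-1); // placeholder until the node joins and is assigned an ID
    }

This is why most call sites in the diff simply change config::node().node_id() to *config::node().node_id(): they run only after membership is established, when the optional is known to hold a value.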