From 26934e34f8e9942e365b9fdc470e714c9f38330d Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 23 Sep 2022 09:54:14 -0700 Subject: [PATCH 01/17] storage: add node_uuid to storage API This will be used when we first start a node that has no node UUID. I exposed this as storage because the UUID uniquely identifies the contents of the kv-store / data directory. --- src/v/model/fundamental.h | 3 +++ src/v/storage/api.h | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/src/v/model/fundamental.h b/src/v/model/fundamental.h index 5c40b7b7e38d..7fb847a4ef67 100644 --- a/src/v/model/fundamental.h +++ b/src/v/model/fundamental.h @@ -16,6 +16,7 @@ #include "serde/serde.h" #include "ssx/sformat.h" #include "utils/named_type.h" +#include "utils/uuid.h" #include "vassert.h" #include @@ -43,6 +44,8 @@ using bucket_name = named_type; namespace model { +using node_uuid = named_type; + // Named after Kafka cleanup.policy topic property enum class cleanup_policy_bitflags : uint8_t { none = 0, diff --git a/src/v/storage/api.h b/src/v/storage/api.h index cd77fbfefe09..93b40ffa2fd7 100644 --- a/src/v/storage/api.h +++ b/src/v/storage/api.h @@ -11,6 +11,7 @@ #pragma once +#include "model/fundamental.h" #include "seastarx.h" #include "storage/kvstore.h" #include "storage/log_manager.h" @@ -75,6 +76,12 @@ class api { return f; } + void set_node_uuid(const model::node_uuid& node_uuid) { + _node_uuid = node_uuid; + } + + model::node_uuid node_uuid() const { return _node_uuid; } + kvstore& kvs() { return *_kvstore; } log_manager& log_mgr() { return *_log_mgr; } storage_resources& resources() { return _resources; } @@ -87,6 +94,11 @@ class api { std::unique_ptr _kvstore; std::unique_ptr _log_mgr; + + // UUID that uniquely identifies the contents of this node's data + // directory. Should be generated once upon first starting up and + // immediately persisted into `_kvstore`. + model::node_uuid _node_uuid; }; } // namespace storage From 63525deea2ddf49917fb0b9e1453ea0ea560b1b2 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 23 Sep 2022 10:06:48 -0700 Subject: [PATCH 02/17] cluster: encapsulate cluster discovery This encapsulates some pieces of cluster discovery into its own class. I added some descriptions on how each method is expected to be implemented, but I added implementations that match the existing behavior. --- src/v/cluster/CMakeLists.txt | 1 + src/v/cluster/cluster_discovery.cc | 48 +++++++++++++++++++++ src/v/cluster/cluster_discovery.h | 69 ++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 src/v/cluster/cluster_discovery.cc create mode 100644 src/v/cluster/cluster_discovery.h diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index 0f42e4b4d486..dff1da6ee52a 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -85,6 +85,7 @@ v_cc_library( security_frontend.cc data_policy_manager.cc data_policy_frontend.cc + cluster_discovery.cc controller_api.cc members_frontend.cc members_backend.cc diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc new file mode 100644 index 000000000000..4aa4fec3e4a7 --- /dev/null +++ b/src/v/cluster/cluster_discovery.cc @@ -0,0 +1,48 @@ +// Copyright 2022 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/cluster_discovery.h" + +#include "cluster/cluster_utils.h" +#include "cluster/logger.h" +#include "config/node_config.h" +#include "model/fundamental.h" +#include "model/metadata.h" +#include "seastarx.h" + +#include + +using model::broker; +using model::node_id; +using std::vector; + +namespace cluster { + +cluster_discovery::cluster_discovery(const model::node_uuid& node_uuid) + : _node_uuid(node_uuid) {} + +ss::future cluster_discovery::determine_node_id() { + co_return config::node().node_id(); +} + +vector cluster_discovery::initial_raft0_brokers() const { + // If configured as the root node, we'll want to start the cluster with + // just this node as the initial seed. + if (is_cluster_founder()) { + // TODO: we should only return non-empty seed list if our log is empty. + return {make_self_broker(config::node())}; + } + return {}; +} + +bool cluster_discovery::is_cluster_founder() const { + return config::node().seed_servers().empty(); +} + +} // namespace cluster diff --git a/src/v/cluster/cluster_discovery.h b/src/v/cluster/cluster_discovery.h new file mode 100644 index 000000000000..ab1b8a1be5ce --- /dev/null +++ b/src/v/cluster/cluster_discovery.h @@ -0,0 +1,69 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "model/fundamental.h" +#include "model/metadata.h" +#include "seastarx.h" + +#include + +#include +#include + +namespace cluster { + +// Provides metadata pertaining to initial cluster discovery. +class cluster_discovery { +public: + explicit cluster_discovery(const model::node_uuid& node_uuid); + + // Returns this node's node ID. + // + // TODO: implement the below behavior. + // + // Determines what the node ID for this node should be. Once called, we can + // proceed with initializing anything that depends on node ID. + // + // On an empty seed server, this sends requests to all other seed servers + // to determine if there is a valid assigned of node IDs for the seeds. + // + // On a non-seed server with no node ID specified via config, this sends a + // request to the controllers to register this node's UUID and assign it a + // node ID. + ss::future determine_node_id(); + + // If configured as the root, return this broker as the sole initial Raft0 + // broker. + // + // TODO: implement the below behavior. + // + // Returns brokers to be used to form a Raft group for a new cluster. + // + // If this node is a seed server, returns all seed servers, assuming seeds + // are configured with identical seed servers. + // + // If this node is not a seed server returns an empty list. + std::vector initial_raft0_brokers() const; + +private: + // Returns whether this node is the root node. + // + // TODO: implement the below behavior. 
+ // + // Returns true if the local node is a founding member of the cluster, as + // indicated by either us having an empty seed server (we are the root node + // in a legacy config) or our node UUID matching one of those returned by + // the seed servers. + bool is_cluster_founder() const; + + const model::node_uuid _node_uuid; +}; + +} // namespace cluster From 1c92fad9cdd2a4382fb16f3233e7063781a58055 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 23 Sep 2022 09:57:32 -0700 Subject: [PATCH 03/17] cluster: introduce bootstrap service This adds the skeleton of an RPC service that will be used for cluster bootstrap. It includes an initial `cluster_bootstrap_info` RPC type that is currently empty. Introducing as a standalone commit to parallelize some workstreams. --- src/v/cluster/CMakeLists.txt | 8 +++++ src/v/cluster/bootstrap.json | 15 ++++++++++ src/v/cluster/bootstrap_service.cc | 24 +++++++++++++++ src/v/cluster/bootstrap_service.h | 29 ++++++++++++++++++ src/v/cluster/bootstrap_types.h | 48 ++++++++++++++++++++++++++++++ 5 files changed, 124 insertions(+) create mode 100644 src/v/cluster/bootstrap.json create mode 100644 src/v/cluster/bootstrap_service.cc create mode 100644 src/v/cluster/bootstrap_service.h create mode 100644 src/v/cluster/bootstrap_types.h diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index dff1da6ee52a..c3b4e6d0197d 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -1,5 +1,11 @@ find_package(Roaring REQUIRED) include(rpcgen) +rpcgen( + TARGET bootstrap_rpc + IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/bootstrap.json + OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/cluster_bootstrap_service.h + INCLUDES ${CMAKE_BINARY_DIR}/src/v +) rpcgen( TARGET controller_rpc IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/controller.json @@ -114,8 +120,10 @@ v_cc_library( partition_balancer_rpc_handler.cc node_status_backend.cc node_status_rpc_handler.cc + bootstrap_service.cc DEPS Seastar::seastar + bootstrap_rpc controller_rpc metadata_rpc id_allocator_rpc diff --git a/src/v/cluster/bootstrap.json b/src/v/cluster/bootstrap.json new file mode 100644 index 000000000000..6011b71e0f0a --- /dev/null +++ b/src/v/cluster/bootstrap.json @@ -0,0 +1,15 @@ +{ + "namespace": "cluster", + "service_name": "cluster_bootstrap", + "includes": [ + "cluster/bootstrap_types.h" + ], + "methods": [ + { + "name": "cluster_bootstrap_info", + "input_type": "cluster_bootstrap_info_request", + "output_type": "cluster_bootstrap_info_reply" + } + ] +} + diff --git a/src/v/cluster/bootstrap_service.cc b/src/v/cluster/bootstrap_service.cc new file mode 100644 index 000000000000..a78e2e368934 --- /dev/null +++ b/src/v/cluster/bootstrap_service.cc @@ -0,0 +1,24 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/bootstrap_service.h" + +#include "cluster/bootstrap_types.h" + +namespace cluster { + +ss::future +bootstrap_service::cluster_bootstrap_info( + cluster_bootstrap_info_request&&, rpc::streaming_context&) { + cluster_bootstrap_info_reply r{}; + // TODO: add fields! 
+ co_return r; +} + +} // namespace cluster diff --git a/src/v/cluster/bootstrap_service.h b/src/v/cluster/bootstrap_service.h new file mode 100644 index 000000000000..ebb9a27c1f72 --- /dev/null +++ b/src/v/cluster/bootstrap_service.h @@ -0,0 +1,29 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "cluster/bootstrap_types.h" +#include "cluster/cluster_bootstrap_service.h" + +#include + +namespace cluster { + +// RPC service to be used when initialially bootstrapping a cluster. +// TODO: talk about how it's a slim service with few dependencies. +class bootstrap_service : public cluster_bootstrap_service { +public: + bootstrap_service(ss::scheduling_group sg, ss::smp_service_group ssg) + : cluster_bootstrap_service(sg, ssg) {} + + ss::future cluster_bootstrap_info( + cluster_bootstrap_info_request&&, rpc::streaming_context&) override; +}; + +} // namespace cluster diff --git a/src/v/cluster/bootstrap_types.h b/src/v/cluster/bootstrap_types.h new file mode 100644 index 000000000000..39278e73486d --- /dev/null +++ b/src/v/cluster/bootstrap_types.h @@ -0,0 +1,48 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "model/fundamental.h" +#include "serde/serde.h" + +#include + +#include + +namespace cluster { + +struct cluster_bootstrap_info_request + : serde::envelope> { + using rpc_adl_exempt = std::true_type; + + friend std::ostream& + operator<<(std::ostream& o, const cluster_bootstrap_info_request&) { + fmt::print(o, "{{}}"); + return o; + } + + auto serde_fields() { return std::tie(); } +}; + +struct cluster_bootstrap_info_reply + : serde::envelope> { + using rpc_adl_exempt = std::true_type; + + // TODO: add fields! + + auto serde_fields() { return std::tie(); } + + friend std::ostream& + operator<<(std::ostream& o, const cluster_bootstrap_info_reply&) { + fmt::print(o, "{{}}"); + return o; + } +}; + +} // namespace cluster From 575b476e297fffb6e61e8706bb1325ce5630cba3 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 7 Oct 2022 09:27:54 -0700 Subject: [PATCH 04/17] redpanda: plumb cluster discovery into application This plumbs the cluster discovery module into `redpanda::application` as one of the first subsystems initialized during startup. Cluster discovery will have an initial dependency on the storage subsystem, to get node UUID, cluster UUID, etc., and on the RPC subsystem, to serve the RPCs required for the initial bootstrapping sequence. The rest of the subsystems will have a dependency on cluster discovery, which, once automatic node ID assignment is supported, will be what determines the node ID. To that end, this commit orders the startup of the storage and RPC subsystems up front, followed by cluster discovery, followed by the rest of the subsystems. This commit only plumbs cluster discovery infrastructure. The cluster discovery sequence as a whole is unchanged. 
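For reference, the reordered startup flow in application::wire_up_and_start now
looks roughly like the sketch below. This is a simplified outline of the code
added in this patch, not a verbatim excerpt; the test-mode branch, logging, and
the proxy/schema-registry/admin startup are omitted.

    void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) {
        // Storage and the internal RPC server come up first, since cluster
        // discovery depends on both of them.
        wire_up_bootstrap_services();
        start_bootstrap_services();

        // Determine this node's ID before initializing anything that needs it.
        cluster::cluster_discovery cd(storage.local().node_uuid());
        auto node_id = cd.determine_node_id().get();

        // The remaining subsystems are wired up and started with the resolved
        // node ID.
        wire_up_runtime_services(node_id);
        start_runtime_services(cd, app_signal);
        start_kafka(node_id, app_signal);
    }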
--- src/v/cluster/controller.cc | 8 +- src/v/cluster/controller.h | 2 +- src/v/redpanda/application.cc | 354 +++++++++++++++++++--------------- src/v/redpanda/application.h | 26 ++- 4 files changed, 218 insertions(+), 172 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index ef89b50259ad..ef053e45dc4f 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -98,12 +98,8 @@ ss::future<> controller::wire_up() { .then([this] { _probe.start(); }); } -ss::future<> controller::start() { - std::vector initial_raft0_brokers; - if (config::node().seed_servers().empty()) { - initial_raft0_brokers.push_back( - cluster::make_self_broker(config::node())); - } +ss::future<> +controller::start(std::vector initial_raft0_brokers) { return create_raft0( _partition_manager, _shard_table, diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index 2d623a48152e..a73ca24c627b 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -118,7 +118,7 @@ class controller { ss::future<> wire_up(); - ss::future<> start(); + ss::future<> start(std::vector); // prevents controller from accepting new requests ss::future<> shutdown_input(); ss::future<> stop(); diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index fe884facdaa6..fd01e07a44f1 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -16,6 +16,8 @@ #include "cloud_storage/cache_service.h" #include "cloud_storage/partition_recovery_manager.h" #include "cloud_storage/remote.h" +#include "cluster/bootstrap_service.h" +#include "cluster/cluster_discovery.h" #include "cluster/cluster_utils.h" #include "cluster/controller.h" #include "cluster/fwd.h" @@ -139,7 +141,7 @@ void application::shutdown() { _kafka_server.invoke_on_all(&net::server::shutdown_input).get(); } if (_rpc.local_is_initialized()) { - _rpc.invoke_on_all(&net::server::shutdown_input).get(); + _rpc.invoke_on_all(&rpc::rpc_server::shutdown_input).get(); } // We schedule shutting down controller input and aborting its operation as @@ -182,7 +184,7 @@ void application::shutdown() { _kafka_conn_quotas.stop().get(); } if (_rpc.local_is_initialized()) { - _rpc.invoke_on_all(&net::server::wait_for_shutdown).get(); + _rpc.invoke_on_all(&rpc::rpc_server::wait_for_shutdown).get(); _rpc.stop().get(); } @@ -714,8 +716,8 @@ make_upload_controller_config(ss::scheduling_group sg) { } // add additional services in here -void application::wire_up_services() { - wire_up_redpanda_services(); +void application::wire_up_runtime_services(model::node_id node_id) { + wire_up_redpanda_services(node_id); if (_proxy_config) { construct_service( _proxy_client, @@ -747,7 +749,7 @@ void application::wire_up_services() { if (_schema_reg_config) { construct_single_service( _schema_registry, - config::node().node_id(), + node_id, smp_service_groups.proxy_smp_sg(), // TODO: Improve memory budget for services // https://github.com/redpanda-data/redpanda/issues/1392 @@ -759,7 +761,7 @@ void application::wire_up_services() { configure_admin_server(); } -void application::wire_up_redpanda_services() { +void application::wire_up_redpanda_services(model::node_id node_id) { ss::smp::invoke_on_all([] { return storage::internal::chunks().start(); }).get(); @@ -768,25 +770,11 @@ void application::wire_up_redpanda_services() { construct_service(_feature_table).get(); // cluster - syschecks::systemd_message("Adding raft client cache").get(); + syschecks::systemd_message("Initializing connection 
cache").get(); construct_service(_connection_cache).get(); syschecks::systemd_message("Building shard-lookup tables").get(); construct_service(shard_table).get(); - syschecks::systemd_message("Intializing storage services").get(); - construct_single_service_sharded(storage_node).get(); - - construct_service( - storage, - []() { return kvstore_config_from_global_config(); }, - [this]() { - auto log_cfg = manager_config_from_global_config(_scheduling_groups); - log_cfg.reclaim_opts.background_reclaimer_sg - = _scheduling_groups.cache_background_reclaim_sg(); - return log_cfg; - }) - .get(); - syschecks::systemd_message("Intializing raft recovery throttle").get(); recovery_throttle .start(ss::sharded_parameter([] { @@ -797,7 +785,7 @@ void application::wire_up_redpanda_services() { syschecks::systemd_message("Intializing raft group manager").get(); raft_group_manager .start( - model::node_id(config::node().node_id()), + node_id, _scheduling_groups.raft_sg(), [] { return raft::group_manager::configuration{ @@ -890,11 +878,11 @@ void application::wire_up_redpanda_services() { std::ref(cloud_storage_api)); controller->wire_up().get0(); - construct_service(node_status_table, config::node().node_id()).get(); + construct_service(node_status_table, node_id).get(); construct_single_service_sharded( node_status_backend, - config::node().node_id(), + node_id, std::ref(controller->get_members_table()), std::ref(_feature_table), std::ref(node_status_table), @@ -1045,56 +1033,6 @@ void application::wire_up_redpanda_services() { coprocessing->start().get(); } - // rpc - ss::sharded rpc_cfg; - rpc_cfg.start(ss::sstring("internal_rpc")).get(); - rpc_cfg - .invoke_on_all([this](net::server_configuration& c) { - return ss::async([this, &c] { - auto rpc_server_addr - = net::resolve_dns(config::node().rpc_server()).get0(); - c.load_balancing_algo - = ss::server_socket::load_balancing_algorithm::port; - c.max_service_memory_per_core = memory_groups::rpc_total_memory(); - c.disable_metrics = net::metrics_disabled( - config::shard_local_cfg().disable_metrics()); - c.disable_public_metrics = net::public_metrics_disabled( - config::shard_local_cfg().disable_public_metrics()); - c.listen_backlog - = config::shard_local_cfg().rpc_server_listen_backlog; - c.tcp_recv_buf - = config::shard_local_cfg().rpc_server_tcp_recv_buf; - c.tcp_send_buf - = config::shard_local_cfg().rpc_server_tcp_send_buf; - auto rpc_builder = config::node() - .rpc_server_tls() - .get_credentials_builder() - .get0(); - auto credentials - = rpc_builder - ? rpc_builder - ->build_reloadable_server_credentials( - [this]( - const std::unordered_set& updated, - const std::exception_ptr& eptr) { - cluster::log_certificate_reload_event( - _log, "Internal RPC TLS", updated, eptr); - }) - .get0() - : nullptr; - c.addrs.emplace_back(rpc_server_addr, credentials); - }); - }) - .get(); - /** - * Use port based load_balancing_algorithm to make connection shard - * assignment deterministic. 
- **/ - syschecks::systemd_message("Starting internal RPC {}", rpc_cfg.local()) - .get(); - _rpc.start(&rpc_cfg).get(); - rpc_cfg.stop().get(); - syschecks::systemd_message("Creating id allocator frontend").get(); construct_service( id_allocator_frontend, @@ -1290,56 +1228,81 @@ application::set_proxy_client_config(ss::sstring name, std::any val) { }); } -void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { - wire_up_services(); - - if (test_mode) { - // When running inside a unit test fixture, we may fast-forward - // some of initialization that would usually wait for the controller - // to commit some state to its log. - vlog(_log.warn, "Running in unit test mode"); - _feature_table - .invoke_on_all( - [](features::feature_table& ft) { ft.testing_activate_all(); }) - .get(); - } - - start_redpanda(app_signal); - - if (_proxy_config) { - _proxy.invoke_on_all(&pandaproxy::rest::proxy::start).get(); - vlog( - _log.info, - "Started Pandaproxy listening at {}", - _proxy_config->pandaproxy_api()); - } - - if (_schema_reg_config) { - _schema_registry->start().get(); - vlog( - _log.info, - "Started Schema Registry listening at {}", - _schema_reg_config->schema_registry_api()); - } - - start_kafka(app_signal); +void application::wire_up_bootstrap_services() { + // Wire up local storage. + ss::smp::invoke_on_all([] { + return storage::internal::chunks().start(); + }).get(); + syschecks::systemd_message("Constructing storage services").get(); + construct_single_service_sharded(storage_node).get(); + construct_service( + storage, + []() { return kvstore_config_from_global_config(); }, + [this]() { + auto log_cfg = manager_config_from_global_config(_scheduling_groups); + log_cfg.reclaim_opts.background_reclaimer_sg + = _scheduling_groups.cache_background_reclaim_sg(); + return log_cfg; + }) + .get(); - _admin.invoke_on_all([](admin_server& admin) { admin.set_ready(); }).get(); + // Wire up the internal RPC server. + ss::sharded rpc_cfg; + rpc_cfg.start(ss::sstring("internal_rpc")).get(); + rpc_cfg + .invoke_on_all([this](net::server_configuration& c) { + return ss::async([this, &c] { + auto rpc_server_addr + = net::resolve_dns(config::node().rpc_server()).get0(); + c.load_balancing_algo + = ss::server_socket::load_balancing_algorithm::port; + c.max_service_memory_per_core = memory_groups::rpc_total_memory(); + c.disable_metrics = net::metrics_disabled( + config::shard_local_cfg().disable_metrics()); + c.disable_public_metrics = net::public_metrics_disabled( + config::shard_local_cfg().disable_public_metrics()); + c.listen_backlog + = config::shard_local_cfg().rpc_server_listen_backlog; + c.tcp_recv_buf + = config::shard_local_cfg().rpc_server_tcp_recv_buf; + c.tcp_send_buf + = config::shard_local_cfg().rpc_server_tcp_send_buf; + auto rpc_builder = config::node() + .rpc_server_tls() + .get_credentials_builder() + .get0(); + auto credentials + = rpc_builder + ? rpc_builder + ->build_reloadable_server_credentials( + [this]( + const std::unordered_set& updated, + const std::exception_ptr& eptr) { + cluster::log_certificate_reload_event( + _log, "Internal RPC TLS", updated, eptr); + }) + .get0() + : nullptr; + c.addrs.emplace_back(rpc_server_addr, credentials); + }); + }) + .get(); - vlog(_log.info, "Successfully started Redpanda!"); - syschecks::systemd_notify_ready().get(); + // Use port based load_balancing_algorithm to make connection shard + // assignment deterministic. 
+ syschecks::systemd_message( + "Constructing internal RPC services {}", rpc_cfg.local()) + .get(); + _rpc.start(&rpc_cfg).get(); + rpc_cfg.stop().get(); } -void application::start_redpanda(::stop_signal& app_signal) { - syschecks::systemd_message("Staring storage services").get(); +void application::start_bootstrap_services() { + syschecks::systemd_message("Starting storage services").get(); // single instance storage_node.invoke_on_all(&storage::node_api::start).get0(); - // single instance - node_status_backend.invoke_on_all(&cluster::node_status_backend::start) - .get(); - // Early initialization of disk stats, so that logic for e.g. picking // falloc sizes works without having to wait for a local_monitor tick. auto tmp_lm = cluster::node::local_monitor( @@ -1370,6 +1333,79 @@ void application::start_redpanda(::stop_signal& app_signal) { storage.invoke_on_all(&storage::api::start).get(); + syschecks::systemd_message("Starting internal RPC bootstrap service").get(); + _rpc + .invoke_on_all([this](rpc::rpc_server& s) { + std::vector> bootstrap_service; + bootstrap_service.push_back( + std::make_unique( + _scheduling_groups.cluster_sg(), + smp_service_groups.cluster_smp_sg())); + s.add_services(std::move(bootstrap_service)); + }) + .get(); + _rpc.invoke_on_all(&rpc::rpc_server::start).get(); + vlog( + _log.info, + "Started RPC server listening at {}", + config::node().rpc_server()); +} + +void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { + wire_up_bootstrap_services(); + start_bootstrap_services(); + + // Begin the cluster discovery manager so we can determine our initial node + // ID. A valid node ID is required before we can initialize the rest of our + // subsystems. + const auto& node_uuid = storage.local().node_uuid(); + cluster::cluster_discovery cd(node_uuid); + auto node_id = cd.determine_node_id().get(); + + vlog(_log.info, "Starting Redpanda with node_id {}", node_id); + + wire_up_runtime_services(node_id); + + if (test_mode) { + // When running inside a unit test fixture, we may fast-forward + // some of initialization that would usually wait for the controller + // to commit some state to its log. 
+ vlog(_log.warn, "Running in unit test mode"); + _feature_table + .invoke_on_all( + [](features::feature_table& ft) { ft.testing_activate_all(); }) + .get(); + } + + start_runtime_services(cd, app_signal); + + if (_proxy_config) { + _proxy.invoke_on_all(&pandaproxy::rest::proxy::start).get(); + vlog( + _log.info, + "Started Pandaproxy listening at {}", + _proxy_config->pandaproxy_api()); + } + + if (_schema_reg_config) { + _schema_registry->start().get(); + vlog( + _log.info, + "Started Schema Registry listening at {}", + _schema_reg_config->schema_registry_api()); + } + + start_kafka(node_id, app_signal); + + _admin.invoke_on_all([](admin_server& admin) { admin.set_ready(); }).get(); + + vlog(_log.info, "Successfully started Redpanda!"); + syschecks::systemd_notify_ready().get(); +} + +void application::start_runtime_services( + const cluster::cluster_discovery& cluster_discovery, + ::stop_signal& app_signal) { ssx::background = _feature_table.invoke_on_all( [this](features::feature_table& ft) -> ss::future<> { try { @@ -1394,6 +1430,9 @@ void application::start_redpanda(::stop_signal& app_signal) { } }); + // single instance + node_status_backend.invoke_on_all(&cluster::node_status_backend::start) + .get(); syschecks::systemd_message("Starting the partition manager").get(); partition_manager.invoke_on_all(&cluster::partition_manager::start).get(); @@ -1408,7 +1447,7 @@ void application::start_redpanda(::stop_signal& app_signal) { _co_group_manager.invoke_on_all(&kafka::group_manager::start).get(); syschecks::systemd_message("Starting controller").get(); - controller->start().get0(); + controller->start(cluster_discovery.initial_raft0_brokers()).get0(); kafka_group_migration = ss::make_lw_shared( *controller, group_router); @@ -1421,28 +1460,29 @@ void application::start_redpanda(::stop_signal& app_signal) { syschecks::systemd_message("Starting RPC").get(); _rpc - .invoke_on_all([this](net::server& s) { - auto proto = std::make_unique(); - proto->register_service( + .invoke_on_all([this](rpc::rpc_server& s) { + std::vector> runtime_services; + runtime_services.push_back(std::make_unique( _scheduling_groups.raft_sg(), smp_service_groups.raft_smp_sg(), - std::ref(id_allocator_frontend)); + std::ref(id_allocator_frontend))); // _rm_group_proxy is wrap around a sharded service with only // `.local()' access so it's ok to share without foreign_ptr - proto->register_service( + runtime_services.push_back(std::make_unique( _scheduling_groups.raft_sg(), smp_service_groups.raft_smp_sg(), std::ref(tx_gateway_frontend), _rm_group_proxy.get(), - std::ref(rm_partition_frontend)); - proto->register_service< - raft::service>( - _scheduling_groups.raft_sg(), - smp_service_groups.raft_smp_sg(), - partition_manager, - shard_table.local(), - config::shard_local_cfg().raft_heartbeat_interval_ms()); - proto->register_service( + std::ref(rm_partition_frontend))); + runtime_services.push_back( + std::make_unique< + raft::service>( + _scheduling_groups.raft_sg(), + smp_service_groups.raft_smp_sg(), + partition_manager, + shard_table.local(), + config::shard_local_cfg().raft_heartbeat_interval_ms())); + runtime_services.push_back(std::make_unique( _scheduling_groups.cluster_sg(), smp_service_groups.cluster_smp_sg(), std::ref(controller->get_topics_frontend()), @@ -1456,35 +1496,28 @@ void application::start_redpanda(::stop_signal& app_signal) { std::ref(controller->get_feature_manager()), std::ref(controller->get_feature_table()), std::ref(controller->get_health_monitor()), - std::ref(_connection_cache)); - - 
proto->register_service( - _scheduling_groups.cluster_sg(), - smp_service_groups.cluster_smp_sg(), - std::ref(controller->get_partition_leaders())); - - proto->register_service( - _scheduling_groups.cluster_sg(), - smp_service_groups.cluster_smp_sg(), - std::ref(controller->get_partition_balancer())); - - proto->register_service( - _scheduling_groups.node_status(), - smp_service_groups.cluster_smp_sg(), - std::ref(node_status_backend)); - - if (!config::shard_local_cfg().disable_metrics()) { - proto->setup_metrics(); - } - - s.set_protocol(std::move(proto)); + std::ref(_connection_cache))); + + runtime_services.push_back( + std::make_unique( + _scheduling_groups.cluster_sg(), + smp_service_groups.cluster_smp_sg(), + std::ref(controller->get_partition_leaders()))); + + runtime_services.push_back( + std::make_unique( + _scheduling_groups.node_status(), + smp_service_groups.cluster_smp_sg(), + std::ref(node_status_backend))); + + runtime_services.push_back( + std::make_unique( + _scheduling_groups.cluster_sg(), + smp_service_groups.cluster_smp_sg(), + std::ref(controller->get_partition_balancer()))); + s.add_services(std::move(runtime_services)); }) .get(); - _rpc.invoke_on_all(&net::server::start).get(); - vlog( - _log.info, - "Started RPC server listening at {}", - config::node().rpc_server()); // After we have started internal RPC listener, we may join // the cluster (if we aren't already a member) @@ -1523,7 +1556,8 @@ void application::start_redpanda(::stop_signal& app_signal) { * cluster -- this is expected to be run last, after everything else is * started. */ -void application::start_kafka(::stop_signal& app_signal) { +void application::start_kafka( + const model::node_id& node_id, ::stop_signal& app_signal) { // Kafka API // The Kafka listener is intentionally the last thing we start: during // this phase we will wait for the node to be a cluster member before @@ -1572,7 +1606,7 @@ void application::start_kafka(::stop_signal& app_signal) { vlog(_log.info, "Waiting for cluster membership"); controller->get_members_table() .local() - .await_membership(config::node().node_id(), app_signal.abort_source()) + .await_membership(node_id, app_signal.abort_source()) .get(); _kafka_server.invoke_on_all(&net::server::start).get(); vlog( diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index b2181f3cf166..d8e696ce1f0f 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -36,6 +36,7 @@ #include "resource_mgmt/scheduling_groups_probe.h" #include "resource_mgmt/smp_groups.h" #include "rpc/fwd.h" +#include "rpc/rpc_server.h" #include "seastarx.h" #include "ssx/metrics.h" #include "storage/fwd.h" @@ -49,6 +50,10 @@ namespace po = boost::program_options; // NOLINT +namespace cluster { +class cluster_discovery; +} // namespace cluster + namespace kafka { struct group_metadata_migration; } // namespace kafka @@ -113,11 +118,22 @@ class application { using deferred_actions = std::vector>>; + // Constructs services across shards required to get bootstrap metadata. + void wire_up_bootstrap_services(); + + // Starts services across shards required to get bootstrap metadata. + void start_bootstrap_services(); + + // Constructs services across shards meant for Redpanda runtime. 
+ void wire_up_runtime_services(model::node_id node_id); void configure_admin_server(); - void wire_up_services(); - void wire_up_redpanda_services(); - void start_redpanda(::stop_signal&); - void start_kafka(::stop_signal&); + void wire_up_redpanda_services(model::node_id); + + // Starts the services meant for Redpanda runtime. Must be called after + // having constructed the subsystems via the corresponding `wire_up` calls. + void + start_runtime_services(const cluster::cluster_discovery&, ::stop_signal&); + void start_kafka(const model::node_id&, ::stop_signal&); // All methods are calleds from Seastar thread ss::app_template::config setup_app_config(); @@ -173,7 +189,7 @@ class application { ss::sharded _feature_table; ss::sharded _group_manager; ss::sharded _co_group_manager; - ss::sharded _rpc; + ss::sharded _rpc; ss::sharded _admin; ss::sharded _kafka_conn_quotas; ss::sharded _kafka_server; From a9b565187f37b77a3a7946f47e3cb3de0a8c6a70 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Thu, 6 Oct 2022 00:32:37 -0700 Subject: [PATCH 05/17] util: add named_type static constructor from args It can be convenient to construct a named_type using constructors of the underlying type. A simple solution today is to construct the underlying type separately and then construct the named_type with it, but this spills the underlying type into call sites. Another approach would be to construct with the named_type::type constructor, but this can be verbose. This commit adds a templatized static constructor that callers can pass constructor args into. --- src/v/utils/named_type.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/utils/named_type.h b/src/v/utils/named_type.h index c70af04801e8..c76ed9bcb173 100644 --- a/src/v/utils/named_type.h +++ b/src/v/utils/named_type.h @@ -137,6 +137,11 @@ class base_named_type { explicit base_named_type(type&& v) : _value(std::move(v)) {} + template + static base_named_type from(Args&&... args) { + return base_named_type(T(std::forward(args)...)); + } + base_named_type(base_named_type&& o) noexcept(move_noexcept) = default; base_named_type& operator=(base_named_type&& o) noexcept(move_noexcept) From 55f3e76d3503a43cd8d8da101897ed3e2cca9804 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 23 Sep 2022 17:13:41 -0700 Subject: [PATCH 06/17] application: generate node UUID on startup Upon first starting up the storage layer, Redpanda will now check to see if a node UUID exists in the controller keyspace of the key-value store. If not, it will generate one and persist it to disk, and make it available on all shards. This will be used as a unique identifier when registering a node (particularly important in the future when we auto-assign node IDs). --- src/v/redpanda/application.cc | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index fd01e07a44f1..c92176e04101 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -86,6 +86,7 @@ #include #include +#include #include #include @@ -1349,6 +1350,36 @@ void application::start_bootstrap_services() { _log.info, "Started RPC server listening at {}", config::node().rpc_server()); + + // Load the local node UUID, or create one if none exists. 
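+    // The UUID lives in the controller keyspace of the kv-store under the
+    // "node_uuid" key. Once loaded, or newly generated and persisted, it is
+    // propagated to the storage::api instance on every shard so that later
+    // subsystems can read it locally.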
+ auto& kvs = storage.local().kvs(); + static const bytes node_uuid_key = "node_uuid"; + model::node_uuid node_uuid; + auto node_uuid_buf = kvs.get( + storage::kvstore::key_space::controller, node_uuid_key); + if (node_uuid_buf) { + node_uuid = serde::from_iobuf( + std::move(*node_uuid_buf)); + vlog( + _log.info, + "Loaded existing UUID for node: {}", + model::node_uuid(node_uuid)); + } else { + boost::uuids::random_generator uuid_gen; + node_uuid = model::node_uuid::from(uuid_gen()); + vlog(_log.info, "Generated new UUID for node: {}", node_uuid); + kvs + .put( + storage::kvstore::key_space::controller, + node_uuid_key, + serde::to_iobuf(node_uuid)) + .get(); + } + storage + .invoke_on_all([node_uuid](storage::api& storage) mutable { + storage.set_node_uuid(node_uuid); + }) + .get(); } void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { From bfebd6d594d3184325410f41d49b629e27dba941 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 28 Sep 2022 20:48:35 -0700 Subject: [PATCH 07/17] cluster: command to register a UUID This adds a new controller command to register a node UUID. It is activated by the existing join RPC endpoint and has the following semantics: - Register: all new versions of Redpanda will send their UUID along with an optional node ID in their initial join request. If the UUID is not in the leader controller's `members_table`, the UUID is registered by replicating a `register_node_uuid_cmd`. - Join: all new version of Redpanda will, after registering, send another join request that will actually add the node to the Raft group. Old versions of Redpanda will send a join request with no UUID. --- src/v/cluster/commands.h | 6 +++ src/v/cluster/members_manager.cc | 73 +++++++++++++++++++++++++++++++- src/v/cluster/members_manager.h | 16 +++++++ 3 files changed, 94 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/commands.h b/src/v/cluster/commands.h index 1f637ba8c7a7..ff5f3da3a4bb 100644 --- a/src/v/cluster/commands.h +++ b/src/v/cluster/commands.h @@ -107,6 +107,7 @@ static constexpr int8_t decommission_node_cmd_type = 0; static constexpr int8_t recommission_node_cmd_type = 1; static constexpr int8_t finish_reallocations_cmd_type = 2; static constexpr int8_t maintenance_mode_cmd_type = 3; +static constexpr int8_t register_node_uuid_cmd_type = 4; // cluster config commands static constexpr int8_t cluster_config_delta_cmd_type = 0; @@ -239,6 +240,11 @@ using finish_reallocations_cmd = controller_command< finish_reallocations_cmd_type, model::record_batch_type::node_management_cmd, serde_opts::adl_and_serde>; +using register_node_uuid_cmd = controller_command< + model::node_uuid, + std::optional, + register_node_uuid_cmd_type, + model::record_batch_type::node_management_cmd>; using maintenance_mode_cmd = controller_command< model::node_id, diff --git a/src/v/cluster/members_manager.cc b/src/v/cluster/members_manager.cc index d230c8ea866b..e448e16f6538 100644 --- a/src/v/cluster/members_manager.cc +++ b/src/v/cluster/members_manager.cc @@ -26,6 +26,7 @@ #include "random/generators.h" #include "redpanda/application.h" #include "reflection/adl.h" +#include "seastarx.h" #include "storage/api.h" #include @@ -61,7 +62,8 @@ members_manager::members_manager( , _drain_manager(drain_manager) , _as(as) , _rpc_tls_config(config::node().rpc_server_tls()) - , _update_queue(max_updates_queue_size) { + , _update_queue(max_updates_queue_size) + , _next_assigned_id(model::node_id(1)) { auto sub = _as.local().subscribe([this]() noexcept { _update_queue.abort( 
std::make_exception_ptr(ss::abort_requested_exception{})); @@ -202,7 +204,9 @@ ss::future<> members_manager::handle_raft0_cfg_update( ss::future members_manager::apply_update(model::record_batch b) { + vlog(clusterlog.info, "Applying update to members_manager"); if (b.header().type == model::record_batch_type::raft_configuration) { + vlog(clusterlog.info, "Raft config update"); co_return co_await apply_raft_configuration_batch(std::move(b)); } @@ -270,6 +274,30 @@ members_manager::apply_update(model::record_batch b) { } return f.then([error] { return error; }); }); + }, + [this](register_node_uuid_cmd cmd) { + const auto& node_uuid = cmd.key; + const auto& requested_node_id = cmd.value; + if (requested_node_id) { + if (likely(try_register_node_id(*requested_node_id, node_uuid))) { + return ss::make_ready_future(errc::success); + } + vlog( + clusterlog.warn, + "Couldn't register UUID {}, node ID {} already taken", + node_uuid, + requested_node_id); + return ss::make_ready_future( + errc::join_request_dispatch_error); + } + vlog( + clusterlog.info, + "Applying register_node_uuid_cmd for UUID {}", + node_uuid); + auto node_id = get_or_assign_node_id(node_uuid); + vlog( + clusterlog.info, "Node UUID {} has node ID {}", node_uuid, node_id); + return ss::make_ready_future(errc::success); }); } ss::future @@ -423,6 +451,49 @@ void members_manager::join_raft0() { }); } +bool members_manager::try_register_node_id( + const model::node_id& requested_node_id, + const model::node_uuid& requested_node_uuid) { + vassert(requested_node_id != model::node_id(-1), "invalid node ID"); + const auto it = _id_by_uuid.find(requested_node_uuid); + if (it == _id_by_uuid.end()) { + if (_members_table.local().contains(requested_node_id)) { + // The cluster was likely just upgraded from a version that didn't + // have node UUIDs. If the node ID is already a part of the + // member's table, accept the requested UUID. + clusterlog.info( + "registering node ID that is already a member of the cluster"); + } + // This is a brand new node with node ID assignment support, requesting + // the given node ID. + _id_by_uuid.emplace(requested_node_uuid, requested_node_id); + return true; + } + const auto& node_id = it->second; + return node_id == requested_node_id; +} + +std::optional +members_manager::get_or_assign_node_id(const model::node_uuid& node_uuid) { + const auto it = _id_by_uuid.find(node_uuid); + if (it == _id_by_uuid.end()) { + while (_members_table.local().contains(_next_assigned_id)) { + if (_next_assigned_id == INT_MAX) { + return std::nullopt; + } + ++_next_assigned_id; + } + _id_by_uuid.emplace(node_uuid, _next_assigned_id); + vlog( + clusterlog.info, + "Assigned node UUID {} a node ID {}", + node_uuid, + _next_assigned_id); + return _next_assigned_id++; + } + return it->second; +} + ss::future> members_manager::dispatch_join_to_seed_server( seed_iterator it, join_node_request const& req) { diff --git a/src/v/cluster/members_manager.h b/src/v/cluster/members_manager.h index 6757640c6e61..c4b7d137bf67 100644 --- a/src/v/cluster/members_manager.h +++ b/src/v/cluster/members_manager.h @@ -141,6 +141,7 @@ class members_manager { ss::future> get_node_updates(); private: + using uuid_map_t = absl::flat_hash_map; using seed_iterator = std::vector::const_iterator; // Cluster join void join_raft0(); @@ -148,6 +149,17 @@ class members_manager { ss::future<> initialize_broker_connection(const model::broker&); + // Returns the node ID for a given node UUID, assigning one if one does not + // already exist. 
Returns nullopt when there are no more node IDs left. + std::optional + get_or_assign_node_id(const model::node_uuid&); + + // Attempts to register the given node ID with the given node UUID. If a + // different node ID exists for the given node UUID, returns false. + // + // Does not check for duplicate node IDs. + bool try_register_node_id(const model::node_id&, const model::node_uuid&); + ss::future> dispatch_join_to_seed_server( seed_iterator it, join_node_request const& req); ss::future> dispatch_join_to_remote( @@ -207,6 +219,10 @@ class members_manager { // get_node_updates(). ss::queue _update_queue; + uuid_map_t _id_by_uuid; + + model::node_id _next_assigned_id; + // Subscription to _as with which to signal an abort to _update_queue. ss::abort_source::subscription _queue_abort_subscription; From 45d3c5dc88f0615e34af95d30318cbe09eef8f3c Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 3 Oct 2022 23:26:45 -0700 Subject: [PATCH 08/17] features: add flag for node_id_assignment --- src/v/features/feature_table.cc | 2 ++ src/v/features/feature_table.h | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index 4a8712540861..16a0a4b42acc 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -43,6 +43,8 @@ std::string_view to_string_view(feature f) { return "raftless_node_status"; case feature::rpc_v2_by_default: return "rpc_v2_by_default"; + case feature::node_id_assignment: + return "node_id_assignment"; case feature::test_alpha: return "__test_alpha"; } diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index a1c839f02fcf..b676e8fbaeb3 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -42,6 +42,7 @@ enum class feature : std::uint64_t { transaction_ga = 0x100, raftless_node_status = 0x200, rpc_v2_by_default = 0x400, + node_id_assignment = 0x800, // Dummy features for testing only test_alpha = uint64_t(1) << 63, @@ -160,7 +161,12 @@ constexpr static std::array feature_schema{ feature::rpc_v2_by_default, feature_spec::available_policy::always, feature_spec::prepare_policy::always}, - + feature_spec{ + cluster_version{7}, + "node_id_assignment", + feature::node_id_assignment, + feature_spec::available_policy::always, + feature_spec::prepare_policy::always}, feature_spec{ cluster_version{2001}, "__test_alpha", From 58057ab561dcdb6cb4f54af873a661b0362e810a Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Thu, 6 Oct 2022 00:59:55 -0700 Subject: [PATCH 09/17] config: make node ID optional This commit makes the node_id config property optional. It doesn't make any checks that the field is set. A follow-up will change the Redpanda's behavior when seeing an empty node ID. 
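For reference, the shape of the change in a simplified sketch (the real call
sites are in the diff below; the local variable name here is illustrative only):

    // config/node_config.h: node_id is now optional and defaults to
    // std::nullopt instead of being a required property.
    property<std::optional<model::node_id>> node_id;

    // Call sites that only run after the node ID has been determined
    // dereference the optional (config_frontend, config_manager,
    // members_frontend, ...):
    auto self = *config::node().node_id();

    // make_self_broker() can run before an ID is known, so it falls back to
    // model::node_id(-1) as a placeholder that is not expected to be used
    // during normal operation.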
--- src/v/cluster/cluster_discovery.cc | 2 +- src/v/cluster/cluster_utils.cc | 9 ++++++++- src/v/cluster/config_frontend.cc | 2 +- src/v/cluster/config_manager.cc | 2 +- src/v/cluster/controller_backend.cc | 2 +- src/v/cluster/feature_manager.cc | 6 +++--- src/v/cluster/members_frontend.cc | 2 +- src/v/cluster/topic_table_probe.cc | 2 +- src/v/config/node_config.cc | 6 ++++-- src/v/config/node_config.h | 7 ++++++- src/v/coproc/reconciliation_backend.cc | 2 +- src/v/coproc/tests/partition_movement_tests.cc | 3 ++- src/v/kafka/server/handlers/describe_configs.cc | 4 ++-- src/v/kafka/server/handlers/metadata.cc | 4 ++-- src/v/redpanda/admin_server.cc | 4 ++-- 15 files changed, 36 insertions(+), 21 deletions(-) diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc index 4aa4fec3e4a7..06811dac0b00 100644 --- a/src/v/cluster/cluster_discovery.cc +++ b/src/v/cluster/cluster_discovery.cc @@ -28,7 +28,7 @@ cluster_discovery::cluster_discovery(const model::node_uuid& node_uuid) : _node_uuid(node_uuid) {} ss::future cluster_discovery::determine_node_id() { - co_return config::node().node_id(); + co_return *config::node().node_id(); } vector cluster_discovery::initial_raft0_brokers() const { diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index 0b25d2258d1f..4d35fa993c1a 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -193,8 +193,15 @@ model::broker make_self_broker(const config::node_config& node_cfg) { // As for memory, if disk_gb is zero this is handled like a legacy broker. uint32_t disk_gb = space_info.capacity >> 30; + // If this node hasn't been configured with a node ID, use -1 to indicate + // that we don't yet know it yet. This shouldn't be used during the normal + // operation of a broker, and instead should be used to indicate a broker + // that needs to be assigned a node ID when it first starts up. + model::node_id node_id = node_cfg.node_id() == std::nullopt + ? 
model::node_id(-1) + : *node_cfg.node_id(); return model::broker( - model::node_id(node_cfg.node_id), + node_id, kafka_addr, rpc_addr, node_cfg.rack, diff --git a/src/v/cluster/config_frontend.cc b/src/v/cluster/config_frontend.cc index de941291a455..4ec8e0c7bcc4 100644 --- a/src/v/cluster/config_frontend.cc +++ b/src/v/cluster/config_frontend.cc @@ -28,7 +28,7 @@ config_frontend::config_frontend( , _leaders(leaders) , _features(features) , _as(as) - , _self(config::node().node_id()) {} + , _self(*config::node().node_id()) {} /** * RPC wrapper on do_patch, to dispatch to the controller leader diff --git a/src/v/cluster/config_manager.cc b/src/v/cluster/config_manager.cc index 306a5d000bed..99e2824644c9 100644 --- a/src/v/cluster/config_manager.cc +++ b/src/v/cluster/config_manager.cc @@ -55,7 +55,7 @@ config_manager::config_manager( ss::sharded& pl, ss::sharded& ft, ss::sharded& as) - : _self(config::node().node_id()) + : _self(*config::node().node_id()) , _frontend(cf) , _connection_cache(cc) , _leaders(pl) diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 411de5b5d4aa..c860ede682c2 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -260,7 +260,7 @@ controller_backend::controller_backend( , _partition_leaders_table(leaders) , _topics_frontend(frontend) , _storage(storage) - , _self(model::node_id(config::node().node_id)) + , _self(*config::node().node_id()) , _data_directory(config::node().data_directory().as_sstring()) , _housekeeping_timer_interval( config::shard_local_cfg().controller_backend_housekeeping_interval_ms()) diff --git a/src/v/cluster/feature_manager.cc b/src/v/cluster/feature_manager.cc index 81c396c5bce6..a119635aacd9 100644 --- a/src/v/cluster/feature_manager.cc +++ b/src/v/cluster/feature_manager.cc @@ -45,7 +45,7 @@ feature_manager::feature_manager( , _connection_cache(connection_cache) , _raft0_group(raft0_group) , _barrier_state( - config::node().node_id(), + *config::node().node_id(), members.local(), as.local(), _gate, @@ -109,7 +109,7 @@ ss::future<> feature_manager::start() { vlog( clusterlog.debug, "Controller leader notification term {}", term); - _am_controller_leader = leader_id == config::node().node_id(); + _am_controller_leader = leader_id == *config::node().node_id(); // This hook avoids the need for the controller leader to receive // its own health report to generate a call to update_node_version. 
@@ -128,7 +128,7 @@ ss::future<> feature_manager::start() { leader_id.value(), features::feature_table::get_latest_logical_version()); update_node_version( - config::node().node_id, + *config::node().node_id(), features::feature_table::get_latest_logical_version()); } }); diff --git a/src/v/cluster/members_frontend.cc b/src/v/cluster/members_frontend.cc index 5e3ec545d825..be3714b7cb81 100644 --- a/src/v/cluster/members_frontend.cc +++ b/src/v/cluster/members_frontend.cc @@ -34,7 +34,7 @@ members_frontend::members_frontend( ss::sharded& leaders, ss::sharded& feature_table, ss::sharded& as) - : _self(config::node().node_id()) + : _self(*config::node().node_id()) , _node_op_timeout( config::shard_local_cfg().node_management_operation_timeout_ms) , _stm(stm) diff --git a/src/v/cluster/topic_table_probe.cc b/src/v/cluster/topic_table_probe.cc index 279ed516fbe2..ac3169cb6675 100644 --- a/src/v/cluster/topic_table_probe.cc +++ b/src/v/cluster/topic_table_probe.cc @@ -19,7 +19,7 @@ namespace cluster { topic_table_probe::topic_table_probe(const topic_table& topic_table) : _topic_table(topic_table) - , _node_id(config::node().node_id()) { + , _node_id(*config::node().node_id()) { setup_metrics(); } diff --git a/src/v/config/node_config.cc b/src/v/config/node_config.cc index 11f26c3642de..9784ac243173 100644 --- a/src/v/config/node_config.cc +++ b/src/v/config/node_config.cc @@ -29,8 +29,10 @@ node_config::node_config() noexcept , node_id( *this, "node_id", - "Unique id identifying a node in the cluster", - {.required = required::yes, .visibility = visibility::user}) + "Unique id identifying a node in the cluster. If missing, a unique id " + "will be assigned for this node when it joins the cluster", + {.visibility = visibility::user}, + std::nullopt) , rack( *this, "rack", diff --git a/src/v/config/node_config.h b/src/v/config/node_config.h index c85eaf2fc36b..faf6b2d39598 100644 --- a/src/v/config/node_config.h +++ b/src/v/config/node_config.h @@ -26,7 +26,12 @@ struct node_config final : public config_store { public: property developer_mode; property data_directory; - property node_id; + + // NOTE: during the normal runtime of a cluster, it is safe to assume that + // the value of the node ID has been determined, and that there is a value + // set for this property. 
+ property> node_id; + property> rack; property> seed_servers; diff --git a/src/v/coproc/reconciliation_backend.cc b/src/v/coproc/reconciliation_backend.cc index d6f8c34f6808..473960bc083d 100644 --- a/src/v/coproc/reconciliation_backend.cc +++ b/src/v/coproc/reconciliation_backend.cc @@ -79,7 +79,7 @@ reconciliation_backend::reconciliation_backend( ss::sharded& coproc_pm, ss::sharded& pacemaker, ss::sharded& sdb) noexcept - : _self(model::node_id(config::node().node_id)) + : _self(model::node_id(*config::node().node_id())) , _data_directory(config::node().data_directory().as_sstring()) , _topics(topics) , _shard_table(shard_table) diff --git a/src/v/coproc/tests/partition_movement_tests.cc b/src/v/coproc/tests/partition_movement_tests.cc index 0e0a880cdc4d..cdb36b80a0a0 100644 --- a/src/v/coproc/tests/partition_movement_tests.cc +++ b/src/v/coproc/tests/partition_movement_tests.cc @@ -58,7 +58,8 @@ FIXTURE_TEST(test_move_source_topic, coproc_test_fixture) { const ss::shard_id next_shard = (*shard + 1) % ss::smp::count; info("Current target shard {} and next shard {}", *shard, next_shard); model::broker_shard bs{ - .node_id = model::node_id(config::node().node_id), .shard = next_shard}; + .node_id = model::node_id(*config::node().node_id()), + .shard = next_shard}; /// Move the input onto the new desired target auto& topics_fe = root_fixture()->app.controller->get_topics_frontend(); diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index ce74c64e85a0..c6c900c2f691 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -294,12 +294,12 @@ static void report_broker_config( result.resource_name.data() + result.resource_name.size(), // NOLINT broker_id); if (res.ec == std::errc()) { - if (broker_id != config::node().node_id()) { + if (broker_id != *config::node().node_id()) { result.error_code = error_code::invalid_request; result.error_message = ssx::sformat( "Unexpected broker id {} expected {}", broker_id, - config::node().node_id()); + *config::node().node_id()); return; } } else { diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index f6dc64e04da5..8c0a0e76de22 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -83,7 +83,7 @@ std::optional get_leader_term( const auto previous = md_cache.get_previous_leader_id(tp_ns, p_id); leader_term->leader = previous; - if (previous == config::node().node_id()) { + if (previous == *config::node().node_id()) { auto idx = fast_prng_source() % replicas.size(); leader_term->leader = replicas[idx]; } @@ -333,7 +333,7 @@ guess_peer_listener(request_context& ctx, cluster::broker_ptr broker) { // is not yet consistent with what's in members_table, // because a node configuration update didn't propagate // via raft0 yet - if (broker->id() == config::node().node_id()) { + if (broker->id() == *config::node().node_id()) { return l; } } diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index 1d202a09dc0c..48a2d9fc178c 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -437,7 +437,7 @@ ss::future admin_server::redirect_to_leader( ss::httpd::reply::status_type::service_unavailable); } - if (leader_id_opt.value() == config::node().node_id()) { + if (leader_id_opt.value() == *config::node().node_id()) { vlog( logger.info, "Can't redirect to leader from leader node ({})", @@ -558,7 +558,7 @@ 
bool need_redirect_to_leader( ss::httpd::reply::status_type::service_unavailable); } - return leader_id_opt.value() != config::node().node_id(); + return leader_id_opt.value() != *config::node().node_id(); } model::node_id parse_broker_id(const ss::httpd::request& req) { From 8252b80b9102d47eb19f4949e65bcb5f2e27be06 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Thu, 6 Oct 2022 01:09:09 -0700 Subject: [PATCH 10/17] cluster: auto-assign node ID when empty This commit adds the ability for Redpanda to assign node ID based on the node UUID when a node ID isn't set. --- src/v/cluster/cluster_discovery.cc | 77 +++++++- src/v/cluster/cluster_discovery.h | 61 ++++-- src/v/cluster/controller.cc | 2 + src/v/cluster/controller_stm.h | 3 +- src/v/cluster/members_manager.cc | 286 +++++++++++++++++++++-------- src/v/cluster/members_manager.h | 52 +++++- src/v/redpanda/application.cc | 7 + 7 files changed, 398 insertions(+), 90 deletions(-) diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc index 06811dac0b00..ef136c387972 100644 --- a/src/v/cluster/cluster_discovery.cc +++ b/src/v/cluster/cluster_discovery.cc @@ -10,6 +10,7 @@ #include "cluster/cluster_discovery.h" #include "cluster/cluster_utils.h" +#include "cluster/controller_service.h" #include "cluster/logger.h" #include "config/node_config.h" #include "model/fundamental.h" @@ -26,9 +27,28 @@ namespace cluster { cluster_discovery::cluster_discovery(const model::node_uuid& node_uuid) : _node_uuid(node_uuid) {} + , _join_retry_jitter(config::shard_local_cfg().join_retry_timeout_ms()) + , _join_timeout(std::chrono::seconds(2)) {} ss::future cluster_discovery::determine_node_id() { - co_return *config::node().node_id(); + // TODO: read from disk if empty. + const auto& configured_node_id = config::node().node_id(); + if (configured_node_id != std::nullopt) { + clusterlog.info("Using configured node ID {}", configured_node_id); + co_return *configured_node_id; + } + // TODO: once is_cluster_founder() refers to all seeds, verify that all the + // seeds' seed_servers lists match and assign node IDs based on the + // ordering. + if (is_cluster_founder()) { + // If this is the root node, assign node 0. + co_return model::node_id(0); + } + model::node_id assigned_node_id; + co_await ss::repeat([this, &assigned_node_id] { + return dispatch_node_uuid_registration_to_seeds(assigned_node_id); + }); + co_return assigned_node_id; } vector cluster_discovery::initial_raft0_brokers() const { @@ -41,6 +61,61 @@ vector cluster_discovery::initial_raft0_brokers() const { return {}; } +ss::future +cluster_discovery::dispatch_node_uuid_registration_to_seeds( + model::node_id& assigned_node_id) { + const auto& seed_servers = config::node().seed_servers(); + auto self = make_self_broker(config::node()); + for (const auto& s : seed_servers) { + vlog( + clusterlog.info, + "Requesting node ID for UUID {} from {}", + _node_uuid, + s.addr); + result r(join_node_reply{}); + try { + r = co_await do_with_client_one_shot( + s.addr, + config::node().rpc_server_tls(), + _join_timeout, + [&self, this](controller_client_protocol c) { + return c + .join_node( + join_node_request( + features::feature_table::get_latest_logical_version(), + _node_uuid().to_vector(), + self), + rpc::client_opts(rpc::clock_type::now() + _join_timeout)) + .then(&rpc::get_ctx_data); + }); + } catch (...) 
{ + vlog( + clusterlog.debug, + "Error registering UUID {}, retrying: {}", + _node_uuid, + std::current_exception()); + continue; + } + if (!r || r.has_error() || !r.value().success) { + vlog( + clusterlog.debug, + "Error registering UUID {}, retrying", + _node_uuid); + continue; + } + const auto& reply = r.value(); + if (reply.id < 0) { + // Something else went wrong. Maybe duplicate UUID? + vlog(clusterlog.debug, "Negative node ID {}", reply.id); + continue; + } + assigned_node_id = reply.id; + co_return ss::stop_iteration::yes; + } + co_await ss::sleep_abortable(_join_retry_jitter.next_duration(), _as); + co_return ss::stop_iteration::no; +} + bool cluster_discovery::is_cluster_founder() const { return config::node().seed_servers().empty(); } diff --git a/src/v/cluster/cluster_discovery.h b/src/v/cluster/cluster_discovery.h index ab1b8a1be5ce..1e88b626b79a 100644 --- a/src/v/cluster/cluster_discovery.h +++ b/src/v/cluster/cluster_discovery.h @@ -10,6 +10,8 @@ #include "model/fundamental.h" #include "model/metadata.h" +#include "model/timeout_clock.h" +#include "random/simple_time_jitter.h" #include "seastarx.h" #include @@ -19,24 +21,50 @@ namespace cluster { -// Provides metadata pertaining to initial cluster discovery. +// Provides metadata pertaining to initial cluster discovery. It is the +// entrypoint into the steps to join a cluster. +// +// Node ID assignment and joining a cluster +// ======================================== +// When a node starts up, before it can initialize most of its subsystems, it +// must be made aware of its node ID. It can either get this from its config, +// the kv-store, or be assigned one by the controller leader. In all cases, the +// node ID and node UUID must be registered with the controller, after which +// the node can proceed join the cluster. +// +// The high level steps are as follows: +// +// When the node ID is unknown: +// 1. Generate or load node UUID +// 2. Get assigned a node ID by sending a request to the controller leader +// 3. Start subsystems with our known node ID +// 4. Join the cluster and get added to the controller Raft group by sending a +// request to the controller leader +// 5. Once added to the cluster, open endpoints for user traffic +// +// When the node ID is known: +// 1. Generate or load node UUID +// 2. Load node ID from config or kv-store +// 3. Start subsystems with our known node ID +// 4. Register our UUID with our node ID and join the cluster by sending a +// request to the controller leader +// 5. Once added to the cluster, open endpoints for user traffic +// +// These steps are implemented here, in redpanda/application.cc, and in +// cluster/members_manager.cc +// +// TODO: reconcile the RPC dispatch logic here with that in members_manager. class cluster_discovery { public: explicit cluster_discovery(const model::node_uuid& node_uuid); - // Returns this node's node ID. - // - // TODO: implement the below behavior. - // // Determines what the node ID for this node should be. Once called, we can - // proceed with initializing anything that depends on node ID. + // proceed with initializing anything that depends on node ID (Raft + // subsystem, etc). // - // On an empty seed server, this sends requests to all other seed servers - // to determine if there is a valid assigned of node IDs for the seeds. - // - // On a non-seed server with no node ID specified via config, this sends a - // request to the controllers to register this node's UUID and assign it a - // node ID. 
+ // On a non-seed server with no node ID specified via config or on disk, + // this sends a request to the controllers to register this node's UUID and + // assign it a node ID. ss::future determine_node_id(); // If configured as the root, return this broker as the sole initial Raft0 @@ -63,7 +91,16 @@ class cluster_discovery { // the seed servers. bool is_cluster_founder() const; + // Sends requests to each seed server to register the local node UUID until + // one succeeds. Upon success, sets `node_id` to the assigned node ID and + // returns stop_iteration::yes. + ss::future + dispatch_node_uuid_registration_to_seeds(model::node_id&); + const model::node_uuid _node_uuid; + simple_time_jitter _join_retry_jitter; + const std::chrono::milliseconds _join_timeout; + }; } // namespace cluster diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index ef053e45dc4f..d5805e87899a 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -112,6 +112,8 @@ controller::start(std::vector initial_raft0_brokers) { .then([this] { return _members_manager.start_single( _raft0, + std::ref(_stm), + std::ref(_feature_table), std::ref(_members_table), std::ref(_connections), std::ref(_partition_allocator), diff --git a/src/v/cluster/controller_stm.h b/src/v/cluster/controller_stm.h index c070b86089a1..6a780898ab3d 100644 --- a/src/v/cluster/controller_stm.h +++ b/src/v/cluster/controller_stm.h @@ -14,13 +14,14 @@ #include "cluster/config_manager.h" #include "cluster/data_policy_manager.h" #include "cluster/feature_backend.h" -#include "cluster/members_manager.h" #include "cluster/security_manager.h" #include "cluster/topic_updates_dispatcher.h" #include "raft/mux_state_machine.h" namespace cluster { +class members_manager; + // single instance using controller_stm = raft::mux_state_machine< topic_updates_dispatcher, diff --git a/src/v/cluster/members_manager.cc b/src/v/cluster/members_manager.cc index e448e16f6538..7cd2755b60d3 100644 --- a/src/v/cluster/members_manager.cc +++ b/src/v/cluster/members_manager.cc @@ -12,6 +12,7 @@ #include "cluster/cluster_utils.h" #include "cluster/commands.h" #include "cluster/controller_service.h" +#include "cluster/controller_stm.h" #include "cluster/drain_manager.h" #include "cluster/fwd.h" #include "cluster/logger.h" @@ -44,6 +45,8 @@ namespace cluster { members_manager::members_manager( consensus_ptr raft0, + ss::sharded& controller_stm, + ss::sharded& feature_table, ss::sharded& members_table, ss::sharded& connections, ss::sharded& allocator, @@ -55,6 +58,8 @@ members_manager::members_manager( , _join_retry_jitter(config::shard_local_cfg().join_retry_timeout_ms()) , _join_timeout(std::chrono::seconds(2)) , _raft0(raft0) + , _controller_stm(controller_stm) + , _feature_table(feature_table) , _members_table(members_table) , _connection_cache(connections) , _allocator(allocator) @@ -278,6 +283,10 @@ members_manager::apply_update(model::record_batch b) { [this](register_node_uuid_cmd cmd) { const auto& node_uuid = cmd.key; const auto& requested_node_id = cmd.value; + vlog( + clusterlog.info, + "Applying register_node_uuid_cmd for UUID {}", + node_uuid); if (requested_node_id) { if (likely(try_register_node_id(*requested_node_id, node_uuid))) { return ss::make_ready_future(errc::success); @@ -332,6 +341,14 @@ members_manager::get_node_updates() { return ss::make_ready_future>(std::move(ret)); } +model::node_id members_manager::get_node_id(const model::node_uuid& node_uuid) { + const auto it = _id_by_uuid.find(node_uuid); + vassert( + it 
!= _id_by_uuid.end(), + "Node registration must be completed before calling"); + return it->second; +} + template ss::future members_manager::dispatch_updates_to_cores( model::offset update_offset, Cmd cmd) { @@ -423,6 +440,7 @@ void members_manager::join_raft0() { std::move(join_node_request{ features::feature_table:: get_latest_logical_version(), + _storage.local().node_uuid()().to_vector(), _self})) .then([this](result r) { bool success = r && r.value().success; @@ -455,6 +473,11 @@ bool members_manager::try_register_node_id( const model::node_id& requested_node_id, const model::node_uuid& requested_node_uuid) { vassert(requested_node_id != model::node_id(-1), "invalid node ID"); + vlog( + clusterlog.info, + "Registering node ID {} as node UUID {}", + requested_node_id, + requested_node_uuid); const auto it = _id_by_uuid.find(requested_node_uuid); if (it == _id_by_uuid.end()) { if (_members_table.local().contains(requested_node_id)) { @@ -464,8 +487,8 @@ bool members_manager::try_register_node_id( clusterlog.info( "registering node ID that is already a member of the cluster"); } - // This is a brand new node with node ID assignment support, requesting - // the given node ID. + // This is a brand new node with node ID assignment support that's + // requesting the given node ID. _id_by_uuid.emplace(requested_node_uuid, requested_node_id); return true; } @@ -561,92 +584,211 @@ auto members_manager::dispatch_rpc_to_leader( std::forward(f)); } +ss::future> members_manager::replicate_new_node_uuid( + const model::node_uuid& node_uuid, + const std::optional& node_id) { + using ret_t = result; + ss::sstring node_id_str = node_id ? ssx::sformat("node ID {}", *node_id) + : "no node ID"; + vlog( + clusterlog.debug, + "Replicating registration of UUID {} with {}", + node_uuid, + node_id_str); + // Otherwise, replicate a request to register the UUID. + auto errc = co_await replicate_and_wait( + _controller_stm, + _feature_table, + _as, + register_node_uuid_cmd(node_uuid, node_id), + model::timeout_clock::now() + 30s); + vlog( + clusterlog.debug, + "Registration replication completed for UUID '{}': {}", + node_uuid, + errc); + if (errc != errc::success) { + co_return ret_t(join_node_reply{false, model::node_id(-1)}); + } + const auto assigned_node_id = get_node_id(node_uuid); + if (node_id && assigned_node_id != *node_id) { + vlog( + clusterlog.warn, + "Node registration for UUID {} as {} completed but already already " + "assigned as {}", + node_uuid, + *node_id, + assigned_node_id); + co_return ret_t(join_node_reply{false, model::node_id(-1)}); + } + + // On success, return the node ID. 
+ co_return ret_t(join_node_reply{true, get_node_id(node_uuid)}); +} + ss::future> members_manager::handle_join_request(join_node_request const req) { using ret_t = result; + + bool node_id_assignment_supported = _feature_table.local().is_active( + features::feature::node_id_assignment); + bool req_has_node_uuid = !req.node_uuid.empty(); + if (node_id_assignment_supported && !req_has_node_uuid) { + vlog( + clusterlog.warn, + "Invalid join request for node ID {}, node UUID is required", + req.node.id()); + co_return errc::invalid_request; + } + std::optional req_node_id = std::nullopt; + if (req.node.id() >= 0) { + req_node_id = req.node.id(); + } + if (!node_id_assignment_supported && !req_node_id) { + vlog( + clusterlog.warn, + "Got request to assign node ID, but feature not active", + req.node.id()); + co_return errc::invalid_request; + } + if ( + req_has_node_uuid + && req.node_uuid.size() != model::node_uuid::type::length) { + vlog( + clusterlog.warn, + "Invalid join request, expected UUID or empty; got {}-byte value", + req.node_uuid.size()); + co_return errc::invalid_request; + } + model::node_uuid node_uuid; + if (!req_node_id && !req_has_node_uuid) { + vlog(clusterlog.warn, "Node ID assignment attempt had no node UUID"); + co_return errc::invalid_request; + } + + ss::sstring node_uuid_str = "no node_uuid"; + if (req_has_node_uuid) { + node_uuid = model::node_uuid::from(req.node_uuid); + node_uuid_str = ssx::sformat("{}", node_uuid); + } vlog( clusterlog.info, - "Processing node '{}' join request (version {})", + "Processing node '{} ({})' join request (version {})", req.node.id(), + node_uuid_str, req.logical_version); - // curent node is a leader - if (_raft0->is_elected_leader()) { - // if configuration contains the broker already just update its config - // with data from join request - - if (_raft0->config().contains_broker(req.node.id())) { - vlog( - clusterlog.info, - "Broker {} is already member of a cluster, updating " - "configuration", - req.node.id()); - auto node_id = req.node.id(); - auto update_req = configuration_update_request( - req.node, _self.id()); - co_return co_await handle_configuration_update_request( - std::move(update_req)) - .then([node_id](result r) { - if (r) { - auto success = r.value().success; - return ret_t(join_node_reply{ - success, success ? 
node_id : model::node_id{-1}}); - } - return ret_t(r.error()); - }); - } + if (!_raft0->is_elected_leader()) { + vlog(clusterlog.debug, "Not the leader; dispatching to leader node"); + // Current node is not the leader have to send an RPC to leader + // controller + co_return co_await dispatch_rpc_to_leader( + _join_timeout, + [req, tout = rpc::clock_type::now() + _join_timeout]( + controller_client_protocol c) mutable { + return c.join_node(join_node_request(req), rpc::client_opts(tout)) + .then(&rpc::get_ctx_data); + }) + .handle_exception([](const std::exception_ptr& e) { + vlog( + clusterlog.warn, + "Error while dispatching join request to leader node - {}", + e); + return ss::make_ready_future( + errc::join_request_dispatch_error); + }); + } - if (_raft0->config().contains_address(req.node.rpc_address())) { - vlog( - clusterlog.info, - "Broker {} address ({}) conflicts with the address of another " - "node", - req.node.id(), - req.node.rpc_address()); - co_return ret_t(join_node_reply{false, model::node_id(-1)}); + if (likely(node_id_assignment_supported && req_has_node_uuid)) { + const auto it = _id_by_uuid.find(node_uuid); + if (!req_node_id) { + if (it == _id_by_uuid.end()) { + // The UUID isn't yet in our table. Register it, but return, + // expecting the node to come back with another join request + // once its Raft subsystems are up. + co_return co_await replicate_new_node_uuid( + node_uuid, req_node_id); + } + // The requested UUID already exists; this is a duplicate request + // to assign a node ID. Just return the registered node ID. + co_return ret_t(join_node_reply{true, it->second}); } - if (req.node.id() != _self.id()) { - co_await update_broker_client( - _self.id(), - _connection_cache, - req.node.id(), - req.node.rpc_address(), - _rpc_tls_config); + // We've been passed a node ID. The caller expects to be added to the + // Raft group by the end of this function. + if (it == _id_by_uuid.end()) { + // The node ID was manually provided and this is a new attempt to + // register the UUID. + auto r = co_await replicate_new_node_uuid(node_uuid, req_node_id); + if (r.has_error() || !r.value().success) { + co_return r; + } + } else { + // Validate that the node ID matches the one in our table. + if (*req_node_id != it->second) { + co_return ret_t(join_node_reply{false, model::node_id(-1)}); + } } - // Just update raft0 configuration - // we do not use revisions in raft0 configuration, it is always revision - // 0 which is perfectly fine. this will work like revision less raft - // protocol. - co_return co_await _raft0 - ->add_group_members({req.node}, model::revision_id(0)) - .then([broker = req.node](std::error_code ec) { - if (!ec) { - return ret_t(join_node_reply{true, broker.id()}); + // Proceed to adding the node ID to the controller Raft group. + // Presumably the node that made this join request started its Raft + // subsystem with the node ID and is waiting to join the group. 
+ } + + // if configuration contains the broker already just update its config + // with data from join request + + if (_raft0->config().contains_broker(req.node.id())) { + vlog( + clusterlog.info, + "Broker {} is already member of a cluster, updating " + "configuration", + req.node.id()); + auto node_id = req.node.id(); + auto update_req = configuration_update_request(req.node, _self.id()); + co_return co_await handle_configuration_update_request( + std::move(update_req)) + .then([node_id](result r) { + if (r) { + auto success = r.value().success; + return ret_t(join_node_reply{ + success, success ? node_id : model::node_id{-1}}); } - vlog( - clusterlog.warn, - "Error adding node {} to cluster - {}", - broker, - ec.message()); - return ret_t(ec); + return ret_t(r.error()); }); } - // Current node is not the leader have to send an RPC to leader - // controller - co_return co_await dispatch_rpc_to_leader( - _join_timeout, - [req, tout = rpc::clock_type::now() + _join_timeout]( - controller_client_protocol c) mutable { - return c.join_node(join_node_request(req), rpc::client_opts(tout)) - .then(&rpc::get_ctx_data); - }) - .handle_exception([](const std::exception_ptr& e) { + + if (_raft0->config().contains_address(req.node.rpc_address())) { + vlog( + clusterlog.info, + "Broker {} address ({}) conflicts with the address of another " + "node", + req.node.id(), + req.node.rpc_address()); + co_return ret_t(join_node_reply{false, model::node_id(-1)}); + } + if (req.node.id() != _self.id()) { + co_await update_broker_client( + _self.id(), + _connection_cache, + req.node.id(), + req.node.rpc_address(), + _rpc_tls_config); + } + // Just update raft0 configuration + // we do not use revisions in raft0 configuration, it is always revision + // 0 which is perfectly fine. this will work like revision less raft + // protocol. + co_return co_await _raft0 + ->add_group_members({req.node}, model::revision_id(0)) + .then([broker = req.node](std::error_code ec) { + if (!ec) { + return ret_t(join_node_reply{true, broker.id()}); + } vlog( clusterlog.warn, - "Error while dispatching join request to leader node - {}", - e); - return ss::make_ready_future( - errc::join_request_dispatch_error); + "Error adding node {} to cluster - {}", + broker, + ec.message()); + return ret_t(ec); }); } diff --git a/src/v/cluster/members_manager.h b/src/v/cluster/members_manager.h index c4b7d137bf67..06a6529aa960 100644 --- a/src/v/cluster/members_manager.h +++ b/src/v/cluster/members_manager.h @@ -12,6 +12,7 @@ #pragma once #include "cluster/commands.h" +#include "cluster/controller_stm.h" #include "cluster/fwd.h" #include "cluster/types.h" #include "config/seed_server.h" @@ -24,12 +25,18 @@ #include "rpc/fwd.h" #include "storage/fwd.h" +namespace features { +class feature_table; +} // namespace features + namespace cluster { // Implementation of a raft::mux_state_machine that is responsible for // updating information about cluster members, joining the cluster, updating // member states, and creating intra-cluster connections. // +// Node state updates +// ================== // This class receives updates from members_frontend by way of a Raft record // batch being committed. In addition to various controller command batch // types, it reacts to Raft configuration batch types, e.g. when a new node is @@ -39,13 +46,32 @@ namespace cluster { // instances. There is only one instance of members_manager running on // core-0. The members_manager is also responsible for validation of node // configuration invariants. 
+// +// Node joining and node ID assignment +// =================================== +// This class may be called directly by the frontend when nodes request to +// register with or join the cluster, rather than responding to a Raft record +// batch. Joining a cluster is a two step process, both driven by +// join_node_requests: +// +// Registration: a node's UUID hasn't yet been registered with a node ID. A +// node ID is either provided by the caller or assigned in the apply phase +// (serially, so it is deterministic across nodes). In both cases, a +// controller batch is written to record the existence of the node UUID. +// +// Joining: the node's UUID has already been registered with a node ID. The +// node can be added to the controller Raft group. +// +// Node UUID registrations are tracked in an internal map. Cluster membership +// (completed joins) modifies the controller Raft group directly. class members_manager { public: static constexpr auto accepted_commands = make_commands_list< decommission_node_cmd, recommission_node_cmd, finish_reallocations_cmd, - maintenance_mode_cmd>{}; + maintenance_mode_cmd, + register_node_uuid_cmd>{}; static constexpr ss::shard_id shard = 0; static constexpr size_t max_updates_queue_size = 100; @@ -85,6 +111,8 @@ class members_manager { members_manager( consensus_ptr, + ss::sharded&, + ss::sharded&, ss::sharded&, ss::sharded&, ss::sharded&, @@ -114,9 +142,13 @@ class members_manager { // the queue are aborted separately. ss::future<> stop(); - // Adds a node to the controller Raft group, dispatching to the leader if - // necessary. If the node already exists, just updates the node config - // instead. + // If the given request contains a node ID, adds a node to the controller + // Raft group, dispatching to the leader if necessary. If the node already + // exists, just updates the node config instead. + // + // If no node ID is provided (indicated by a negative value), replicates a + // controller command to register the requested node UUID, responding with + // a newly assigned node ID. ss::future> handle_join_request(join_node_request const r); @@ -140,6 +172,10 @@ class members_manager { // concurrently from multiple fibers. ss::future> get_node_updates(); + // Returns the node ID for the given node UUID. Callers must have a + // guarantee that the UUID has already been registered before calling. + model::node_id get_node_id(const model::node_uuid&); + private: using uuid_map_t = absl::flat_hash_map; using seed_iterator = std::vector::const_iterator; @@ -149,6 +185,10 @@ class members_manager { ss::future<> initialize_broker_connection(const model::broker&); + ss::future> replicate_new_node_uuid( + const model::node_uuid&, + const std::optional& = std::nullopt); + // Returns the node ID for a given node UUID, assigning one if one does not // already exist. Returns nullopt when there are no more node IDs left. std::optional @@ -190,7 +230,11 @@ class members_manager { simple_time_jitter _join_retry_jitter; const std::chrono::milliseconds _join_timeout; const consensus_ptr _raft0; + + ss::sharded& _controller_stm; + ss::sharded& _feature_table; ss::sharded& _members_table; + ss::sharded& _connection_cache; // Partition allocator to update when receiving node lifecycle commands. 
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index c92176e04101..57caab2955e7 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -23,6 +23,7 @@ #include "cluster/fwd.h" #include "cluster/id_allocator.h" #include "cluster/id_allocator_frontend.h" +#include "cluster/members_manager.h" #include "cluster/members_table.h" #include "cluster/metadata_dissemination_handler.h" #include "cluster/metadata_dissemination_service.h" @@ -1392,6 +1393,12 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { const auto& node_uuid = storage.local().node_uuid(); cluster::cluster_discovery cd(node_uuid); auto node_id = cd.determine_node_id().get(); + if (config::node().node_id() == std::nullopt) { + // If we previously didn't have a node ID, set it in the config. We + // will persist it in the kvstore when the controller starts up. + config::node().node_id.set_value( + std::make_optional(node_id)); + } vlog(_log.info, "Starting Redpanda with node_id {}", node_id); From 380254602638ec2354c412502edd3a68c70c8044 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Thu, 6 Oct 2022 01:18:07 -0700 Subject: [PATCH 11/17] tests: add option to not configure node ID --- src/v/cluster/tests/cluster_test_fixture.h | 36 ++++++++++++++-------- src/v/redpanda/tests/fixture.h | 23 ++++++++++---- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/src/v/cluster/tests/cluster_test_fixture.h b/src/v/cluster/tests/cluster_test_fixture.h index 7c0fd22a7174..9b849fc5908c 100644 --- a/src/v/cluster/tests/cluster_test_fixture.h +++ b/src/v/cluster/tests/cluster_test_fixture.h @@ -72,7 +72,8 @@ class cluster_test_fixture { int16_t proxy_port, int16_t schema_reg_port, int16_t coproc_supervisor_port, - std::vector seeds) { + std::vector seeds, + configure_node_id use_node_id = configure_node_id::yes) { _instances.emplace( node_id, std::make_unique( @@ -85,7 +86,8 @@ class cluster_test_fixture { seeds, ssx::sformat("{}.{}", _base_dir, node_id()), _sgroups, - false)); + false, + use_node_id)); } application* get_node_application(model::node_id id) { @@ -107,11 +109,12 @@ class cluster_test_fixture { application* create_node_application( model::node_id node_id, - int kafka_port = 9092, - int rpc_port = 11000, - int proxy_port = 8082, - int schema_reg_port = 8081, - int coproc_supervisor_port = 43189) { + int kafka_port_base = 9092, + int rpc_port_base = 11000, + int proxy_port_base = 8082, + int schema_reg_port_base = 8081, + int coproc_supervisor_port_base = 43189, + configure_node_id use_node_id = configure_node_id::yes) { std::vector seeds = {}; if (node_id != 0) { seeds.push_back( @@ -119,15 +122,22 @@ class cluster_test_fixture { } add_node( node_id, - kafka_port + node_id(), - rpc_port + node_id(), - proxy_port + node_id(), - schema_reg_port + node_id(), - coproc_supervisor_port + node_id(), - std::move(seeds)); + kafka_port_base + node_id(), + rpc_port_base + node_id(), + proxy_port_base + node_id(), + schema_reg_port_base + node_id(), + coproc_supervisor_port_base + node_id(), + std::move(seeds), + use_node_id); return get_node_application(node_id); } + application* create_node_application( + model::node_id node_id, configure_node_id use_node_id) { + return create_node_application( + node_id, 9092, 11000, 8082, 8081, 43189, use_node_id); + } + void remove_node_application(model::node_id node_id) { _instances.erase(node_id); } diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 94d79fef1d30..6d8d3d33d10d 
100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -48,6 +48,11 @@ #include +// Whether or not the fixtures should be configured with a node ID. +// NOTE: several fixtures may still require a node ID be supplied for the sake +// of differentiating ports, data directories, loggers, etc. +using configure_node_id = ss::bool_class; + class redpanda_thread_fixture { public: static constexpr const char* rack_name = "i-am-rack"; @@ -62,7 +67,8 @@ class redpanda_thread_fixture { std::vector seed_servers, ss::sstring base_dir, std::optional sch_groups, - bool remove_on_shutdown) + bool remove_on_shutdown, + configure_node_id use_node_id = configure_node_id::yes) : app(ssx::sformat("redpanda-{}", node_id())) , proxy_port(proxy_port) , schema_reg_port(schema_reg_port) @@ -74,7 +80,8 @@ class redpanda_thread_fixture { kafka_port, rpc_port, coproc_supervisor_port, - std::move(seed_servers)); + std::move(seed_servers), + use_node_id); app.initialize( proxy_config(proxy_port), proxy_client_config(kafka_port), @@ -156,14 +163,16 @@ class redpanda_thread_fixture { int32_t kafka_port, int32_t rpc_port, int32_t coproc_supervisor_port, - std::vector seed_servers) { + std::vector seed_servers, + configure_node_id use_node_id) { auto base_path = std::filesystem::path(data_dir); ss::smp::invoke_on_all([node_id, kafka_port, rpc_port, coproc_supervisor_port, seed_servers = std::move(seed_servers), - base_path]() mutable { + base_path, + use_node_id]() mutable { auto& config = config::shard_local_cfg(); config.get("enable_pid_file").set_value(false); @@ -176,9 +185,11 @@ class redpanda_thread_fixture { node_config.get("admin").set_value( std::vector()); node_config.get("developer_mode").set_value(true); - node_config.get("node_id").set_value(node_id); + node_config.get("node_id").set_value( + use_node_id ? 
std::make_optional(node_id)
+                          : std::optional(std::nullopt));
             node_config.get("rack").set_value(
-              std::optional(model::rack_id(rack_name)));
+              std::make_optional(model::rack_id(rack_name)));
             node_config.get("seed_servers").set_value(seed_servers);
             node_config.get("rpc_server")
               .set_value(net::unresolved_address("127.0.0.1", rpc_port));

From eacafa9de4b8ea760b195cdf1e5ed526207724d5 Mon Sep 17 00:00:00 2001
From: Andrew Wong
Date: Thu, 6 Oct 2022 01:07:36 -0700
Subject: [PATCH 12/17] cluster: unit tests for node ID assignment

---
 src/v/cluster/tests/cluster_tests.cc | 43 ++++++++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/src/v/cluster/tests/cluster_tests.cc b/src/v/cluster/tests/cluster_tests.cc
index 731f2796da2c..4ae7f113f970 100644
--- a/src/v/cluster/tests/cluster_tests.cc
+++ b/src/v/cluster/tests/cluster_tests.cc
@@ -45,3 +45,46 @@ FIXTURE_TEST(test_three_node_cluster, cluster_test_fixture) {
 
     wait_for_all_members(3s).get();
 }
+
+FIXTURE_TEST(test_auto_assign_node_id, cluster_test_fixture) {
+    create_node_application(model::node_id{0}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(0, *config::node().node_id());
+
+    create_node_application(model::node_id{1}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(1, *config::node().node_id());
+
+    create_node_application(model::node_id{2}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(2, *config::node().node_id());
+
+    wait_for_all_members(3s).get();
+}
+
+FIXTURE_TEST(test_auto_assign_non_seeds, cluster_test_fixture) {
+    create_node_application(model::node_id{0});
+    BOOST_REQUIRE_EQUAL(0, *config::node().node_id());
+
+    create_node_application(model::node_id{1}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(1, *config::node().node_id());
+
+    create_node_application(model::node_id{2}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(2, *config::node().node_id());
+
+    wait_for_all_members(3s).get();
+}
+
+FIXTURE_TEST(test_auto_assign_with_explicit_node_id, cluster_test_fixture) {
+    create_node_application(model::node_id{0});
+    BOOST_REQUIRE_EQUAL(0, *config::node().node_id());
+
+    // Explicitly assign node ID 2. Node ID assignment should assign around it.
+    create_node_application(model::node_id{2});
+    BOOST_REQUIRE_EQUAL(2, *config::node().node_id());
+
+    create_node_application(model::node_id{1}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(1, *config::node().node_id());
+
+    create_node_application(model::node_id{3}, configure_node_id::no);
+    BOOST_REQUIRE_EQUAL(3, *config::node().node_id());
+
+    wait_for_all_members(3s).get();
+}

From 09f40d9fa1ee605b273f815e584b7fe2070c422c Mon Sep 17 00:00:00 2001
From: Andrew Wong
Date: Thu, 6 Oct 2022 08:20:04 -0700
Subject: [PATCH 13/17] cluster_discovery: read node ID from kvstore

When the controller starts up, we check to ensure the configured node
ID is identical to that in the kvstore if one exists, persisting one if
not. We should use this node ID at startup if we weren't configured
with one, e.g. if we're upgrading an existing cluster and begin
omitting node ID from the node config.
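
As a rough illustration of the write side this relies on (sketch only;
the persistence call already lives in the controller startup path, and
the names below are illustrative rather than copied from it, assuming
the usual storage/ and reflection/ headers):

    // Sketch: how the node ID ends up in the kvstore in the first place.
    // The controller persists its configuration invariants once the node's
    // identity is known; this patch only adds the read side at startup.
    ss::future<> persist_node_id_sketch(storage::kvstore& kvs, model::node_id id) {
        static const bytes invariants_key("configuration_invariants");
        return kvs.put(
          storage::kvstore::key_space::controller,
          invariants_key,
          reflection::to_iobuf(configuration_invariants(id, ss::smp::count)));
    }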
--- src/v/cluster/cluster_discovery.cc | 16 ++++++++++++++-- src/v/cluster/cluster_discovery.h | 8 +++++++- src/v/redpanda/application.cc | 2 +- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc index ef136c387972..783295951146 100644 --- a/src/v/cluster/cluster_discovery.cc +++ b/src/v/cluster/cluster_discovery.cc @@ -16,6 +16,7 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "seastarx.h" +#include "storage/kvstore.h" #include @@ -25,10 +26,12 @@ using std::vector; namespace cluster { -cluster_discovery::cluster_discovery(const model::node_uuid& node_uuid) - : _node_uuid(node_uuid) {} +cluster_discovery::cluster_discovery( + const model::node_uuid& node_uuid, storage::kvstore& kvstore) + : _node_uuid(node_uuid) , _join_retry_jitter(config::shard_local_cfg().join_retry_timeout_ms()) , _join_timeout(std::chrono::seconds(2)) {} + , _kvstore(kvstore) {} ss::future cluster_discovery::determine_node_id() { // TODO: read from disk if empty. @@ -37,6 +40,15 @@ ss::future cluster_discovery::determine_node_id() { clusterlog.info("Using configured node ID {}", configured_node_id); co_return *configured_node_id; } + static const bytes invariants_key("configuration_invariants"); + auto invariants_buf = _kvstore.get( + storage::kvstore::key_space::controller, invariants_key); + + if (invariants_buf) { + auto invariants = reflection::from_iobuf( + std::move(*invariants_buf)); + co_return invariants.node_id; + } // TODO: once is_cluster_founder() refers to all seeds, verify that all the // seeds' seed_servers lists match and assign node IDs based on the // ordering. diff --git a/src/v/cluster/cluster_discovery.h b/src/v/cluster/cluster_discovery.h index 1e88b626b79a..e9ebc18f8f81 100644 --- a/src/v/cluster/cluster_discovery.h +++ b/src/v/cluster/cluster_discovery.h @@ -19,6 +19,10 @@ #include #include +namespace storage { +class kvstore; +} // namespace storage + namespace cluster { // Provides metadata pertaining to initial cluster discovery. It is the @@ -56,7 +60,8 @@ namespace cluster { // TODO: reconcile the RPC dispatch logic here with that in members_manager. class cluster_discovery { public: - explicit cluster_discovery(const model::node_uuid& node_uuid); + cluster_discovery( + const model::node_uuid& node_uuid, storage::kvstore& kvstore); // Determines what the node ID for this node should be. Once called, we can // proceed with initializing anything that depends on node ID (Raft @@ -101,6 +106,7 @@ class cluster_discovery { simple_time_jitter _join_retry_jitter; const std::chrono::milliseconds _join_timeout; + storage::kvstore& _kvstore; }; } // namespace cluster diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 57caab2955e7..0f97dbba6892 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1391,7 +1391,7 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { // ID. A valid node ID is required before we can initialize the rest of our // subsystems. const auto& node_uuid = storage.local().node_uuid(); - cluster::cluster_discovery cd(node_uuid); + cluster::cluster_discovery cd(node_uuid, storage.local().kvs()); auto node_id = cd.determine_node_id().get(); if (config::node().node_id() == std::nullopt) { // If we previously didn't have a node ID, set it in the config. 
We From 30489d934c483ff084c1ee934ec4f520fd532198 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Thu, 6 Oct 2022 09:07:04 -0700 Subject: [PATCH 14/17] cluster_discovery: add an abort source --- src/v/cluster/cluster_discovery.cc | 9 ++++++--- src/v/cluster/cluster_discovery.h | 5 ++++- src/v/redpanda/application.cc | 3 ++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc index 783295951146..8747d9ea6b17 100644 --- a/src/v/cluster/cluster_discovery.cc +++ b/src/v/cluster/cluster_discovery.cc @@ -27,11 +27,14 @@ using std::vector; namespace cluster { cluster_discovery::cluster_discovery( - const model::node_uuid& node_uuid, storage::kvstore& kvstore) + const model::node_uuid& node_uuid, + storage::kvstore& kvstore, + ss::abort_source& as) : _node_uuid(node_uuid) , _join_retry_jitter(config::shard_local_cfg().join_retry_timeout_ms()) - , _join_timeout(std::chrono::seconds(2)) {} - , _kvstore(kvstore) {} + , _join_timeout(std::chrono::seconds(2)) + , _kvstore(kvstore) + , _as(as) {} ss::future cluster_discovery::determine_node_id() { // TODO: read from disk if empty. diff --git a/src/v/cluster/cluster_discovery.h b/src/v/cluster/cluster_discovery.h index e9ebc18f8f81..44233adb0c56 100644 --- a/src/v/cluster/cluster_discovery.h +++ b/src/v/cluster/cluster_discovery.h @@ -61,7 +61,9 @@ namespace cluster { class cluster_discovery { public: cluster_discovery( - const model::node_uuid& node_uuid, storage::kvstore& kvstore); + const model::node_uuid& node_uuid, + storage::kvstore& kvstore, + ss::abort_source&); // Determines what the node ID for this node should be. Once called, we can // proceed with initializing anything that depends on node ID (Raft @@ -107,6 +109,7 @@ class cluster_discovery { const std::chrono::milliseconds _join_timeout; storage::kvstore& _kvstore; + ss::abort_source& _as; }; } // namespace cluster diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 0f97dbba6892..c1be3195b226 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1391,7 +1391,8 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { // ID. A valid node ID is required before we can initialize the rest of our // subsystems. const auto& node_uuid = storage.local().node_uuid(); - cluster::cluster_discovery cd(node_uuid, storage.local().kvs()); + cluster::cluster_discovery cd( + node_uuid, storage.local().kvs(), app_signal.abort_source()); auto node_id = cd.determine_node_id().get(); if (config::node().node_id() == std::nullopt) { // If we previously didn't have a node ID, set it in the config. We From c9deb4e828f693fe33976c2b822a67d6eadd0fbd Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 3 Oct 2022 18:46:06 -0700 Subject: [PATCH 15/17] net: add node_id label to client metrics RPC `client_probe` metrics currently leverage a labeling scheme defined by Seastar in which, for a given metric, a set of metric labels can only be registered once per metric. Currently, the label used is solely based on the server address associated with a given `rpc::transport`. As such, we currently cannot start multiple `rpc::transport`s pointed at the same server. This functionality could be useful though: consider when a node is restarted empty with a new node ID. Redpanda currently has a check that nodes being added to the controller Raft group don't overlap with existing group members' addresses. 
But if we were to remove this check, when the new node _is_ added to the Raft group, each node will try to create a new `rpc::transport` pointing at the new node, and register metrics with identical labels to those registered by the old node, and be met with a Seastar `double_registration` exception. To enable the above scenario, this commit adds the `node_id` as a label for client metrics, and aggregates them by this label. --- src/v/net/client_probe.h | 2 ++ src/v/net/probes.cc | 53 ++++++++++++++++++++++++--------- src/v/rpc/connection_cache.cc | 2 +- src/v/rpc/reconnect_transport.h | 9 ++++-- src/v/rpc/transport.cc | 11 ++++--- src/v/rpc/transport.h | 8 +++-- 6 files changed, 62 insertions(+), 23 deletions(-) diff --git a/src/v/net/client_probe.h b/src/v/net/client_probe.h index 8994077d3e38..d5fa3f652457 100644 --- a/src/v/net/client_probe.h +++ b/src/v/net/client_probe.h @@ -10,6 +10,7 @@ */ #pragma once +#include "model/metadata.h" #include "net/unresolved_address.h" #include "rpc/logger.h" #include "rpc/types.h" @@ -70,6 +71,7 @@ class client_probe { void setup_metrics( ss::metrics::metric_groups& mgs, const std::optional& label, + const std::optional& node_id, const net::unresolved_address& target_addr); private: diff --git a/src/v/net/probes.cc b/src/v/net/probes.cc index 210a8eb48b08..62a184ebe987 100644 --- a/src/v/net/probes.cc +++ b/src/v/net/probes.cc @@ -148,6 +148,7 @@ std::ostream& operator<<(std::ostream& o, const server_probe& p) { void client_probe::setup_metrics( ss::metrics::metric_groups& mgs, const std::optional& label, + const std::optional& node_id, const net::unresolved_address& target_addr) { namespace sm = ss::metrics; auto target = sm::label("target"); @@ -156,6 +157,16 @@ void client_probe::setup_metrics( if (label) { labels.push_back(sm::label("connection_cache_label")((*label)())); } + std::vector aggregate_labels; + // Label the metrics for a given server with the node ID so Seastar can + // differentiate between them, in case multiple node IDs start at the same + // address (e.g. in an ungraceful decommission). Aggregate on node ID so + // the user is presented metrics for each server regardless of node ID. 
+ if (node_id) { + auto node_id_label = sm::label("node_id"); + labels.push_back(node_id_label(*node_id)); + aggregate_labels.push_back(node_id_label); + } mgs.add_group( prometheus_sanitize::metrics_name("rpc_client"), { @@ -163,73 +174,87 @@ void client_probe::setup_metrics( "active_connections", [this] { return _connections; }, sm::description("Currently active connections"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "connects", [this] { return _connects; }, sm::description("Connection attempts"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "requests", [this] { return _requests; }, sm::description("Number of requests"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "requests_pending", [this] { return _requests_pending; }, sm::description("Number of requests pending"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "request_errors", [this] { return _request_errors; }, sm::description("Number or requests errors"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "request_timeouts", [this] { return _request_timeouts; }, sm::description("Number or requests timeouts"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_total_bytes( "in_bytes", [this] { return _out_bytes; }, sm::description("Total number of bytes sent (including headers)"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_total_bytes( "out_bytes", [this] { return _in_bytes; }, sm::description("Total number of bytes received"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "connection_errors", [this] { return _connection_errors; }, sm::description("Number of connection errors"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "read_dispatch_errors", [this] { return _read_dispatch_errors; }, sm::description("Number of errors while dispatching responses"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "corrupted_headers", [this] { return _corrupted_headers; }, sm::description("Number of responses with corrupted headers"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "server_correlation_errors", [this] { return _server_correlation_errors; }, sm::description("Number of responses with wrong correlation id"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "client_correlation_errors", [this] { return _client_correlation_errors; }, sm::description("Number of errors in client correlation id"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "requests_blocked_memory", [this] { return _requests_blocked_memory; }, sm::description("Number of requests that are blocked because" " of insufficient memory"), - labels), + labels) + .aggregate(aggregate_labels), }); } diff --git a/src/v/rpc/connection_cache.cc b/src/v/rpc/connection_cache.cc index e6a3906cffa6..7ec3075860a1 100644 --- a/src/v/rpc/connection_cache.cc +++ b/src/v/rpc/connection_cache.cc @@ -36,7 +36,7 @@ ss::future<> connection_cache::emplace( _cache.emplace( n, ss::make_lw_shared( - std::move(c), std::move(backoff_policy), _label)); + std::move(c), std::move(backoff_policy), _label, n)); }); } ss::future<> connection_cache::remove(model::node_id n) { diff --git a/src/v/rpc/reconnect_transport.h b/src/v/rpc/reconnect_transport.h index 609344b0f26c..8188ec200518 100644 --- a/src/v/rpc/reconnect_transport.h +++ b/src/v/rpc/reconnect_transport.h @@ -11,6 +11,7 @@ #pragma once +#include 
"model/metadata.h" #include "outcome.h" #include "rpc/backoff_policy.h" #include "rpc/transport.h" @@ -31,11 +32,15 @@ namespace rpc { */ class reconnect_transport { public: + // Instantiates an underlying rpc::transport, using the given node ID (if + // provided) to distinguish client metrics that target the same server and + // to indicate that the client metrics should be aggregated by node ID. explicit reconnect_transport( rpc::transport_configuration c, backoff_policy backoff_policy, - std::optional label = std::nullopt) - : _transport(std::move(c), std::move(label)) + const std::optional& label = std::nullopt, + const std::optional& node_id = std::nullopt) + : _transport(std::move(c), std::move(label), std::move(node_id)) , _backoff_policy(std::move(backoff_policy)) {} bool is_valid() const { return _transport.is_valid(); } diff --git a/src/v/rpc/transport.cc b/src/v/rpc/transport.cc index 300aad4a78af..f3c483d3daba 100644 --- a/src/v/rpc/transport.cc +++ b/src/v/rpc/transport.cc @@ -52,7 +52,9 @@ struct client_context_impl final : streaming_context { }; transport::transport( - transport_configuration c, std::optional label) + transport_configuration c, + std::optional label, + std::optional node_id) : base_transport(base_transport::configuration{ .server_addr = std::move(c.server_addr), .credentials = std::move(c.credentials), @@ -61,7 +63,7 @@ transport::transport( , _version(c.version) , _default_version(c.version) { if (!c.disable_metrics) { - setup_metrics(label); + setup_metrics(label, node_id); } } @@ -289,8 +291,9 @@ ss::future<> transport::dispatch(header h) { } void transport::setup_metrics( - const std::optional& label) { - _probe.setup_metrics(_metrics, label, server_address()); + const std::optional& label, + const std::optional& node_id) { + _probe.setup_metrics(_metrics, label, node_id, server_address()); } transport::~transport() { diff --git a/src/v/rpc/transport.h b/src/v/rpc/transport.h index 7052b3b4dc9f..7f21c8f3a1bf 100644 --- a/src/v/rpc/transport.h +++ b/src/v/rpc/transport.h @@ -11,6 +11,7 @@ #pragma once +#include "model/metadata.h" #include "net/transport.h" #include "outcome.h" #include "reflection/async_adl.h" @@ -56,7 +57,8 @@ class transport final : public net::base_transport { public: explicit transport( transport_configuration c, - std::optional label = std::nullopt); + std::optional label = std::nullopt, + std::optional node_id = std::nullopt); ~transport() override; transport(transport&&) noexcept = default; // semaphore is not move assignable @@ -93,7 +95,9 @@ class transport final : public net::base_transport { ss::future<> do_reads(); ss::future<> dispatch(header); void fail_outstanding_futures() noexcept final; - void setup_metrics(const std::optional&); + void setup_metrics( + const std::optional&, + const std::optional&); ss::future>> do_send(sequence_t, netbuf, rpc::client_opts); From 1a84eb7658325d481b3dec48f1279f8029d48923 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 3 Oct 2022 23:38:13 -0700 Subject: [PATCH 16/17] cluster: allow node to join if address is used --- src/v/cluster/members_manager.cc | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/v/cluster/members_manager.cc b/src/v/cluster/members_manager.cc index 7cd2755b60d3..36a155bd8a52 100644 --- a/src/v/cluster/members_manager.cc +++ b/src/v/cluster/members_manager.cc @@ -756,15 +756,6 @@ members_manager::handle_join_request(join_node_request const req) { }); } - if (_raft0->config().contains_address(req.node.rpc_address())) { - vlog( - clusterlog.info, - "Broker 
{} address ({}) conflicts with the address of another " - "node", - req.node.id(), - req.node.rpc_address()); - co_return ret_t(join_node_reply{false, model::node_id(-1)}); - } if (req.node.id() != _self.id()) { co_await update_broker_client( _self.id(), From e80e9eb20bdd519f81fc7ecaac39fff9f351cc58 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 3 Oct 2022 23:42:02 -0700 Subject: [PATCH 17/17] tests: ducktape test for node ID assignment --- tests/rptest/services/admin.py | 4 +- tests/rptest/services/redpanda.py | 67 +++++++++---- tests/rptest/services/templates/redpanda.yaml | 5 +- tests/rptest/tests/node_id_assignment_test.py | 94 +++++++++++++++++++ 4 files changed, 150 insertions(+), 20 deletions(-) create mode 100644 tests/rptest/tests/node_id_assignment_test.py diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index 69783fddaf20..785b95c2d8b2 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -383,8 +383,8 @@ def patch_cluster_config(self, def get_cluster_config_status(self, node: ClusterNode = None): return self._request("GET", "cluster_config/status", node=node).json() - def get_node_config(self): - return self._request("GET", "node_config").json() + def get_node_config(self, node=None): + return self._request("GET", "node_config", node).json() def get_features(self, node=None): return self._request("GET", "features", node=node).json() diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 923efa943f6c..896370bd5c14 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -34,14 +34,15 @@ from ducktape.errors import TimeoutError from rptest.clients.kafka_cat import KafkaCat +from rptest.clients.rpk_remote import RpkRemoteTool +from rptest.clients.python_librdkafka import PythonLibrdkafka +from rptest.services import tls from rptest.services.admin import Admin from rptest.services.redpanda_installer import RedpandaInstaller from rptest.services.rolling_restarter import RollingRestarter -from rptest.clients.rpk_remote import RpkRemoteTool from rptest.services.storage import ClusterStorage, NodeStorage from rptest.services.utils import BadLogLines, NodeCrash -from rptest.clients.python_librdkafka import PythonLibrdkafka -from rptest.services import tls +from rptest.util import wait_until_result Partition = collections.namedtuple('Partition', ['index', 'leader', 'replicas']) @@ -740,7 +741,8 @@ def start(self, clean_nodes=True, start_si=True, parallel: bool = True, - expect_fail: bool = False): + expect_fail: bool = False, + auto_assign_node_id: bool = False): """ Start the service on all nodes. @@ -795,7 +797,8 @@ def start_one(node): self.logger.debug("%s: starting node" % self.who_am_i(node)) self.start_node(node, first_start=first_start, - expect_fail=expect_fail) + expect_fail=expect_fail, + auto_assign_node_id=auto_assign_node_id) self._for_nodes(to_start, start_one, parallel=parallel) @@ -999,7 +1002,8 @@ def start_node(self, timeout=None, write_config=True, first_start=False, - expect_fail: bool = False): + expect_fail: bool = False, + auto_assign_node_id: bool = False): """ Start a single instance of redpanda. This function will not return until redpanda appears to have started successfully. 
If redpanda does not @@ -1010,7 +1014,9 @@ def start_node(self, node.account.mkdirs(os.path.dirname(RedpandaService.NODE_CONFIG_FILE)) if write_config: - self.write_node_conf_file(node, override_cfg_params) + self.write_node_conf_file(node, + override_cfg_params, + auto_assign_node_id=auto_assign_node_id) if timeout is None: timeout = self.node_ready_timeout_s @@ -1545,7 +1551,10 @@ def pids(self, node): def started_nodes(self): return self._started - def write_node_conf_file(self, node, override_cfg_params=None): + def write_node_conf_file(self, + node, + override_cfg_params=None, + auto_assign_node_id=False): """ Write the node config file for a redpanda node: this is the YAML representation of Redpanda's `node_config` class. Distinct from Redpanda's _cluster_ configuration @@ -1553,6 +1562,12 @@ def write_node_conf_file(self, node, override_cfg_params=None): """ node_info = {self.idx(n): n for n in self.nodes} + node_id = self.idx(node) + include_seed_servers = node_id > 1 + if auto_assign_node_id: + # Supply None so it's omitted from the config. + node_id = None + # Grab the IP to use it as an alternative listener address, to # exercise code paths that deal with multiple listeners node_ip = socket.gethostbyname(node.account.hostname) @@ -1561,7 +1576,8 @@ def write_node_conf_file(self, node, override_cfg_params=None): node=node, data_dir=RedpandaService.DATA_DIR, nodes=node_info, - node_id=self.idx(node), + node_id=node_id, + include_seed_servers=include_seed_servers, node_ip=node_ip, kafka_alternate_port=self.KAFKA_ALTERNATE_PORT, admin_alternate_port=self.ADMIN_ALTERNATE_PORT, @@ -1644,7 +1660,8 @@ def restart_nodes(self, nodes, override_cfg_params=None, start_timeout=None, - stop_timeout=None): + stop_timeout=None, + auto_assign_node_id=False): nodes = [nodes] if isinstance(nodes, ClusterNode) else nodes with concurrent.futures.ThreadPoolExecutor( @@ -1656,8 +1673,11 @@ def restart_nodes(self, nodes)) list( executor.map( - lambda n: self.start_node( - n, override_cfg_params, timeout=start_timeout), nodes)) + lambda n: self.start_node(n, + override_cfg_params, + timeout=start_timeout, + auto_assign_node_id= + auto_assign_node_id), nodes)) def rolling_restart_nodes(self, nodes, @@ -1681,9 +1701,9 @@ def registered(self, node): We first check the admin API to do a kafka-independent check, and then verify that kafka clients see the same thing. """ - idx = self.idx(node) + node_id = self.node_id(node) self.logger.debug( - f"registered: checking if broker {idx} ({node.name} is registered..." + f"registered: checking if broker {node_id} ({node.name}) is registered..." ) # Query all nodes' admin APIs, so that we don't advance during setup until @@ -1703,7 +1723,7 @@ def registered(self, node): return False found = None for b in admin_brokers: - if b['node_id'] == idx: + if b['node_id'] == node_id: found = b break @@ -1733,7 +1753,7 @@ def registered(self, node): client = PythonLibrdkafka(self, tls_cert=self._tls_cert, **auth_args) brokers = client.brokers() - broker = brokers.get(idx, None) + broker = brokers.get(node_id, None) if broker is None: # This should never happen, because we already checked via the admin API # that the node of interest had become visible to all peers. 
@@ -2003,6 +2023,21 @@ def shards(self):
             shards_per_node[self.idx(node)] = num_shards
         return shards_per_node
 
+    def node_id(self, node):
+        def _try_get_node_id():
+            try:
+                node_cfg = self._admin.get_node_config(node)
+            except:
+                return (False, -1)
+            return (True, node_cfg["node_id"])
+
+        node_id = wait_until_result(
+            _try_get_node_id,
+            timeout_sec=30,
+            err_msg=f"couldn't reach admin endpoint for {node.account.hostname}"
+        )
+        return node_id
+
     def healthy(self):
         """
         A primitive health check on all the nodes which returns True when all
diff --git a/tests/rptest/services/templates/redpanda.yaml b/tests/rptest/services/templates/redpanda.yaml
index 05df0302d030..5f62b5228797 100644
--- a/tests/rptest/services/templates/redpanda.yaml
+++ b/tests/rptest/services/templates/redpanda.yaml
@@ -12,7 +12,9 @@ organization: "vectorized"
 redpanda:
   developer_mode: true
   data_directory: "{{data_dir}}"
+{% if node_id is not none %}
   node_id: {{node_id}}
+{% endif %}
   rpc_server:
     address: "{{node.account.hostname}}"
     port: 33145
@@ -39,12 +41,11 @@ redpanda:
       port: {{admin_alternate_port}}
 
 
-{% if node_id > 1 %}
+{% if include_seed_servers %}
 seed_servers:
   - host:
       address: {{nodes[1].account.hostname}}
       port: 33145
-    node_id: 1
 {% endif %}
 
 {% if enable_pp %}
diff --git a/tests/rptest/tests/node_id_assignment_test.py b/tests/rptest/tests/node_id_assignment_test.py
new file mode 100644
index 000000000000..320837faeea4
--- /dev/null
+++ b/tests/rptest/tests/node_id_assignment_test.py
@@ -0,0 +1,94 @@
+# Copyright 2022 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from ducktape.utils.util import wait_until
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.services.cluster import cluster
+from rptest.services.redpanda_installer import RedpandaInstaller
+
+
+def wipe_and_restart(redpanda, node):
+    """
+    Stops, clears, and restarts a node, priming it to be assigned a node ID.
+    """
+    redpanda.stop_node(node)
+    redpanda.clean_node(node,
+                        preserve_logs=True,
+                        preserve_current_install=True)
+    redpanda.start_node(node, auto_assign_node_id=True)
+
+
+class NodeIdAssignment(RedpandaTest):
+    """
+    Test that exercises cluster formation when node IDs are automatically
+    assigned by Redpanda.
+    """
+    def __init__(self, test_context):
+        super(NodeIdAssignment, self).__init__(test_context=test_context,
+                                                num_brokers=3)
+        self.admin = self.redpanda._admin
+
+    def setUp(self):
+        self.redpanda.start(auto_assign_node_id=True)
+        self._create_initial_topics()
+
+    @cluster(num_nodes=3)
+    def test_basic_assignment(self):
+        brokers = self.admin.get_brokers()
+        assert 3 == len(brokers), f"Got {len(brokers)} brokers"
+
+    @cluster(num_nodes=3)
+    def test_assign_after_clear(self):
+        brokers = self.admin.get_brokers()
+        assert 3 == len(brokers), f"Got {len(brokers)} brokers"
+
+        clean_node = self.redpanda.nodes[-1]
+        original_node_id = self.redpanda.node_id(clean_node)
+        wipe_and_restart(self.redpanda, clean_node)
+
+        brokers = self.admin.get_brokers()
+        assert 4 == len(brokers), f"Got {len(brokers)} brokers"
+        new_node_id = self.redpanda.node_id(clean_node)
+        assert original_node_id != new_node_id, f"Cleaned node came back with node ID {new_node_id}"
+
+
+class NodeIdAssignmentUpgrade(RedpandaTest):
+    """
+    Test that exercises cluster formation when node IDs are automatically
+    assigned by Redpanda after an upgrade.
+    """
+    def __init__(self, test_context):
+        super(NodeIdAssignmentUpgrade,
+              self).__init__(test_context=test_context, num_brokers=3)
+        self.installer = self.redpanda._installer
+        self.admin = self.redpanda._admin
+
+    def setUp(self):
+        self.installer.install(self.redpanda.nodes, (22, 2, 1))
+        super(NodeIdAssignmentUpgrade, self).setUp()
+
+    @cluster(num_nodes=3)
+    def test_assign_after_upgrade(self):
+        self.installer.install(self.redpanda.nodes, RedpandaInstaller.HEAD)
+        self.redpanda.restart_nodes(self.redpanda.nodes,
+                                    auto_assign_node_id=True)
+        wait_until(
+            lambda: self.admin.supports_feature("node_id_assignment"),
+            timeout_sec=30,
+            backoff_sec=1,
+            err_msg="Timeout waiting for cluster to support 'node_id_assignment' feature")
+
+        clean_node = self.redpanda.nodes[-1]
+        original_node_id = self.redpanda.node_id(clean_node)
+        wipe_and_restart(self.redpanda, clean_node)
+
+        brokers = self.admin.get_brokers()
+        assert 4 == len(brokers), f"Got {len(brokers)} brokers"
+        new_node_id = self.redpanda.node_id(clean_node)
+        assert original_node_id != new_node_id, f"Cleaned node came back with node ID {new_node_id}"
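
A natural follow-up check, sketched here rather than included in the series:
an auto-assigned node ID should survive a plain restart, since it is recovered
from the configuration_invariants entry in the kvstore. Using the fixture
helpers added above (hypothetical test name, method body belongs in
NodeIdAssignment):

    @cluster(num_nodes=3)
    def test_assigned_id_stable_across_restart(self):
        node = self.redpanda.nodes[-1]
        original_node_id = self.redpanda.node_id(node)

        # Restart without clean_node(): the data directory (and kvstore) is
        # preserved, so the node should come back with the same node ID.
        self.redpanda.restart_nodes([node], auto_assign_node_id=True)

        new_node_id = self.redpanda.node_id(node)
        assert original_node_id == new_node_id, \
            f"Node ID changed across restart: {original_node_id} -> {new_node_id}"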