Skip to content

Commit

Permalink
config: make node ID optional
Browse files Browse the repository at this point in the history
  • Loading branch information
andrwng committed Sep 29, 2022
1 parent dabff99 commit 960e599
Show file tree
Hide file tree
Showing 20 changed files with 129 additions and 43 deletions.
6 changes: 3 additions & 3 deletions src/v/cluster/cluster_discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ cluster_discovery::cluster_discovery(

ss::future<node_id> cluster_discovery::determine_node_id() {
const auto& configured_node_id = _node_config.node_id();
if (configured_node_id == -1) {
clusterlog.info("Using configured node ID {}", configured_node_id);
co_return configured_node_id;
if (configured_node_id != std::nullopt) {
clusterlog.info("Using configured node ID {}", *configured_node_id);
co_return *configured_node_id;
}
// TODO: once is_cluster_founder() refers to all seeds, verify that all the
// seeds' seed_servers lists match and assign node IDs based on the
Expand Down
9 changes: 8 additions & 1 deletion src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,15 @@ model::broker make_self_broker(const config::node_config& node_cfg) {
// As for memory, if disk_gb is zero this is handled like a legacy broker.
uint32_t disk_gb = space_info.capacity >> 30;

// If this node hasn't been configured with a node ID, use -1 to indicate
// that we don't yet know it yet. This shouldn't be used during the normal
// operation of a broker, and instead should be used to indicate a broker
// that needs to be assigned a node ID when it first starts up.
model::node_id node_id = node_cfg.node_id() == std::nullopt
? model::node_id(-1)
: *node_cfg.node_id();
return model::broker(
model::node_id(node_cfg.node_id),
node_id,
kafka_addr,
rpc_addr,
node_cfg.rack,
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once
#include "bytes/iobuf_parser.h"
#include "cluster/logger.h"
#include "cluster/simple_batch_builder.h"
#include "cluster/types.h"
#include "model/metadata.h"
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/config_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ config_frontend::config_frontend(
, _leaders(leaders)
, _features(features)
, _as(as)
, _self(config::node().node_id()) {}
, _self(*config::node().node_id()) {}

/**
* RPC wrapper on do_patch, to dispatch to the controller leader
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ config_manager::config_manager(
ss::sharded<partition_leaders_table>& pl,
ss::sharded<features::feature_table>& ft,
ss::sharded<ss::abort_source>& as)
: _self(config::node().node_id())
: _self(*config::node().node_id())
, _frontend(cf)
, _connection_cache(cc)
, _leaders(pl)
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ controller_backend::controller_backend(
, _partition_leaders_table(leaders)
, _topics_frontend(frontend)
, _storage(storage)
, _self(model::node_id(config::node().node_id))
, _self(*config::node().node_id())
, _data_directory(config::node().data_directory().as_sstring())
, _housekeeping_timer_interval(
config::shard_local_cfg().controller_backend_housekeeping_interval_ms())
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/feature_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ feature_manager::feature_manager(
, _connection_cache(connection_cache)
, _raft0_group(raft0_group)
, _barrier_state(
config::node().node_id(),
*config::node().node_id(),
members.local(),
as.local(),
_gate,
Expand Down Expand Up @@ -109,7 +109,7 @@ ss::future<> feature_manager::start() {

vlog(
clusterlog.debug, "Controller leader notification term {}", term);
_am_controller_leader = leader_id == config::node().node_id();
_am_controller_leader = leader_id == *config::node().node_id();

// This hook avoids the need for the controller leader to receive
// its own health report to generate a call to update_node_version.
Expand All @@ -128,7 +128,7 @@ ss::future<> feature_manager::start() {
leader_id.value(),
features::feature_table::get_latest_logical_version());
update_node_version(
config::node().node_id,
*config::node().node_id(),
features::feature_table::get_latest_logical_version());
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/members_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ members_frontend::members_frontend(
ss::sharded<partition_leaders_table>& leaders,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<ss::abort_source>& as)
: _self(config::node().node_id())
: _self(*config::node().node_id())
, _node_op_timeout(
config::shard_local_cfg().node_management_operation_timeout_ms)
, _stm(stm)
Expand Down
40 changes: 27 additions & 13 deletions src/v/cluster/tests/cluster_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once
#include "cluster/fwd.h"
#include "cluster/logger.h"
#include "cluster/tests/utils.h"
#include "config/seed_server.h"
#include "model/metadata.h"
Expand Down Expand Up @@ -72,7 +73,9 @@ class cluster_test_fixture {
int16_t proxy_port,
int16_t schema_reg_port,
int16_t coproc_supervisor_port,
std::vector<config::seed_server> seeds) {
std::vector<config::seed_server> seeds,
configure_node_id use_node_id = configure_node_id::yes) {
cluster::clusterlog.info("AWONG emplacing");
_instances.emplace(
node_id,
std::make_unique<redpanda_thread_fixture>(
Expand All @@ -85,7 +88,8 @@ class cluster_test_fixture {
seeds,
ssx::sformat("{}.{}", _base_dir, node_id()),
_sgroups,
false));
false,
use_node_id));
}

application* get_node_application(model::node_id id) {
Expand All @@ -107,27 +111,37 @@ class cluster_test_fixture {

application* create_node_application(
model::node_id node_id,
int kafka_port = 9092,
int rpc_port = 11000,
int proxy_port = 8082,
int schema_reg_port = 8081,
int coproc_supervisor_port = 43189) {
int kafka_port_base = 9092,
int rpc_port_base = 11000,
int proxy_port_base = 8082,
int schema_reg_port_base = 8081,
int coproc_supervisor_port_base = 43189,
configure_node_id use_node_id = configure_node_id::yes) {
cluster::clusterlog.info("AWONG creating");
std::vector<config::seed_server> seeds = {};
if (node_id != 0) {
seeds.push_back(
{.addr = net::unresolved_address("127.0.0.1", 11000)});
}
cluster::clusterlog.info("AWONG adding");
add_node(
node_id,
kafka_port + node_id(),
rpc_port + node_id(),
proxy_port + node_id(),
schema_reg_port + node_id(),
coproc_supervisor_port + node_id(),
std::move(seeds));
kafka_port_base + node_id(),
rpc_port_base + node_id(),
proxy_port_base + node_id(),
schema_reg_port_base + node_id(),
coproc_supervisor_port_base + node_id(),
std::move(seeds),
use_node_id);
return get_node_application(node_id);
}

application* create_node_application(
model::node_id node_id, configure_node_id use_node_id) {
return create_node_application(
node_id, 9092, 11000, 8082, 8081, 43189, use_node_id);
}

void remove_node_application(model::node_id node_id) {
_instances.erase(node_id);
}
Expand Down
44 changes: 44 additions & 0 deletions src/v/cluster/tests/cluster_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "test_utils/fixture.h"

using namespace std::chrono_literals; // NOLINT
static ss::logger logger("cluster_tests");

FIXTURE_TEST(test_join_single_node, cluster_test_fixture) {
model::node_id id{0};
Expand Down Expand Up @@ -45,3 +46,46 @@ FIXTURE_TEST(test_three_node_cluster, cluster_test_fixture) {

wait_for_all_members(3s).get();
}

FIXTURE_TEST(test_auto_assign_node_id, cluster_test_fixture) {
create_node_application(model::node_id{0}, configure_node_id::no);
BOOST_REQUIRE_EQUAL(0, *config::node().node_id());

create_node_application(model::node_id{1}, configure_node_id::no);
BOOST_REQUIRE_EQUAL(1, *config::node().node_id());

create_node_application(model::node_id{2}, configure_node_id::no);
BOOST_REQUIRE_EQUAL(2, *config::node().node_id());

wait_for_all_members(3s).get();
}

FIXTURE_TEST(test_auto_assign_non_seeds, cluster_test_fixture) {
create_node_application(model::node_id{0});
BOOST_REQUIRE_EQUAL(0, *config::node().node_id());

create_node_application(model::node_id{1}, configure_node_id::no);
BOOST_REQUIRE_EQUAL(1, *config::node().node_id());

create_node_application(model::node_id{2}, configure_node_id::no);
BOOST_REQUIRE_EQUAL(2, *config::node().node_id());

wait_for_all_members(3s).get();
}

FIXTURE_TEST(test_auto_assign_with_explicit_node_id, cluster_test_fixture) {
create_node_application(model::node_id{0});
BOOST_REQUIRE_EQUAL(0, *config::node().node_id());

// Explicitly assign node ID 2. Node ID assignment should assign around it.
create_node_application(model::node_id{2});
BOOST_REQUIRE_EQUAL(2, *config::node().node_id());

create_node_application(model::node_id{1}, configure_node_id::no);
BOOST_REQUIRE_EQUAL(1, *config::node().node_id());

create_node_application(model::node_id{3}, configure_node_id::no);
BOOST_REQUIRE_EQUAL(3, *config::node().node_id());

wait_for_all_members(3s).get();
}
2 changes: 1 addition & 1 deletion src/v/cluster/topic_table_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace cluster {

topic_table_probe::topic_table_probe(const topic_table& topic_table)
: _topic_table(topic_table)
, _node_id(config::node().node_id()) {
, _node_id(*config::node().node_id()) {
setup_metrics();
}

Expand Down
6 changes: 4 additions & 2 deletions src/v/config/node_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ node_config::node_config() noexcept
, node_id(
*this,
"node_id",
"Unique id identifying a node in the cluster",
{.required = required::yes, .visibility = visibility::user})
"Unique id identifying a node in the cluster. If missing, a unique id "
"will be assigned for this node when it joins the cluster",
{.visibility = visibility::user},
std::nullopt)
, rack(
*this,
"rack",
Expand Down
7 changes: 6 additions & 1 deletion src/v/config/node_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ struct node_config final : public config_store {
public:
property<bool> developer_mode;
property<data_directory_path> data_directory;
property<model::node_id> node_id;

// NOTE: during the normal runtime of a cluster, it is safe to assume that
// the value of the node ID has been determined, and that there is a value
// set for this property.
property<std::optional<model::node_id>> node_id;

property<std::optional<model::rack_id>> rack;
property<std::vector<seed_server>> seed_servers;

Expand Down
2 changes: 1 addition & 1 deletion src/v/coproc/reconciliation_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ reconciliation_backend::reconciliation_backend(
ss::sharded<partition_manager>& coproc_pm,
ss::sharded<pacemaker>& pacemaker,
ss::sharded<wasm::script_database>& sdb) noexcept
: _self(model::node_id(config::node().node_id))
: _self(model::node_id(*config::node().node_id()))
, _data_directory(config::node().data_directory().as_sstring())
, _topics(topics)
, _shard_table(shard_table)
Expand Down
3 changes: 2 additions & 1 deletion src/v/coproc/tests/partition_movement_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ FIXTURE_TEST(test_move_source_topic, coproc_test_fixture) {
const ss::shard_id next_shard = (*shard + 1) % ss::smp::count;
info("Current target shard {} and next shard {}", *shard, next_shard);
model::broker_shard bs{
.node_id = model::node_id(config::node().node_id), .shard = next_shard};
.node_id = model::node_id(*config::node().node_id()),
.shard = next_shard};

/// Move the input onto the new desired target
auto& topics_fe = root_fixture()->app.controller->get_topics_frontend();
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/handlers/describe_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,12 @@ static void report_broker_config(
result.resource_name.data() + result.resource_name.size(), // NOLINT
broker_id);
if (res.ec == std::errc()) {
if (broker_id != config::node().node_id()) {
if (broker_id != *config::node().node_id()) {
result.error_code = error_code::invalid_request;
result.error_message = ssx::sformat(
"Unexpected broker id {} expected {}",
broker_id,
config::node().node_id());
*config::node().node_id());
return;
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/handlers/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ std::optional<cluster::leader_term> get_leader_term(
const auto previous = md_cache.get_previous_leader_id(tp_ns, p_id);
leader_term->leader = previous;

if (previous == config::node().node_id()) {
if (previous == *config::node().node_id()) {
auto idx = fast_prng_source() % replicas.size();
leader_term->leader = replicas[idx];
}
Expand Down Expand Up @@ -333,7 +333,7 @@ guess_peer_listener(request_context& ctx, cluster::broker_ptr broker) {
// is not yet consistent with what's in members_table,
// because a node configuration update didn't propagate
// via raft0 yet
if (broker->id() == config::node().node_id()) {
if (broker->id() == *config::node().node_id()) {
return l;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/redpanda/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ ss::future<ss::httpd::redirect_exception> admin_server::redirect_to_leader(
ss::httpd::reply::status_type::service_unavailable);
}

if (leader_id_opt.value() == config::node().node_id()) {
if (leader_id_opt.value() == *config::node().node_id()) {
vlog(
logger.info,
"Can't redirect to leader from leader node ({})",
Expand Down Expand Up @@ -555,7 +555,7 @@ bool need_redirect_to_leader(
ss::httpd::reply::status_type::service_unavailable);
}

return leader_id_opt.value() != config::node().node_id();
return leader_id_opt.value() != *config::node().node_id();
}

model::node_id parse_broker_id(const ss::httpd::request& req) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1553,7 +1553,7 @@ void application::start_kafka(::stop_signal& app_signal) {
vlog(_log.info, "Waiting for cluster membership");
controller->get_members_table()
.local()
.await_membership(config::node().node_id(), app_signal.abort_source())
.await_membership(*config::node().node_id(), app_signal.abort_source())
.get();
_kafka_server.invoke_on_all(&net::server::start).get();
vlog(
Expand Down
Loading

0 comments on commit 960e599

Please sign in to comment.