Skip to content

Commit

Permalink
cluster: cluster_bootstrap_info implemented
Browse files Browse the repository at this point in the history
Client: parallel querying of all seed_servers w/o timeout, until
results are obtained from all peer seed servers.
Verify that both versions and configurations match
Server: supply data
initial_seed_brokers() now returns a future
  • Loading branch information
dlex committed Oct 7, 2022
1 parent 6d61a10 commit 87604ad
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 16 deletions.
17 changes: 16 additions & 1 deletion src/v/cluster/bootstrap_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,29 @@
#include "cluster/bootstrap_service.h"

#include "cluster/bootstrap_types.h"
#include "cluster/cluster_utils.h"
#include "cluster/logger.h"
#include "config/node_config.h"
#include "features/feature_table.h"

namespace cluster {

ss::future<cluster_bootstrap_info_reply>
bootstrap_service::cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) {
cluster_bootstrap_info_reply r{};
// TODO: add fields!
r.broker = make_self_broker(config::node());
r.version = features::feature_table::get_latest_logical_version();
const std::vector<config::seed_server>& seed_servers
= config::node().seed_servers();
r.seed_servers.reserve(seed_servers.size());
std::transform(
seed_servers.cbegin(),
seed_servers.cend(),
std::back_inserter(r.seed_servers),
[](const config::seed_server& seed_server) { return seed_server.addr; });
r.empty_seed_starts_cluster = config::node().empty_seed_starts_cluster();
vlog(clusterlog.debug, "Replying cluster_bootstrap_info: {}", r);
co_return r;
}

Expand Down
23 changes: 17 additions & 6 deletions src/v/cluster/bootstrap_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0
#pragma once

#include "cluster/types.h"
#include "model/fundamental.h"
#include "serde/serde.h"

Expand All @@ -34,17 +35,27 @@ struct cluster_bootstrap_info_reply
: serde::envelope<cluster_bootstrap_info_reply, serde::version<0>> {
using rpc_adl_exempt = std::true_type;

model::broker broker;
cluster_version version;
std::vector<net::unresolved_address> seed_servers;
bool empty_seed_starts_cluster;
// TODO: add fields!
// - node UUID?
// - logical version of the node
// - seed servers list
// - empty_seed_starts_cluster value

auto serde_fields() { return std::tie(); }
auto serde_fields() {
return std::tie(
broker, version, seed_servers, empty_seed_starts_cluster);
}

friend std::ostream&
operator<<(std::ostream& o, const cluster_bootstrap_info_reply&) {
fmt::print(o, "{{}}");
operator<<(std::ostream& o, const cluster_bootstrap_info_reply& v) {
fmt::print(
o,
"{{broker: {}, version: {}, seed_servers: {}, ESCB: {}}}",
v.broker,
v.version,
v.seed_servers,
v.empty_seed_starts_cluster);
return o;
}
};
Expand Down
142 changes: 135 additions & 7 deletions src/v/cluster/cluster_discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@

#include "cluster/cluster_discovery.h"

#include "cluster/bootstrap_types.h"
#include "cluster/cluster_bootstrap_service.h"
#include "cluster/cluster_utils.h"
#include "cluster/controller_service.h"
#include "cluster/logger.h"
#include "config/node_config.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "seastarx.h"
Expand Down Expand Up @@ -79,24 +82,26 @@ ss::future<node_id> cluster_discovery::determine_node_id() {
co_return assigned_node_id;
}

vector<broker>
ss::future<cluster_discovery::brokers>
cluster_discovery::initial_seed_brokers(const bool cluster_exists) const {
// If the cluster has been formed, return empty
if (cluster_exists) return {};
if (cluster_exists) co_return brokers{};
// If configured as the root node, we'll want to start the cluster with
// just this node as the initial seed.
if (config::node().empty_seed_starts_cluster()) {
if (config::node().seed_servers().empty())
return {make_self_broker(config::node())};
co_return brokers{make_self_broker(config::node())};
// Not a root
return {};
co_return brokers{};
}
if (get_node_index_in_seed_servers()) {
// TODO: return the discovered nodes plus this node
return {make_self_broker(config::node())};
std::vector<model::broker> seed_brokers
= co_await request_seed_brokers();
seed_brokers.push_back(make_self_broker(config::node()));
co_return std::move(seed_brokers);
}
// Non-seed server
return {};
co_return brokers{};
}

ss::future<ss::stop_iteration>
Expand Down Expand Up @@ -140,6 +145,129 @@ cluster_discovery::dispatch_node_uuid_registration_to_seeds(
co_return ss::stop_iteration::no;
}

ss::future<cluster_bootstrap_info_reply>
cluster_discovery::request_cluster_bootstrap_info(
const net::unresolved_address addr) const {
vlog(clusterlog.info, "Requesting cluster bootstrap info from {}", addr);
cluster_bootstrap_info_reply reply;
co_await ss::repeat(
ss::coroutine::lambda([&reply, addr]() -> ss::future<ss::stop_iteration> {
auto reply_result = co_await do_with_client_one_shot<
cluster_bootstrap_client_protocol>(
addr,
config::node().rpc_server_tls(),
2s,
[](cluster_bootstrap_client_protocol c) {
return c
.cluster_bootstrap_info(
cluster_bootstrap_info_request{},
rpc::client_opts(rpc::clock_type::now() + 2s))
.then(&rpc::get_ctx_data<cluster_bootstrap_info_reply>);
});
if (reply_result) {
reply = std::move(reply_result.value());
co_return ss::stop_iteration::yes;
}
co_await ss::sleep_abortable(1s);
vlog(
clusterlog.trace, "Retrying cluster bootstrap info from {}", addr);
co_return ss::stop_iteration::no;
}));

vlog(clusterlog.info, "Obtained cluster bootstrap info from {}", addr);
vlog(clusterlog.debug, "{}", reply);
co_return std::move(reply);
}

namespace {

bool equal(
const std::vector<net::unresolved_address>& lhs,
const std::vector<config::seed_server>& rhs) {
return std::equal(
lhs.cbegin(),
lhs.cend(),
rhs.cbegin(),
rhs.cend(),
[](const net::unresolved_address& lhs, const config::seed_server& rhs) {
return lhs == rhs.addr;
});
}

} // namespace

ss::future<std::vector<model::broker>>
cluster_discovery::request_seed_brokers() const {
const net::unresolved_address& self_addr
= config::node().advertised_rpc_api();
const std::vector<config::seed_server>& self_seed_servers
= config::node().seed_servers();

std::vector<
std::pair<net::unresolved_address, cluster_bootstrap_info_reply>>
peers;
peers.reserve(self_seed_servers.size());
for (const config::seed_server& seed_server : self_seed_servers) {
// do not call oneself
if (seed_server.addr == self_addr) continue;
peers.emplace_back(seed_server.addr, cluster_bootstrap_info_reply{});
}
co_await ss::parallel_for_each(peers, [this](auto& peer) -> ss::future<> {
peer.second = co_await request_cluster_bootstrap_info(peer.first);
co_return;
});

bool failed = false;
std::vector<model::broker> seed_brokers;
seed_brokers.reserve(peers.size());
for (auto& peer : peers) {
if (
peer.second.version
!= features::feature_table::get_latest_logical_version()) {
vlog(
clusterlog.error,
"Cluster setup error: logical version mismatch, local: {}, {}: "
"{}",
features::feature_table::get_latest_logical_version(),
peer.first,
peer.second.version);
failed = true;
}
if (!equal(peer.second.seed_servers, self_seed_servers)) {
vlog(
clusterlog.error,
"Cluster configuration error: seed server list mismatch, "
"local: "
"[{}], {}: [{}]",
self_seed_servers,
peer.first,
peer.second.seed_servers);
failed = true;
}
if (
peer.second.empty_seed_starts_cluster
!= config::node().empty_seed_starts_cluster()) {
vlog(
clusterlog.error,
"Cluster configuration error: empty_seed_starts_cluster "
"mismatch, local: {}, {}: {}",
config::node().empty_seed_starts_cluster(),
peer.first,
peer.second.empty_seed_starts_cluster);
failed = true;
}
seed_brokers.push_back(std::move(peer.second.broker));
}
if (failed)
throw std::runtime_error(fmt_with_ctx(
fmt::format,
"Cannot bootstrap a cluster due to seed servers mismatch, check "
"the "
"log for details"));
vlog(clusterlog.debug, "Seed brokers: [{}]", seed_brokers);
co_return std::move(seed_brokers);
}

/*static*/ std::optional<node_id>
cluster_discovery::get_cluster_founder_node_id() {
if (config::node().empty_seed_starts_cluster()) {
Expand Down
10 changes: 9 additions & 1 deletion src/v/cluster/cluster_discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class kvstore;
} // namespace storage

namespace cluster {
struct cluster_bootstrap_info_reply;

// Provides metadata pertaining to initial cluster discovery. It is the
// entrypoint into the steps to join a cluster.
Expand Down Expand Up @@ -60,6 +61,8 @@ namespace cluster {
// TODO: reconcile the RPC dispatch logic here with that in members_manager.
class cluster_discovery {
public:
using brokers = std::vector<model::broker>;

cluster_discovery(
const model::node_uuid& node_uuid,
storage::kvstore& kvstore,
Expand All @@ -82,7 +85,7 @@ class cluster_discovery {
// created, returns an empty list.
// In case of Emtpy Seed Cluster Bootstrap, that reflects to a list of the
// root broker in root if cluster is not there yet, and empty otherwise.
std::vector<model::broker> initial_seed_brokers(bool cluster_exists) const;
ss::future<brokers> initial_seed_brokers(bool cluster_exists) const;

private:
// Returns index-based node_id if the local node is a founding member
Expand All @@ -100,6 +103,11 @@ class cluster_discovery {
*/
static std::optional<int32_t> get_node_index_in_seed_servers();

ss::future<std::vector<model::broker>> request_seed_brokers() const;

ss::future<cluster_bootstrap_info_reply>
request_cluster_bootstrap_info(net::unresolved_address) const;

// Sends requests to each seed server to register the local node UUID until
// one succeeds. Upon success, sets `node_id` to the assigned node ID and
// returns stop_iteration::yes.
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ void application::start_runtime_services(
syschecks::systemd_message("Starting controller").get();
controller
->start(
cd.initial_seed_brokers(stored_cluster_uuid.has_value()),
cd.initial_seed_brokers(stored_cluster_uuid.has_value()).get(),
stored_cluster_uuid)
.get0();
kafka_group_migration = ss::make_lw_shared<kafka::group_metadata_migration>(
Expand Down

0 comments on commit 87604ad

Please sign in to comment.