tests: ducktape test for node ID assignment
andrwng committed Oct 6, 2022
1 parent 8aa8c7d commit 5972afd
Showing 4 changed files with 150 additions and 20 deletions.
4 changes: 2 additions & 2 deletions tests/rptest/services/admin.py
@@ -383,8 +383,8 @@ def patch_cluster_config(self,
def get_cluster_config_status(self, node: ClusterNode = None):
return self._request("GET", "cluster_config/status", node=node).json()

def get_node_config(self):
return self._request("GET", "node_config").json()
def get_node_config(self, node=None):
return self._request("GET", "node_config", node).json()

def get_features(self, node=None):
return self._request("GET", "features", node=node).json()
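As context for this change, a minimal sketch (helper name hypothetical, not part of the commit) of how the new per-node parameter might be used: each broker's own admin endpoint serves its node_config, which is what lets a test read back an auto-assigned node ID. It assumes an already-started RedpandaService and its Admin client.

def report_assigned_node_ids(redpanda, admin):
    # Hypothetical helper: ask each broker's own admin API for its node_config
    # and print the node ID it is currently running with.
    for node in redpanda.nodes:
        cfg = admin.get_node_config(node)  # queries this specific node
        print(f"{node.account.hostname} -> node_id {cfg['node_id']}")
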
67 changes: 51 additions & 16 deletions tests/rptest/services/redpanda.py
@@ -34,14 +34,15 @@
from ducktape.errors import TimeoutError

from rptest.clients.kafka_cat import KafkaCat
from rptest.clients.rpk_remote import RpkRemoteTool
from rptest.clients.python_librdkafka import PythonLibrdkafka
from rptest.services import tls
from rptest.services.admin import Admin
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.services.rolling_restarter import RollingRestarter
from rptest.clients.rpk_remote import RpkRemoteTool
from rptest.services.storage import ClusterStorage, NodeStorage
from rptest.services.utils import BadLogLines, NodeCrash
from rptest.clients.python_librdkafka import PythonLibrdkafka
from rptest.services import tls
from rptest.util import wait_until_result

Partition = collections.namedtuple('Partition',
['index', 'leader', 'replicas'])
@@ -740,7 +741,8 @@ def start(self,
clean_nodes=True,
start_si=True,
parallel: bool = True,
expect_fail: bool = False):
expect_fail: bool = False,
auto_assign_node_id: bool = False):
"""
Start the service on all nodes.
@@ -795,7 +797,8 @@ def start_one(node):
self.logger.debug("%s: starting node" % self.who_am_i(node))
self.start_node(node,
first_start=first_start,
expect_fail=expect_fail)
expect_fail=expect_fail,
auto_assign_node_id=auto_assign_node_id)

self._for_nodes(to_start, start_one, parallel=parallel)

@@ -999,7 +1002,8 @@ def start_node(self,
timeout=None,
write_config=True,
first_start=False,
expect_fail: bool = False):
expect_fail: bool = False,
auto_assign_node_id: bool = False):
"""
Start a single instance of redpanda. This function will not return until
redpanda appears to have started successfully. If redpanda does not
@@ -1010,7 +1014,9 @@ def start_node(self,
node.account.mkdirs(os.path.dirname(RedpandaService.NODE_CONFIG_FILE))

if write_config:
self.write_node_conf_file(node, override_cfg_params)
self.write_node_conf_file(node,
override_cfg_params,
auto_assign_node_id=auto_assign_node_id)

if timeout is None:
timeout = self.node_ready_timeout_s
@@ -1545,14 +1551,23 @@ def pids(self, node):
def started_nodes(self):
return self._started

def write_node_conf_file(self, node, override_cfg_params=None):
def write_node_conf_file(self,
node,
override_cfg_params=None,
auto_assign_node_id=False):
"""
Write the node config file for a redpanda node: this is the YAML representation
of Redpanda's `node_config` class. Distinct from Redpanda's _cluster_ configuration
which is written separately.
"""
node_info = {self.idx(n): n for n in self.nodes}

node_id = self.idx(node)
include_seed_servers = node_id > 1
if auto_assign_node_id:
# Supply None so it's omitted from the config.
node_id = None

# Grab the IP to use it as an alternative listener address, to
# exercise code paths that deal with multiple listeners
node_ip = socket.gethostbyname(node.account.hostname)
@@ -1561,7 +1576,8 @@ def write_node_conf_file(self, node, override_cfg_params=None):
node=node,
data_dir=RedpandaService.DATA_DIR,
nodes=node_info,
node_id=self.idx(node),
node_id=node_id,
include_seed_servers=include_seed_servers,
node_ip=node_ip,
kafka_alternate_port=self.KAFKA_ALTERNATE_PORT,
admin_alternate_port=self.ADMIN_ALTERNATE_PORT,
@@ -1644,7 +1660,8 @@ def restart_nodes(self,
nodes,
override_cfg_params=None,
start_timeout=None,
stop_timeout=None):
stop_timeout=None,
auto_assign_node_id=False):

nodes = [nodes] if isinstance(nodes, ClusterNode) else nodes
with concurrent.futures.ThreadPoolExecutor(
@@ -1656,8 +1673,11 @@ def restart_nodes(self,
nodes))
list(
executor.map(
lambda n: self.start_node(
n, override_cfg_params, timeout=start_timeout), nodes))
lambda n: self.start_node(n,
override_cfg_params,
timeout=start_timeout,
auto_assign_node_id=
auto_assign_node_id), nodes))

def rolling_restart_nodes(self,
nodes,
@@ -1681,9 +1701,9 @@ def registered(self, node):
We first check the admin API to do a kafka-independent check, and then verify
that kafka clients see the same thing.
"""
idx = self.idx(node)
node_id = self.node_id(node)
self.logger.debug(
f"registered: checking if broker {idx} ({node.name} is registered..."
f"registered: checking if broker {node_id} ({node.name}) is registered..."
)

# Query all nodes' admin APIs, so that we don't advance during setup until
@@ -1703,7 +1723,7 @@ def registered(self, node):
return False
found = None
for b in admin_brokers:
if b['node_id'] == idx:
if b['node_id'] == node_id:
found = b
break

@@ -1733,7 +1753,7 @@ def registered(self, node):
client = PythonLibrdkafka(self, tls_cert=self._tls_cert, **auth_args)

brokers = client.brokers()
broker = brokers.get(idx, None)
broker = brokers.get(node_id, None)
if broker is None:
# This should never happen, because we already checked via the admin API
# that the node of interest had become visible to all peers.
@@ -2003,6 +2023,21 @@ def shards(self):
shards_per_node[self.idx(node)] = num_shards
return shards_per_node

def node_id(self, node):
def _try_get_node_id():
try:
node_cfg = self._admin.get_node_config(node)
except Exception:
return (False, -1)
return (True, node_cfg["node_id"])

node_id = wait_until_result(
_try_get_node_id,
timeout_sec=30,
err_msg=f"couldn't reach admin endpoing for {node.account.hostname}"
)
return node_id

def healthy(self):
"""
A primitive health check on all the nodes which returns True when all
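The new node_id() helper above relies on a (done, value) callback convention: the inner function returns (False, -1) while the admin endpoint is unreachable and (True, node_id) once it answers, and wait_until_result hands back the value. Below is a minimal, illustrative stand-in for that helper (the real one lives in rptest.util; this signature is an assumption), shown only to make the convention concrete.

import time

def wait_until_result_sketch(fetcher, timeout_sec, backoff_sec=1, err_msg=""):
    # Keep calling fetcher() until it reports (True, value), then return value.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        done, value = fetcher()
        if done:
            return value
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)

# Hypothetical usage mirroring RedpandaService.node_id():
# node_id = wait_until_result_sketch(
#     lambda: (True, admin.get_node_config(node)["node_id"]),
#     timeout_sec=30,
#     err_msg=f"couldn't reach admin endpoint for {node.account.hostname}")
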
5 changes: 3 additions & 2 deletions tests/rptest/services/templates/redpanda.yaml
@@ -12,7 +12,9 @@ organization: "vectorized"
redpanda:
developer_mode: true
data_directory: "{{data_dir}}"
{% if node_id is not none %}
node_id: {{node_id}}
{% endif %}
rpc_server:
address: "{{node.account.hostname}}"
port: 33145
@@ -39,12 +41,11 @@ redpanda:
port: {{admin_alternate_port}}


{% if node_id > 1 %}
{% if include_seed_servers %}
seed_servers:
- host:
address: {{nodes[1].account.hostname}}
port: 33145
node_id: 1
{% endif %}

{% if enable_pp %}
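The template change above means node_id is only emitted when a concrete ID is supplied, and seed servers are now gated on an explicit include_seed_servers flag instead of node_id > 1 (which auto-assigned nodes no longer have). A small rendering sketch of that behavior, assuming jinja2 is available and using a simplified fragment with illustrative values rather than the real redpanda.yaml template:

from jinja2 import Template

FRAGMENT = """\
redpanda:
  data_directory: "{{data_dir}}"
{% if node_id is not none %}
  node_id: {{node_id}}
{% endif %}
{% if include_seed_servers %}
  seed_servers:
    - host:
        address: {{seed_host}}
        port: 33145
{% endif %}
"""

def render(node_id, include_seed_servers):
    # trim_blocks/lstrip_blocks drop the {% if %} lines from the rendered YAML.
    return Template(FRAGMENT, trim_blocks=True, lstrip_blocks=True).render(
        data_dir="/var/lib/redpanda/data",
        node_id=node_id,
        include_seed_servers=include_seed_servers,
        seed_host="redpanda-node-1")

print(render(node_id=1, include_seed_servers=False))    # explicit ID, founding node
print(render(node_id=None, include_seed_servers=True))  # node_id omitted, joins via seeds
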
94 changes: 94 additions & 0 deletions tests/rptest/tests/node_id_assignment_test.py
@@ -0,0 +1,94 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from ducktape.utils.util import wait_until
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster
from rptest.services.redpanda_installer import RedpandaInstaller


def wipe_and_restart(redpanda, node):
"""
Stops, clears, and restarts a node, priming it to be assigned a node ID.
"""
redpanda.stop_node(node)
redpanda.clean_node(node,
preserve_logs=True,
preserve_current_install=True)
redpanda.start_node(node, auto_assign_node_id=True)


class NodeIdAssignment(RedpandaTest):
"""
Test that exercises cluster formation when node IDs are automatically
assigned by Redpanda.
"""
def __init__(self, test_context):
super(NodeIdAssignment, self).__init__(test_context=test_context,
num_brokers=3)
self.admin = self.redpanda._admin

def setUp(self):
self.redpanda.start(auto_assign_node_id=True)
self._create_initial_topics()

@cluster(num_nodes=3)
def test_basic_assignment(self):
brokers = self.admin.get_brokers()
assert 3 == len(brokers), f"Got {len(brokers)} brokers"

@cluster(num_nodes=3)
def test_assign_after_clear(self):
brokers = self.admin.get_brokers()
assert 3 == len(brokers), f"Got {len(brokers)} brokers"

clean_node = self.redpanda.nodes[-1]
original_node_id = self.redpanda.node_id(clean_node)
wipe_and_restart(self.redpanda, clean_node)

brokers = self.admin.get_brokers()
assert 4 == len(brokers), f"Got {len(brokers)} brokers"
new_node_id = self.redpanda.node_id(clean_node)
assert original_node_id != new_node_id, f"Cleaned node came back with the same node ID {new_node_id}"


class NodeIdAssignmentUpgrade(RedpandaTest):
"""
Test that exercises cluster formation when node IDs are automatically
assigned by Redpanda after an upgrade.
"""
def __init__(self, test_context):
super(NodeIdAssignmentUpgrade,
self).__init__(test_context=test_context, num_brokers=3)
self.installer = self.redpanda._installer
self.admin = self.redpanda._admin

def setUp(self):
self.installer.install(self.redpanda.nodes, (22, 2, 1))
super(NodeIdAssignmentUpgrade, self).setUp()

@cluster(num_nodes=3)
def test_assign_after_upgrade(self):
self.installer.install(self.redpanda.nodes, RedpandaInstaller.HEAD)
self.redpanda.restart_nodes(self.redpanda.nodes,
auto_assign_node_id=True)
wait_until(
lambda: self.admin.supports_feature("node_id_assignment"),
timeout_sec=30,
backoff_sec=1,
err_msg="Timeout waiting for cluster to support 'license' feature")

clean_node = self.redpanda.nodes[-1]
original_node_id = self.redpanda.node_id(clean_node)
wipe_and_restart(self.redpanda, clean_node)

brokers = self.admin.get_brokers()
assert 4 == len(brokers), f"Got {len(brokers)} brokers"
new_node_id = self.redpanda.node_id(clean_node)
assert original_node_id != new_node_id, f"Cleaned node came back with the same node ID {new_node_id}"
