From 5972afdb0472580ba227bf2391a41de8eaa5a9ca Mon Sep 17 00:00:00 2001
From: Andrew Wong
Date: Mon, 3 Oct 2022 23:42:02 -0700
Subject: [PATCH] tests: ducktape test for node ID assignment

---
 tests/rptest/services/admin.py                |  4 +-
 tests/rptest/services/redpanda.py             | 67 +++++++++----
 tests/rptest/services/templates/redpanda.yaml |  5 +-
 tests/rptest/tests/node_id_assignment_test.py | 94 +++++++++++++++++++
 4 files changed, 150 insertions(+), 20 deletions(-)
 create mode 100644 tests/rptest/tests/node_id_assignment_test.py

diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py
index 69783fddaf20..785b95c2d8b2 100644
--- a/tests/rptest/services/admin.py
+++ b/tests/rptest/services/admin.py
@@ -383,8 +383,8 @@ def patch_cluster_config(self,
     def get_cluster_config_status(self, node: ClusterNode = None):
         return self._request("GET", "cluster_config/status", node=node).json()
 
-    def get_node_config(self):
-        return self._request("GET", "node_config").json()
+    def get_node_config(self, node=None):
+        return self._request("GET", "node_config", node).json()
 
     def get_features(self, node=None):
         return self._request("GET", "features", node=node).json()
diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py
index 923efa943f6c..896370bd5c14 100644
--- a/tests/rptest/services/redpanda.py
+++ b/tests/rptest/services/redpanda.py
@@ -34,14 +34,15 @@
 from ducktape.errors import TimeoutError
 
 from rptest.clients.kafka_cat import KafkaCat
+from rptest.clients.rpk_remote import RpkRemoteTool
+from rptest.clients.python_librdkafka import PythonLibrdkafka
+from rptest.services import tls
 from rptest.services.admin import Admin
 from rptest.services.redpanda_installer import RedpandaInstaller
 from rptest.services.rolling_restarter import RollingRestarter
-from rptest.clients.rpk_remote import RpkRemoteTool
 from rptest.services.storage import ClusterStorage, NodeStorage
 from rptest.services.utils import BadLogLines, NodeCrash
-from rptest.clients.python_librdkafka import PythonLibrdkafka
-from rptest.services import tls
+from rptest.util import wait_until_result
 
 Partition = collections.namedtuple('Partition',
                                    ['index', 'leader', 'replicas'])
@@ -740,7 +741,8 @@ def start(self,
               clean_nodes=True,
               start_si=True,
               parallel: bool = True,
-              expect_fail: bool = False):
+              expect_fail: bool = False,
+              auto_assign_node_id: bool = False):
         """
         Start the service on all nodes.
 
@@ -795,7 +797,8 @@ def start_one(node):
             self.logger.debug("%s: starting node" % self.who_am_i(node))
             self.start_node(node,
                             first_start=first_start,
-                            expect_fail=expect_fail)
+                            expect_fail=expect_fail,
+                            auto_assign_node_id=auto_assign_node_id)
 
         self._for_nodes(to_start, start_one, parallel=parallel)
 
@@ -999,7 +1002,8 @@ def start_node(self,
                    timeout=None,
                    write_config=True,
                    first_start=False,
-                   expect_fail: bool = False):
+                   expect_fail: bool = False,
+                   auto_assign_node_id: bool = False):
         """
         Start a single instance of redpanda. This function will not return
         until redpanda appears to have started successfully. If redpanda does not
@@ -1010,7 +1014,9 @@ def start_node(self,
         node.account.mkdirs(os.path.dirname(RedpandaService.NODE_CONFIG_FILE))
 
         if write_config:
-            self.write_node_conf_file(node, override_cfg_params)
+            self.write_node_conf_file(node,
+                                      override_cfg_params,
+                                      auto_assign_node_id=auto_assign_node_id)
 
         if timeout is None:
             timeout = self.node_ready_timeout_s
@@ -1545,7 +1551,10 @@ def pids(self, node):
     def started_nodes(self):
         return self._started
 
-    def write_node_conf_file(self, node, override_cfg_params=None):
+    def write_node_conf_file(self,
+                             node,
+                             override_cfg_params=None,
+                             auto_assign_node_id=False):
         """
         Write the node config file for a redpanda node: this is the YAML
         representation of Redpanda's `node_config` class. Distinct from Redpanda's
         _cluster_ configuration
@@ -1553,6 +1562,12 @@ def write_node_conf_file(self, node, override_cfg_params=None):
         """
         node_info = {self.idx(n): n for n in self.nodes}
 
+        node_id = self.idx(node)
+        include_seed_servers = node_id > 1
+        if auto_assign_node_id:
+            # Supply None so it's omitted from the config.
+            node_id = None
+
         # Grab the IP to use it as an alternative listener address, to
         # exercise code paths that deal with multiple listeners
         node_ip = socket.gethostbyname(node.account.hostname)
@@ -1561,7 +1576,8 @@ def write_node_conf_file(self, node, override_cfg_params=None):
             node=node,
             data_dir=RedpandaService.DATA_DIR,
             nodes=node_info,
-            node_id=self.idx(node),
+            node_id=node_id,
+            include_seed_servers=include_seed_servers,
             node_ip=node_ip,
             kafka_alternate_port=self.KAFKA_ALTERNATE_PORT,
             admin_alternate_port=self.ADMIN_ALTERNATE_PORT,
@@ -1644,7 +1660,8 @@ def restart_nodes(self,
                       nodes,
                       override_cfg_params=None,
                       start_timeout=None,
-                      stop_timeout=None):
+                      stop_timeout=None,
+                      auto_assign_node_id=False):
         nodes = [nodes] if isinstance(nodes, ClusterNode) else nodes
 
         with concurrent.futures.ThreadPoolExecutor(
@@ -1656,8 +1673,11 @@ def restart_nodes(self,
                     nodes))
             list(
                 executor.map(
-                    lambda n: self.start_node(
-                        n, override_cfg_params, timeout=start_timeout), nodes))
+                    lambda n: self.start_node(n,
+                                              override_cfg_params,
+                                              timeout=start_timeout,
+                                              auto_assign_node_id=
+                                              auto_assign_node_id), nodes))
 
     def rolling_restart_nodes(self,
                               nodes,
@@ -1681,9 +1701,9 @@ def registered(self, node):
         We first check the admin API to do a kafka-independent check, and then
         verify that kafka clients see the same thing.
         """
-        idx = self.idx(node)
+        node_id = self.node_id(node)
         self.logger.debug(
-            f"registered: checking if broker {idx} ({node.name} is registered..."
+            f"registered: checking if broker {node_id} ({node.name}) is registered..."
         )
 
         # Query all nodes' admin APIs, so that we don't advance during setup until
@@ -1703,7 +1723,7 @@ def registered(self, node):
                 return False
             found = None
             for b in admin_brokers:
-                if b['node_id'] == idx:
+                if b['node_id'] == node_id:
                     found = b
                     break
 
@@ -1733,7 +1753,7 @@ def registered(self, node):
 
         client = PythonLibrdkafka(self, tls_cert=self._tls_cert, **auth_args)
         brokers = client.brokers()
-        broker = brokers.get(idx, None)
+        broker = brokers.get(node_id, None)
         if broker is None:
             # This should never happen, because we already checked via the admin API
             # that the node of interest had become visible to all peers.
@@ -2003,6 +2023,21 @@ def shards(self):
             shards_per_node[self.idx(node)] = num_shards
         return shards_per_node
 
+    def node_id(self, node):
+        def _try_get_node_id():
+            try:
+                node_cfg = self._admin.get_node_config(node)
+            except Exception:
+                return (False, -1)
+            return (True, node_cfg["node_id"])
+
+        node_id = wait_until_result(
+            _try_get_node_id,
+            timeout_sec=30,
+            err_msg=f"couldn't reach admin endpoint for {node.account.hostname}"
+        )
+        return node_id
+
     def healthy(self):
         """
         A primitive health check on all the nodes which returns True when all
diff --git a/tests/rptest/services/templates/redpanda.yaml b/tests/rptest/services/templates/redpanda.yaml
index 05df0302d030..5f62b5228797 100644
--- a/tests/rptest/services/templates/redpanda.yaml
+++ b/tests/rptest/services/templates/redpanda.yaml
@@ -12,7 +12,9 @@ organization: "vectorized"
 redpanda:
   developer_mode: true
   data_directory: "{{data_dir}}"
+{% if node_id is not none %}
   node_id: {{node_id}}
+{% endif %}
   rpc_server:
     address: "{{node.account.hostname}}"
     port: 33145
@@ -39,12 +41,11 @@ redpanda:
       port: {{admin_alternate_port}}
 
 
-{% if node_id > 1 %}
+{% if include_seed_servers %}
 seed_servers:
   - host:
       address: {{nodes[1].account.hostname}}
       port: 33145
-      node_id: 1
 {% endif %}
 
 {% if enable_pp %}
diff --git a/tests/rptest/tests/node_id_assignment_test.py b/tests/rptest/tests/node_id_assignment_test.py
new file mode 100644
index 000000000000..320837faeea4
--- /dev/null
+++ b/tests/rptest/tests/node_id_assignment_test.py
@@ -0,0 +1,94 @@
+# Copyright 2022 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from ducktape.utils.util import wait_until
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.services.cluster import cluster
+from rptest.services.redpanda_installer import RedpandaInstaller
+
+
+def wipe_and_restart(redpanda, node):
+    """
+    Stops, clears, and restarts a node, priming it to be assigned a node ID.
+    """
+    redpanda.stop_node(node)
+    redpanda.clean_node(node,
+                        preserve_logs=True,
+                        preserve_current_install=True)
+    redpanda.start_node(node, auto_assign_node_id=True)
+
+
+class NodeIdAssignment(RedpandaTest):
+    """
+    Test that exercises cluster formation when node IDs are automatically
+    assigned by Redpanda.
+ """ + def __init__(self, test_context): + super(NodeIdAssignment, self).__init__(test_context=test_context, + num_brokers=3) + self.admin = self.redpanda._admin + + def setUp(self): + self.redpanda.start(auto_assign_node_id=True) + self._create_initial_topics() + + @cluster(num_nodes=3) + def test_basic_assignment(self): + brokers = self.admin.get_brokers() + assert 3 == len(brokers), f"Got {len(brokers)} brokers" + + @cluster(num_nodes=3) + def test_assign_after_clear(self): + brokers = self.admin.get_brokers() + assert 3 == len(brokers), f"Got {len(brokers)} brokers" + + clean_node = self.redpanda.nodes[-1] + original_node_id = self.redpanda.node_id(clean_node) + wipe_and_restart(self.redpanda, clean_node) + + brokers = self.admin.get_brokers() + assert 4 == len(brokers), f"Got {len(brokers)} brokers" + new_node_id = self.redpanda.node_id(clean_node) + assert original_node_id != new_node_id, f"Cleaned node came back with node ID {new_node_id}" + + +class NodeIdAssignmentUpgrade(RedpandaTest): + """ + Test that exercises cluster formation when node IDs are automatically + assigned by Redpanda after an upgrade. + """ + def __init__(self, test_context): + super(NodeIdAssignmentUpgrade, + self).__init__(test_context=test_context, num_brokers=3) + self.installer = self.redpanda._installer + self.admin = self.redpanda._admin + + def setUp(self): + self.installer.install(self.redpanda.nodes, (22, 2, 1)) + super(NodeIdAssignmentUpgrade, self).setUp() + + @cluster(num_nodes=3) + def test_assign_after_upgrade(self): + self.installer.install(self.redpanda.nodes, RedpandaInstaller.HEAD) + self.redpanda.restart_nodes(self.redpanda.nodes, + auto_assign_node_id=True) + wait_until( + lambda: self.admin.supports_feature("node_id_assignment"), + timeout_sec=30, + backoff_sec=1, + err_msg="Timeout waiting for cluster to support 'license' feature") + + clean_node = self.redpanda.nodes[-1] + original_node_id = self.redpanda.node_id(clean_node) + wipe_and_restart(self.redpanda, clean_node) + + brokers = self.admin.get_brokers() + assert 4 == len(brokers), f"Got {len(brokers)} brokers" + new_node_id = self.redpanda.node_id(clean_node) + assert original_node_id != new_node_id, f"Cleaned node came back with node ID {new_node_id}"