From 0f6bbf6a6e450342c54592f8417a2b81339b4755 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 24 Nov 2023 10:30:29 +0100 Subject: [PATCH] tests: added tx manager migration test Signed-off-by: Michal Maslanka --- tests/rptest/services/admin.py | 3 + .../tests/tx_coordinator_migration_test.py | 171 ++++++++++++++++++ 2 files changed, 174 insertions(+) create mode 100644 tests/rptest/tests/tx_coordinator_migration_test.py diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index f1bbf7b3a4070..33716e82e3f03 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -1177,3 +1177,6 @@ def set_partitions_disabled(self, def reset_crash_tracking(self, node): return self._request("PUT", "reset_crash_tracking", node=node) + + def migrate_tx_manager_in_recovery(self, node): + return self._request("POST", "recovery/migrate_tx_manager", node=node) diff --git a/tests/rptest/tests/tx_coordinator_migration_test.py b/tests/rptest/tests/tx_coordinator_migration_test.py new file mode 100644 index 0000000000000..8e21ee484559c --- /dev/null +++ b/tests/rptest/tests/tx_coordinator_migration_test.py @@ -0,0 +1,171 @@ +# Copyright 2022 Redpanda Data, Inc. 
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from concurrent.futures import ThreadPoolExecutor
import time
import requests
from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec

import random
from rptest.services.failure_injector import FailureInjector, FailureSpec

from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.admin import Admin
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
import confluent_kafka as ck
from rptest.services.admin import Admin
from rptest.utils.mode_checks import skip_debug_mode
from ducktape.mark import parametrize


class TxCoordinatorMigrationTest(RedpandaTest):
    """
    Checks that the tx manager topic can be migrated to a different number
    of partitions through the admin API while the cluster runs in recovery
    mode, optionally while node failures are being injected.
    """
    def __init__(self, test_context):
        # These attributes are read while building extra_rp_conf below, so
        # they have to be assigned before the base class initializer runs.
        self.partition_count = 8
        self.initial_tx_manager_partitions = 4
        self.producer_count = 100
        super().__init__(test_context=test_context,
                         num_brokers=3,
                         extra_rp_conf={
                             'transaction_coordinator_partitions':
                             self.initial_tx_manager_partitions
                         })

    def _tx_id(self, idx):
        """Return the transactional id used by the producer number `idx`."""
        return f"test-producer-{idx}"

    def _populate_tx_coordinator(self, topic):
        """
        Commit one transaction for each of the `producer_count` transactional
        ids. Every transaction writes between 1 and 50 messages to randomly
        chosen partitions of `topic`.
        """
        def delivery_callback(err, msg):
            assert not err, "failed to deliver message: %s" % err

        for i in range(self.producer_count):
            producer = ck.Producer({
                'bootstrap.servers': self.redpanda.brokers(),
                'transactional.id': self._tx_id(i),
                'transaction.timeout.ms': 10000,
            })
            producer.init_transactions()
            producer.begin_transaction()
            for m in range(random.randint(1, 50)):
                # Keyword arguments are used on purpose: positionally
                # produce() takes the value before the key, which is easy
                # to get backwards.
                producer.produce(topic,
                                 key=f"p-{i}-key-{m}",
                                 value=f"p-{i}-value-{m}",
                                 partition=random.randint(
                                     0, self.partition_count - 1),
                                 callback=delivery_callback)
            producer.commit_transaction()
            producer.flush()

    def _get_tx_id_mapping(self):
        """
        Return a dict mapping every transactional id used by this test to
        the "<topic>/<partition>" string built from the `ntp` entry that
        `Admin.find_tx_coordinator` reports for it.
        """
        admin = Admin(self.redpanda)
        mapping = {}
        for idx in range(self.producer_count):
            tx_id = self._tx_id(idx)
            ntp = admin.find_tx_coordinator(tx_id)['ntp']
            mapping[tx_id] = f"{ntp['topic']}/{ntp['partition']}"

        return mapping

    def _get_tx_manager_topic_meta(self):
        """
        Collects partition metadata of the `kafka_internal/tx` topic.

        Returns a set of (partition id, raft group id) pairs. The pairs
        change when the topic is deleted and recreated, so comparing two
        snapshots tells whether the topic was recreated in between.
        """
        metadata = Admin(self.redpanda).get_partitions(
            namespace="kafka_internal", topic="tx")

        return {(p['partition_id'], p['raft_group_id']) for p in metadata}

    def _migrate_until_success(self, timeout_sec=300):
        """
        Keep requesting the tx manager migration from the first node until a
        request succeeds, injecting kill failures into that node during the
        first attempts.

        :param timeout_sec: upper bound, in seconds, on how long failed
                            requests are retried.
        :raises TimeoutError: when a request fails after `timeout_sec` has
                              elapsed; the last error is chained as the cause.
        """
        admin = Admin(self.redpanda)
        injector = FailureInjector(self.redpanda)
        node = self.redpanda.nodes[0]
        # failures are injected only while attempt < max_failures, i.e. on
        # the first max_failures - 1 attempts
        max_failures = 5
        deadline = time.monotonic() + timeout_sec

        def kill_node():
            # Runs on the executor thread. Nobody inspects the returned
            # future, so errors are logged here instead of being lost.
            # NOTE(review): the trailing 0 is presumably the failure length,
            # confirm against FailureSpec.
            try:
                injector.inject_failure(
                    FailureSpec(FailureSpec.FAILURE_KILL, node, 0))
            except Exception as e:
                self.logger.warning(f"Failure injection error: {e}")

        # Leaving the `with` block waits for the injections which are still
        # queued, so none of them is running when this method returns.
        with ThreadPoolExecutor(max_workers=1) as executor:
            attempt = 0
            while True:
                attempt += 1
                try:
                    if attempt < max_failures:
                        executor.submit(kill_node)
                    admin.migrate_tx_manager_in_recovery(node)
                    return
                except Exception as e:
                    # Deliberately broad: any error of a request sent to a
                    # node that is being killed is treated as retryable.
                    self.logger.info(f"Migration error: {e}")
                    if time.monotonic() >= deadline:
                        raise TimeoutError(
                            f"tx manager migration did not succeed within "
                            f"{timeout_sec} seconds") from e
                    time.sleep(0.5)

    @skip_debug_mode
    @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
    @parametrize(with_failures=True)
    @parametrize(with_failures=False)
    def test_migrating_tx_manager_coordinator(self, with_failures):
        """
        Scenario:
          1. in normal mode the migration endpoint must answer with 404,
          2. transactions are committed and the coordinator mapping and the
             tx topic metadata are recorded,
          3. in recovery mode a migration with an unchanged partition count
             must leave the tx topic metadata untouched,
          4. after `transaction_coordinator_partitions` is changed a
             migration (with injected failures when `with_failures` is set)
             must give the tx topic the new partition count,
          5. back in normal mode every tx id must still have a coordinator
             and more coordinator partitions must be in use than initially.
        """
        admin = Admin(self.redpanda)
        node = self.redpanda.nodes[0]
        topic = TopicSpec(partition_count=self.partition_count)
        self.client().create_topic(topic)

        # check that migration isn't possible when Redpanda is not running in recovery mode
        try:
            admin.migrate_tx_manager_in_recovery(node)
        except requests.exceptions.HTTPError as e:
            assert e.response.status_code == 404, "migrate tx manager endpoint is not supposed to be present in normal mode"
        else:
            assert False, "migrate tx manager endpoint is not supposed to be present in normal mode"

        self._populate_tx_coordinator(topic=topic.name)
        initial_mapping = self._get_tx_id_mapping()
        self.logger.info(f"Initial tx_id mapping {initial_mapping}")
        initial_metadata = self._get_tx_manager_topic_meta()
        self.logger.info(f"Initial topic metadata {initial_metadata}")

        self.redpanda.restart_nodes(
            self.redpanda.nodes,
            override_cfg_params={"recovery_mode_enabled": True})

        # with the same number of partitions the operation should be a no op
        admin.migrate_tx_manager_in_recovery(node)
        metadata = self._get_tx_manager_topic_meta()

        assert initial_metadata == metadata, "when number of partitions is the same metadata should be identical"

        # change number of partitions
        new_partition_count = 32
        self.redpanda.set_cluster_config(
            {"transaction_coordinator_partitions": new_partition_count})

        if with_failures:
            self._migrate_until_success()
        else:
            # try migrating partitions once again
            admin.migrate_tx_manager_in_recovery(node)

        new_tp_metadata = self._get_tx_manager_topic_meta()
        assert len(
            new_tp_metadata
        ) == new_partition_count, "Tx manager topic after migration is expected to have new number of partitions"

        # restart back in normal mode
        self.redpanda.restart_nodes(
            self.redpanda.nodes,
            override_cfg_params={"recovery_mode_enabled": False})

        new_mapping = self._get_tx_id_mapping()
        self.logger.info(f"New tx_id mapping {new_mapping}")

        used_partitions = set(new_mapping.values())

        assert len(new_mapping) == len(
            initial_mapping
        ), "All tx ids should be present after repartitioning"

        assert len(
            used_partitions
        ) > self.initial_tx_manager_partitions, "tx ids are expected to use more coordinator partitions than the initial tx manager partition count"