From a33520b1aaa5fcafb6a3615b41e59e6a35c49a67 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 28 Jun 2023 15:43:27 +0100 Subject: [PATCH 1/5] rptest: make kgo seq consumer read full log Previously, whenever a KgoVerifierSequentialConsumer and a KgoVerifierProducer ran in parallel (which a number of tests do), there was no guarantee that the consumer would read all partitions entirely. Whenever `wait` is called on the consumer, the `last_pass` command is triggered without taking into account if the producer has completed. This is problematic because it can hide stuck consumer issues. This patch makes use of the enhanced kgo-verifier status which now contains the high watermark for each partition. When creating a sequential consumer, a reference to the producer can be provided. If that was the case, when `wait` is called on the consumer we will wait until the producer has finished and the high watermarks match. --- .../rptest/services/kgo_verifier_services.py | 272 ++++++++++-------- 1 file changed, 156 insertions(+), 116 deletions(-) diff --git a/tests/rptest/services/kgo_verifier_services.py b/tests/rptest/services/kgo_verifier_services.py index b2bf20411ec3..5205d7091808 100644 --- a/tests/rptest/services/kgo_verifier_services.py +++ b/tests/rptest/services/kgo_verifier_services.py @@ -11,6 +11,7 @@ import time import threading import requests +from typing import Optional from ducktape.services.service import Service from ducktape.utils.util import wait_until @@ -313,6 +314,7 @@ def is_ready(self): return r.status_code == 200 def _ingest_status(self, worker_statuses): + self.logger.debug(f"{self.who_am_i} status: {worker_statuses}") reduced = self._status_cls(**worker_statuses[0]) for s in worker_statuses[1:]: reduced.merge(self._status_cls(**s)) @@ -399,7 +401,7 @@ class ValidatorStatus: differ per-worker, although at time of writing they don't. 
""" def __init__(self, name, valid_reads, invalid_reads, - out_of_scope_invalid_reads): + out_of_scope_invalid_reads, max_offsets_consumed): # Validator name is just a unique name per worker thread in kgo-verifier: useful in logging # but we mostly don't care self.name = name @@ -407,6 +409,7 @@ def __init__(self, name, valid_reads, invalid_reads, self.valid_reads = valid_reads self.invalid_reads = invalid_reads self.out_of_scope_invalid_reads = out_of_scope_invalid_reads + self.max_offsets_consumed = max_offsets_consumed @property def total_reads(self): @@ -427,7 +430,7 @@ def __str__(self): class ConsumerStatus: - def __init__(self, validator=None, errors=0, active=True): + def __init__(self, topic=None, validator=None, errors=0, active=True): """ `active` defaults to True, because we use it for deciding when to drop out in `wait()` -- the initial state of a worker should be presumed that it is busy, and we must wait to see it go `active=False` @@ -438,7 +441,8 @@ def __init__(self, validator=None, errors=0, active=True): 'valid_reads': 0, 'invalid_reads': 0, 'out_of_scope_invalid_reads': 0, - 'name': "" + 'name': "", + 'max_offsets_consumed': dict() } self.validator = ValidatorStatus(**validator) @@ -454,6 +458,118 @@ def __str__(self): return f"ConsumerStatus<{self.active}, {self.errors}, {self.validator}>" +class KgoVerifierProducer(KgoVerifierService): + def __init__(self, + context, + redpanda, + topic, + msg_size, + msg_count, + custom_node=None, + batch_max_bytes=None, + debug_logs=False, + trace_logs=False, + fake_timestamp_ms=None, + use_transactions=False, + transaction_abort_rate=None, + msgs_per_transaction=None, + rate_limit_bps=None, + key_set_cardinality=None): + super(KgoVerifierProducer, + self).__init__(context, redpanda, topic, msg_size, custom_node, + debug_logs, trace_logs) + self._msg_count = msg_count + self._status = ProduceStatus() + self._batch_max_bytes = batch_max_bytes + self._fake_timestamp_ms = fake_timestamp_ms + 
self._use_transactions = use_transactions + self._transaction_abort_rate = transaction_abort_rate + self._msgs_per_transaction = msgs_per_transaction + self._rate_limit_bps = rate_limit_bps + self._key_set_cardinality = key_set_cardinality + + @property + def produce_status(self): + return self._status + + def wait_node(self, node, timeout_sec=None): + if not self._status_thread: + return True + + self.logger.debug(f"{self.who_am_i()} wait: awaiting message count") + try: + self._redpanda.wait_until(lambda: self._status_thread.errored or + self._status.acked >= self._msg_count, + timeout_sec=timeout_sec, + backoff_sec=self._status_thread.INTERVAL) + except: + self.stop_node(node) + raise + + self._status_thread.raise_on_error() + + if self._status.bad_offsets != 0: + # This either means that the test sent multiple producers' traffic to + # the same topic, or that Redpanda showed a buggy behavior with + # idempotency: producer records should always land at the next offset + # after the last record they wrote. + raise RuntimeError( + f"{self.who_am_i()} possible idempotency bug: {self._status}") + + return super().wait_node(node, timeout_sec=timeout_sec) + + def wait_for_acks(self, count, timeout_sec, backoff_sec): + self._redpanda.wait_until( + lambda: self._status_thread.errored or self._status.acked >= count, + timeout_sec=timeout_sec, + backoff_sec=backoff_sec) + self._status_thread.raise_on_error() + + def wait_for_offset_map(self): + # Producer worker aims to checkpoint every 5 seconds, so we should see this promptly. 
+ self._redpanda.wait_until(lambda: self._status_thread.errored or all( + node.account.exists(f"valid_offsets_{self._topic}.json") + for node in self.nodes), + timeout_sec=15, + backoff_sec=1) + self._status_thread.raise_on_error() + + def is_complete(self): + return self._status.acked >= self._msg_count + + def start_node(self, node, clean=False): + if clean: + self.clean_node(node) + + cmd = f"{TESTS_DIR}/kgo-verifier --brokers {self._redpanda.brokers()} --topic {self._topic} --msg_size {self._msg_size} --produce_msgs {self._msg_count} --rand_read_msgs 0 --seq_read=0 --client-name {self.who_am_i()}" + + if self._batch_max_bytes is not None: + cmd = cmd + f' --batch_max_bytes {self._batch_max_bytes}' + + if self._fake_timestamp_ms is not None: + cmd = cmd + f' --fake-timestamp-ms {self._fake_timestamp_ms}' + + if self._use_transactions: + cmd = cmd + f' --use-transactions' + + if self._msgs_per_transaction is not None: + cmd = cmd + f' --msgs-per-transaction {self._msgs_per_transaction}' + + if self._transaction_abort_rate is not None: + cmd = cmd + f' --transaction-abort-rate {self._transaction_abort_rate}' + + if self._rate_limit_bps is not None: + cmd = cmd + f' --produce-throughput-bps {self._rate_limit_bps}' + + if self._key_set_cardinality is not None: + cmd += f" --key-set-cardinality {self._key_set_cardinality}" + + self.spawn(cmd, node) + + self._status_thread = StatusThread(self, node, ProduceStatus) + self._status_thread.start() + + class KgoVerifierSeqConsumer(KgoVerifierService): def __init__( self, @@ -466,7 +582,8 @@ def __init__( nodes=None, debug_logs=False, trace_logs=False, - loop=True): + loop=True, + producer: Optional[KgoVerifierProducer] = None): super(KgoVerifierSeqConsumer, self).__init__(context, redpanda, topic, msg_size, nodes, debug_logs, trace_logs) @@ -474,6 +591,7 @@ def __init__( self._max_throughput_mb = max_throughput_mb self._status = ConsumerStatus() self._loop = loop + self._producer = producer @property def 
consumer_status(self): @@ -494,6 +612,36 @@ def start_node(self, node, clean=False): self._status_thread = StatusThread(self, node, ConsumerStatus) self._status_thread.start() + def wait_node(self, node, timeout_sec=None): + if self._producer: + + def consumed_whole_log(): + producer_done = self._producer._status.sent == self._producer._msg_count + if not producer_done: + self.logger.debug( + f"Producer {self._producer.who_am_i()} hasn't finished yet" + ) + return False + + consumed = self._status.validator.max_offsets_consumed + produced = self._producer._status.max_offsets_produced + if consumed != produced: + self.logger.debug( + f"Consumer {self.who_am_i()} hasn't read all produced data yet: {consumed=} {produced=}" + ) + return False + return True + + wait_until( + consumed_whole_log, + timeout_sec=timeout_sec, + backoff_sec=2, + err_msg= + f"Consumer hasn't read all produced data: consumed={self._status.validator.max_offsets_consumed} produced={self._producer._status.max_offsets_produced}" + ) + + return super().wait_node(node, timeout_sec=timeout_sec) + class KgoVerifierRandomConsumer(KgoVerifierService): def __init__(self, @@ -572,17 +720,21 @@ def start_node(self, node, clean=False): class ProduceStatus: def __init__(self, + topic=None, sent=0, acked=0, bad_offsets=0, + max_offsets_produced=dict(), restarts=0, latency=None, active=False, failed_transactions=0, aborted_transaction_msgs=0): + self.topic = topic self.sent = sent self.acked = acked self.bad_offsets = bad_offsets + self.max_offsets_produced = max_offsets_produced self.restarts = restarts if latency is None: latency = {'p50': 0, 'p90': 0, 'p99': 0} @@ -594,115 +746,3 @@ def __init__(self, def __str__(self): l = self.latency return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {l['p50']}/{l['p90']}/{l['p99']}>" - - -class KgoVerifierProducer(KgoVerifierService): - def __init__(self, - context, - redpanda, 
- topic, - msg_size, - msg_count, - custom_node=None, - batch_max_bytes=None, - debug_logs=False, - trace_logs=False, - fake_timestamp_ms=None, - use_transactions=False, - transaction_abort_rate=None, - msgs_per_transaction=None, - rate_limit_bps=None, - key_set_cardinality=None): - super(KgoVerifierProducer, - self).__init__(context, redpanda, topic, msg_size, custom_node, - debug_logs, trace_logs) - self._msg_count = msg_count - self._status = ProduceStatus() - self._batch_max_bytes = batch_max_bytes - self._fake_timestamp_ms = fake_timestamp_ms - self._use_transactions = use_transactions - self._transaction_abort_rate = transaction_abort_rate - self._msgs_per_transaction = msgs_per_transaction - self._rate_limit_bps = rate_limit_bps - self._key_set_cardinality = key_set_cardinality - - @property - def produce_status(self): - return self._status - - def wait_node(self, node, timeout_sec=None): - if not self._status_thread: - return True - - self.logger.debug(f"{self.who_am_i()} wait: awaiting message count") - try: - self._redpanda.wait_until(lambda: self._status_thread.errored or - self._status.acked >= self._msg_count, - timeout_sec=timeout_sec, - backoff_sec=self._status_thread.INTERVAL) - except: - self.stop_node(node) - raise - - self._status_thread.raise_on_error() - - if self._status.bad_offsets != 0: - # This either means that the test sent multiple producers' traffic to - # the same topic, or that Redpanda showed a buggy behavior with - # idempotency: producer records should always land at the next offset - # after the last record they wrote. 
- raise RuntimeError( - f"{self.who_am_i()} possible idempotency bug: {self._status}") - - return super().wait_node(node, timeout_sec=timeout_sec) - - def wait_for_acks(self, count, timeout_sec, backoff_sec): - self._redpanda.wait_until( - lambda: self._status_thread.errored or self._status.acked >= count, - timeout_sec=timeout_sec, - backoff_sec=backoff_sec) - self._status_thread.raise_on_error() - - def wait_for_offset_map(self): - # Producer worker aims to checkpoint every 5 seconds, so we should see this promptly. - self._redpanda.wait_until(lambda: self._status_thread.errored or all( - node.account.exists(f"valid_offsets_{self._topic}.json") - for node in self.nodes), - timeout_sec=15, - backoff_sec=1) - self._status_thread.raise_on_error() - - def is_complete(self): - return self._status.acked >= self._msg_count - - def start_node(self, node, clean=False): - if clean: - self.clean_node(node) - - cmd = f"{TESTS_DIR}/kgo-verifier --brokers {self._redpanda.brokers()} --topic {self._topic} --msg_size {self._msg_size} --produce_msgs {self._msg_count} --rand_read_msgs 0 --seq_read=0 --client-name {self.who_am_i()}" - - if self._batch_max_bytes is not None: - cmd = cmd + f' --batch_max_bytes {self._batch_max_bytes}' - - if self._fake_timestamp_ms is not None: - cmd = cmd + f' --fake-timestamp-ms {self._fake_timestamp_ms}' - - if self._use_transactions: - cmd = cmd + f' --use-transactions' - - if self._msgs_per_transaction is not None: - cmd = cmd + f' --msgs-per-transaction {self._msgs_per_transaction}' - - if self._transaction_abort_rate is not None: - cmd = cmd + f' --transaction-abort-rate {self._transaction_abort_rate}' - - if self._rate_limit_bps is not None: - cmd = cmd + f' --produce-throughput-bps {self._rate_limit_bps}' - - if self._key_set_cardinality is not None: - cmd += f" --key-set-cardinality {self._key_set_cardinality}" - - self.spawn(cmd, node) - - self._status_thread = StatusThread(self, node, ProduceStatus) - self._status_thread.start() From 
a90ab2e2b2f6f7207cbdda0d6dcaa0319652490b Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 28 Jun 2023 15:48:29 +0100 Subject: [PATCH 2/5] rptest: consume full log in timing stress tests --- tests/rptest/tests/cloud_storage_timing_stress_test.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/rptest/tests/cloud_storage_timing_stress_test.py b/tests/rptest/tests/cloud_storage_timing_stress_test.py index 0820ae49f7d5..11c3113b8f64 100644 --- a/tests/rptest/tests/cloud_storage_timing_stress_test.py +++ b/tests/rptest/tests/cloud_storage_timing_stress_test.py @@ -288,7 +288,8 @@ def _create_producer(self) -> KgoVerifierProducer: debug_logs=True, trace_logs=True) - def _create_consumer(self) -> KgoVerifierSeqConsumer: + def _create_consumer( + self, producer: KgoVerifierProducer) -> KgoVerifierSeqConsumer: bps = self.produce_byte_rate_per_ntp * self.topics[0].partition_count bytes_count = bps * self.target_runtime msg_count = bytes_count // self.message_size @@ -302,7 +303,8 @@ def _create_consumer(self) -> KgoVerifierSeqConsumer: msg_size=self.message_size, max_throughput_mb=int(bps // self.mib), debug_logs=True, - trace_logs=True) + trace_logs=True, + producer=producer) def _all_uploads_done(self): topic_description = self.rpk.describe_topic(self.topic) @@ -398,7 +400,7 @@ def prologue(self, cleanup_policy): cloud_storage_status_endpoint_check) self.producer = self._create_producer() - self.consumer = self._create_consumer() + self.consumer = self._create_consumer(self.producer) self.producer.start() From 2cce63547fa5326614ffd45605a00c51e6d4475e Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 3 Jul 2023 10:25:05 +0100 Subject: [PATCH 3/5] rptest: upload manifests more eagerly in timing test This commit updates the SI configuration used for the CloudStorageTimingStressTest(s) to upload the manifest at each opportunity. The change should make the test more robust.
--- tests/rptest/services/redpanda.py | 26 ++++++++++++++++--- .../tests/cloud_storage_timing_stress_test.py | 10 +++++-- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 08de2673441e..4a5217fbd0e9 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -2289,6 +2289,19 @@ def get_objects_from_si(self): return self.cloud_storage_client.list_objects( self._si_settings.cloud_storage_bucket) + def set_cluster_config_to_null(self, + name: str, + expect_restart: bool = False, + admin_client: Optional[Admin] = None, + timeout: int = 10): + if admin_client is None: + admin_client = self._admin + + patch_result = admin_client.patch_cluster_config(upsert={name: None}) + new_version = patch_result['config_version'] + + self._wait_for_config_version(new_version, expect_restart, timeout) + def set_cluster_config(self, values: dict, expect_restart: bool = False, @@ -2311,10 +2324,15 @@ def set_cluster_config(self, remove=[k for k, v in values.items() if v is None]) new_version = patch_result['config_version'] + self._wait_for_config_version(new_version, expect_restart, timeout) + + def _wait_for_config_version(self, config_version, expect_restart: bool, + timeout: int): def is_ready(): - status = admin_client.get_cluster_config_status( + status = self._admin.get_cluster_config_status( node=self.controller()) - ready = all([n['config_version'] >= new_version for n in status]) + ready = all( + [n['config_version'] >= config_version for n in status]) return ready, status @@ -2325,8 +2343,8 @@ def is_ready(): is_ready, timeout_sec=timeout, backoff_sec=0.5, - err_msg=f"Config status versions did not converge on {new_version}" - ) + err_msg= + f"Config status versions did not converge on {config_version}") any_restarts = any(n['restart'] for n in config_status) if any_restarts and expect_restart: diff --git 
a/tests/rptest/tests/cloud_storage_timing_stress_test.py b/tests/rptest/tests/cloud_storage_timing_stress_test.py index 11c3113b8f64..34e1d950811e 100644 --- a/tests/rptest/tests/cloud_storage_timing_stress_test.py +++ b/tests/rptest/tests/cloud_storage_timing_stress_test.py @@ -46,7 +46,10 @@ def cloud_storage_usage_check(test): # The usage inferred from the uploaded manifest # lags behind the actual reported usage. For this reason, # we maintain a sliding window of reported usages and check whether - # the manifest inferred usage can be found in it. + # the manifest inferred usage can be found in it. A deque size of 10 + # gives us a look-behind window of roughly 2 seconds (10 * 0.2). + # This should be fine since the manifest is uploaded after every batch of segment uploads + # and prior to each GC operation (housekeeping happens every 1 second). reported_usage_sliding_window = deque(maxlen=10) def check(): @@ -249,7 +252,7 @@ def __init__(self, test_context): log_segment_size=self.log_segment_size, cloud_storage_housekeeping_interval_ms=1000, cloud_storage_spillover_manifest_max_segments=10, - fast_uploads=True) + cloud_storage_segment_max_upload_interval_sec=10) extra_rp_conf = dict( log_compaction_interval_ms=1000, @@ -387,6 +390,9 @@ def get_revision(): retry_on_exc=True) def prologue(self, cleanup_policy): + self.redpanda.set_cluster_config_to_null( + "cloud_storage_manifest_max_upload_interval_sec") + self.topic_spec.cleanup_policy = cleanup_policy self.topics = [self.topic_spec] self._create_initial_topics() From 2893f9a18b59a80861d36b7924febf8bebb7b78c Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 28 Jun 2023 15:48:53 +0100 Subject: [PATCH 4/5] dt/docker: bump kgo-verifier version for HWMs --- tests/docker/ducktape-deps/kgo-verifier | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/docker/ducktape-deps/kgo-verifier b/tests/docker/ducktape-deps/kgo-verifier index 3fa71b7f1154..6867195aad25 100644 ---
a/tests/docker/ducktape-deps/kgo-verifier +++ b/tests/docker/ducktape-deps/kgo-verifier @@ -2,6 +2,6 @@ set -e git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git cd /opt/kgo-verifier -git reset --hard 5b1b6b6b802a30962963cca8b364bee148fee515 +git reset --hard 3508c6e64e1b9ef92dc325a27cd62c16b411f0e4 go mod tidy make From fa49a0484da1e3dac731a3ed1d04d916f4a67c87 Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Wed, 5 Jul 2023 17:10:55 +0530 Subject: [PATCH 5/5] ducktape: Forward admin client to helper if overridden --- tests/rptest/services/redpanda.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 4a5217fbd0e9..0cc62e186ac5 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -2324,12 +2324,20 @@ def set_cluster_config(self, remove=[k for k, v in values.items() if v is None]) new_version = patch_result['config_version'] - self._wait_for_config_version(new_version, expect_restart, timeout) + self._wait_for_config_version(new_version, + expect_restart, + timeout, + admin_client=admin_client) + + def _wait_for_config_version(self, + config_version, + expect_restart: bool, + timeout: int, + admin_client: Optional[Admin] = None): + admin_client = admin_client or self._admin - def _wait_for_config_version(self, config_version, expect_restart: bool, - timeout: int): def is_ready(): - status = self._admin.get_cluster_config_status( + status = admin_client.get_cluster_config_status( node=self.controller()) ready = all( [n['config_version'] >= config_version for n in status]) @@ -2352,9 +2360,9 @@ def is_ready(): # Having disrupted the cluster with a restart, wait for the controller # to be available again before returning to the caller, so that they do # not have to worry about subsequent configuration actions failing. 
- self._admin.await_stable_leader(namespace="redpanda", - topic="controller", - partition=0) + admin_client.await_stable_leader(namespace="redpanda", + topic="controller", + partition=0) elif any_restarts: raise AssertionError( "Nodes report restart required but expect_restart is False")