Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rptest: upgrade kgo #17588

Merged
merged 1 commit into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/kgo-verifier
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
set -e
git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
cd /opt/kgo-verifier
git reset --hard bb6953c81662237c9a8fb42ee90cc870df258907
git reset --hard 8f4fdb77f2c6173d8e1b7020c9899601a441d0d6
go mod tidy
make
61 changes: 53 additions & 8 deletions tests/rptest/services/kgo_verifier_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import signal
import threading
import requests
from typing import Any, Optional
from typing import Any, List, Optional

from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError

from rptest.services.redpanda import RedpandaService

# Install location, specified by Dockerfile or AMI
TESTS_DIR = os.path.join("/opt", "kgo-verifier")

Expand Down Expand Up @@ -60,7 +62,7 @@ def __init__(self,
assert not self.nodes
self.nodes = custom_node

self._redpanda = redpanda
self._redpanda: RedpandaService = redpanda
self._topic = topic
self._msg_size = msg_size
self._pid = None
Expand Down Expand Up @@ -448,7 +450,9 @@ class ValidatorStatus:
differ per-worker, although at time of writing they don't.
"""
def __init__(self, name: str, valid_reads: int, invalid_reads: int,
out_of_scope_invalid_reads: int, max_offsets_consumed: int):
out_of_scope_invalid_reads: int,
max_offsets_consumed: Optional[int],
lost_offsets: Optional[List[int]]):
# Validator name is just a unique name per worker thread in kgo-verifier: useful in logging
# but we mostly don't care
self.name = name
Expand All @@ -457,6 +461,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int,
self.invalid_reads = invalid_reads
self.out_of_scope_invalid_reads = out_of_scope_invalid_reads
self.max_offsets_consumed = max_offsets_consumed
self.lost_offsets = lost_offsets

@property
def total_reads(self):
Expand All @@ -468,17 +473,25 @@ def merge(self, rhs: ValidatorStatus):
# Clear name if we are merging multiple statuses together, to avoid confusion.
self.name = ""

# Clear other fields we aren't interested in, to avoid confusion.
self.max_offsets_consumed = None
self.lost_offsets = None

self.valid_reads += rhs.valid_reads
self.invalid_reads += rhs.invalid_reads
self.out_of_scope_invalid_reads += rhs.out_of_scope_invalid_reads

def __str__(self):
return f"ValidatorStatus<{self.valid_reads} {self.invalid_reads} {self.out_of_scope_invalid_reads}>"
return f"ValidatorStatus<" \
f"valid_reads={self.valid_reads}, " \
f"invalid_reads={self.invalid_reads}, " \
f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \
f"lost_offsets={self.lost_offsets}>"


class ConsumerStatus:
def __init__(self,
topic: str = None,
topic: Optional[str] = None,
validator: dict[str, Any] | None = None,
errors: int = 0,
active: bool = True):
Expand All @@ -493,7 +506,8 @@ def __init__(self,
'invalid_reads': 0,
'out_of_scope_invalid_reads': 0,
'name': "",
'max_offsets_consumed': dict()
'max_offsets_consumed': dict(),
'lost_offsets': dict()
}

self.validator = ValidatorStatus(**validator)
Expand Down Expand Up @@ -530,7 +544,9 @@ def __init__(self,
username=None,
password=None,
enable_tls=False,
msgs_per_producer_id=None):
msgs_per_producer_id=None,
max_buffered_records=None,
tolerate_data_loss=False):
super(KgoVerifierProducer,
self).__init__(context, redpanda, topic, msg_size, custom_node,
debug_logs, trace_logs, username, password,
Expand All @@ -546,6 +562,8 @@ def __init__(self,
self._rate_limit_bps = rate_limit_bps
self._key_set_cardinality = key_set_cardinality
self._msgs_per_producer_id = msgs_per_producer_id
self._max_buffered_records = max_buffered_records
self._tolerate_data_loss = tolerate_data_loss

@property
def produce_status(self):
Expand Down Expand Up @@ -638,6 +656,13 @@ def start_node(self, node, clean=False):
cmd += f" --key-set-cardinality {self._key_set_cardinality}"
if self._msgs_per_producer_id is not None:
cmd += f" --msgs-per-producer-id {self._msgs_per_producer_id}"

if self._max_buffered_records is not None:
cmd += f" --max-buffered-records {self._max_buffered_records}"

if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ProduceStatus)
Expand All @@ -657,6 +682,8 @@ def __init__(
debug_logs=False,
trace_logs=False,
loop=True,
continuous=False,
tolerate_data_loss=False,
producer: Optional[KgoVerifierProducer] = None,
username: Optional[str] = None,
password: Optional[str] = None,
Expand All @@ -669,6 +696,8 @@ def __init__(
self._max_throughput_mb = max_throughput_mb
self._status = ConsumerStatus()
self._loop = loop
self._continuous = continuous
self._tolerate_data_loss = tolerate_data_loss
self._producer = producer

@property
Expand All @@ -691,6 +720,10 @@ def start_node(self, node, clean=False):
cmd += f" --seq_read_msgs {self._max_msgs}"
if self._max_throughput_mb is not None:
cmd += f" --consume-throughput-mb {self._max_throughput_mb}"
if self._continuous:
cmd += " --continuous"
if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"
self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand Down Expand Up @@ -784,14 +817,20 @@ def __init__(self,
trace_logs=False,
username=None,
password=None,
enable_tls=False):
enable_tls=False,
continuous=False,
tolerate_data_loss=False,
group_name=None):
super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
trace_logs, username, password, enable_tls)

self._readers = readers
self._loop = loop
self._max_msgs = max_msgs
self._max_throughput_mb = max_throughput_mb
self._group_name = group_name
self._continuous = continuous
self._tolerate_data_loss = tolerate_data_loss
self._status = ConsumerStatus()

@property
Expand All @@ -815,6 +854,12 @@ def start_node(self, node, clean=False):
cmd += f" --seq_read_msgs {self._max_msgs}"
if self._max_throughput_mb is not None:
cmd += f" --consume-throughput-mb {self._max_throughput_mb}"
if self._continuous:
cmd += " --continuous"
if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"
if self._group_name is not None:
cmd += f" --consumer_group_name {self._group_name}"
self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand Down
Loading