Skip to content

Commit

Permalink
rptest: upgrade kgo
Browse files Browse the repository at this point in the history
The new version contains additional features to facilitate testing of
the write caching feature, which is in development. Updated the existing
services to support these new features.

Fixed some type hinting problems along the way.
  • Loading branch information
nvartolomei committed Apr 4, 2024
1 parent 126bf56 commit 8d9d677
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/kgo-verifier
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
set -e
git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
cd /opt/kgo-verifier
git reset --hard bb6953c81662237c9a8fb42ee90cc870df258907
git reset --hard 8f4fdb77f2c6173d8e1b7020c9899601a441d0d6
go mod tidy
make
61 changes: 53 additions & 8 deletions tests/rptest/services/kgo_verifier_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import signal
import threading
import requests
from typing import Any, Optional
from typing import Any, List, Optional

from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError

from rptest.services.redpanda import RedpandaService

# Install location, specified by Dockerfile or AMI
TESTS_DIR = os.path.join("/opt", "kgo-verifier")

Expand Down Expand Up @@ -60,7 +62,7 @@ def __init__(self,
assert not self.nodes
self.nodes = custom_node

self._redpanda = redpanda
self._redpanda: RedpandaService = redpanda
self._topic = topic
self._msg_size = msg_size
self._pid = None
Expand Down Expand Up @@ -448,7 +450,9 @@ class ValidatorStatus:
differ per-worker, although at time of writing they don't.
"""
def __init__(self, name: str, valid_reads: int, invalid_reads: int,
out_of_scope_invalid_reads: int, max_offsets_consumed: int):
out_of_scope_invalid_reads: int,
max_offsets_consumed: Optional[int],
lost_offsets: Optional[List[int]]):
# Validator name is just a unique name per worker thread in kgo-verifier: useful in logging
# but we mostly don't care
self.name = name
Expand All @@ -457,6 +461,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int,
self.invalid_reads = invalid_reads
self.out_of_scope_invalid_reads = out_of_scope_invalid_reads
self.max_offsets_consumed = max_offsets_consumed
self.lost_offsets = lost_offsets

@property
def total_reads(self):
Expand All @@ -468,17 +473,25 @@ def merge(self, rhs: ValidatorStatus):
# Clear name if we are merging multiple statuses together, to avoid confusion.
self.name = ""

# Clear other fields we aren't interested in, to avoid confusion.
self.max_offsets_consumed = None
self.lost_offsets = None

self.valid_reads += rhs.valid_reads
self.invalid_reads += rhs.invalid_reads
self.out_of_scope_invalid_reads += rhs.out_of_scope_invalid_reads

def __str__(self):
return f"ValidatorStatus<{self.valid_reads} {self.invalid_reads} {self.out_of_scope_invalid_reads}>"
return f"ValidatorStatus<" \
f"valid_reads={self.valid_reads}, " \
f"invalid_reads={self.invalid_reads}, " \
f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \
f"lost_offsets={self.lost_offsets}>"


class ConsumerStatus:
def __init__(self,
topic: str = None,
topic: Optional[str] = None,
validator: dict[str, Any] | None = None,
errors: int = 0,
active: bool = True):
Expand All @@ -493,7 +506,8 @@ def __init__(self,
'invalid_reads': 0,
'out_of_scope_invalid_reads': 0,
'name': "",
'max_offsets_consumed': dict()
'max_offsets_consumed': dict(),
'lost_offsets': dict()
}

self.validator = ValidatorStatus(**validator)
Expand Down Expand Up @@ -530,7 +544,9 @@ def __init__(self,
username=None,
password=None,
enable_tls=False,
msgs_per_producer_id=None):
msgs_per_producer_id=None,
max_buffered_records=None,
tolerate_data_loss=False):
super(KgoVerifierProducer,
self).__init__(context, redpanda, topic, msg_size, custom_node,
debug_logs, trace_logs, username, password,
Expand All @@ -546,6 +562,8 @@ def __init__(self,
self._rate_limit_bps = rate_limit_bps
self._key_set_cardinality = key_set_cardinality
self._msgs_per_producer_id = msgs_per_producer_id
self._max_buffered_records = max_buffered_records
self._tolerate_data_loss = tolerate_data_loss

@property
def produce_status(self):
Expand Down Expand Up @@ -638,6 +656,13 @@ def start_node(self, node, clean=False):
cmd += f" --key-set-cardinality {self._key_set_cardinality}"
if self._msgs_per_producer_id is not None:
cmd += f" --msgs-per-producer-id {self._msgs_per_producer_id}"

if self._max_buffered_records is not None:
cmd += f" --max-buffered-records {self._max_buffered_records}"

if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ProduceStatus)
Expand All @@ -657,6 +682,8 @@ def __init__(
debug_logs=False,
trace_logs=False,
loop=True,
continuous=False,
tolerate_data_loss=False,
producer: Optional[KgoVerifierProducer] = None,
username: Optional[str] = None,
password: Optional[str] = None,
Expand All @@ -669,6 +696,8 @@ def __init__(
self._max_throughput_mb = max_throughput_mb
self._status = ConsumerStatus()
self._loop = loop
self._continuous = continuous
self._tolerate_data_loss = tolerate_data_loss
self._producer = producer

@property
Expand All @@ -691,6 +720,10 @@ def start_node(self, node, clean=False):
cmd += f" --seq_read_msgs {self._max_msgs}"
if self._max_throughput_mb is not None:
cmd += f" --consume-throughput-mb {self._max_throughput_mb}"
if self._continuous:
cmd += " --continuous"
if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"
self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand Down Expand Up @@ -784,14 +817,20 @@ def __init__(self,
trace_logs=False,
username=None,
password=None,
enable_tls=False):
enable_tls=False,
continuous=False,
tolerate_data_loss=False,
group_name=None):
super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
trace_logs, username, password, enable_tls)

self._readers = readers
self._loop = loop
self._max_msgs = max_msgs
self._max_throughput_mb = max_throughput_mb
self._group_name = group_name
self._continuous = continuous
self._tolerate_data_loss = tolerate_data_loss
self._status = ConsumerStatus()

@property
Expand All @@ -815,6 +854,12 @@ def start_node(self, node, clean=False):
cmd += f" --seq_read_msgs {self._max_msgs}"
if self._max_throughput_mb is not None:
cmd += f" --consume-throughput-mb {self._max_throughput_mb}"
if self._continuous:
cmd += " --continuous"
if self._tolerate_data_loss:
cmd += " --tolerate-data-loss"
if self._group_name is not None:
cmd += f" --consumer_group_name {self._group_name}"
self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand Down

0 comments on commit 8d9d677

Please sign in to comment.