rptest: perform full log reads with KgoVerifierSeqConsumer #11759

Merged: 5 commits, Jul 6, 2023
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/kgo-verifier
@@ -2,6 +2,6 @@
set -e
git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
cd /opt/kgo-verifier
git reset --hard 5b1b6b6b802a30962963cca8b364bee148fee515
git reset --hard 3508c6e64e1b9ef92dc325a27cd62c16b411f0e4
go mod tidy
make
272 changes: 156 additions & 116 deletions tests/rptest/services/kgo_verifier_services.py
@@ -11,6 +11,7 @@
import time
import threading
import requests
from typing import Optional

from ducktape.services.service import Service
from ducktape.utils.util import wait_until
@@ -313,6 +314,7 @@ def is_ready(self):
return r.status_code == 200

def _ingest_status(self, worker_statuses):
self.logger.debug(f"{self.who_am_i} status: {worker_statuses}")
reduced = self._status_cls(**worker_statuses[0])
for s in worker_statuses[1:]:
reduced.merge(self._status_cls(**s))
@@ -399,14 +401,15 @@ class ValidatorStatus:
differ per-worker, although at time of writing they don't.
"""
def __init__(self, name, valid_reads, invalid_reads,
out_of_scope_invalid_reads):
out_of_scope_invalid_reads, max_offsets_consumed):
# Validator name is just a unique name per worker thread in kgo-verifier: useful in logging
# but we mostly don't care
self.name = name

self.valid_reads = valid_reads
self.invalid_reads = invalid_reads
self.out_of_scope_invalid_reads = out_of_scope_invalid_reads
self.max_offsets_consumed = max_offsets_consumed

@property
def total_reads(self):
@@ -427,7 +430,7 @@ def __str__(self):


class ConsumerStatus:
def __init__(self, validator=None, errors=0, active=True):
def __init__(self, topic=None, validator=None, errors=0, active=True):
"""
`active` defaults to True, because we use it for deciding when to drop out in `wait()` -- the initial
state of a worker should be presumed busy, and we must wait to see it go `active=False`
@@ -438,7 +441,8 @@ def __init__(self, validator=None, errors=0, active=True):
'valid_reads': 0,
'invalid_reads': 0,
'out_of_scope_invalid_reads': 0,
'name': ""
'name': "",
'max_offsets_consumed': dict()
}

self.validator = ValidatorStatus(**validator)
@@ -454,6 +458,118 @@ def __str__(self):
return f"ConsumerStatus<{self.active}, {self.errors}, {self.validator}>"


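For intuition, here is a minimal sketch of how per-worker `max_offsets_consumed` maps might be folded together when `_ingest_status` reduces several worker statuses via `merge`; the helper name and the reduction rule (take the per-partition maximum) are assumptions, since the actual merge implementation is outside this diff.

```python
# Illustrative sketch only: combine two per-worker maps of
# partition -> highest offset consumed by taking the maximum per partition.
# The real merge logic lives in ValidatorStatus.merge, which this diff does not show.
def merge_max_offsets(a: dict, b: dict) -> dict:
    merged = dict(a)
    for partition, offset in b.items():
        merged[partition] = max(merged.get(partition, -1), offset)
    return merged


# Two workers that covered overlapping partitions:
print(merge_max_offsets({"0": 1000, "1": 950}, {"1": 1200, "2": 40}))
# -> {'0': 1000, '1': 1200, '2': 40}
```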
class KgoVerifierProducer(KgoVerifierService):
def __init__(self,
context,
redpanda,
topic,
msg_size,
msg_count,
custom_node=None,
batch_max_bytes=None,
debug_logs=False,
trace_logs=False,
fake_timestamp_ms=None,
use_transactions=False,
transaction_abort_rate=None,
msgs_per_transaction=None,
rate_limit_bps=None,
key_set_cardinality=None):
super(KgoVerifierProducer,
self).__init__(context, redpanda, topic, msg_size, custom_node,
debug_logs, trace_logs)
self._msg_count = msg_count
self._status = ProduceStatus()
self._batch_max_bytes = batch_max_bytes
self._fake_timestamp_ms = fake_timestamp_ms
self._use_transactions = use_transactions
self._transaction_abort_rate = transaction_abort_rate
self._msgs_per_transaction = msgs_per_transaction
self._rate_limit_bps = rate_limit_bps
self._key_set_cardinality = key_set_cardinality

@property
def produce_status(self):
return self._status

def wait_node(self, node, timeout_sec=None):
if not self._status_thread:
return True

self.logger.debug(f"{self.who_am_i()} wait: awaiting message count")
try:
self._redpanda.wait_until(lambda: self._status_thread.errored or
self._status.acked >= self._msg_count,
timeout_sec=timeout_sec,
backoff_sec=self._status_thread.INTERVAL)
except:
self.stop_node(node)
raise

self._status_thread.raise_on_error()

if self._status.bad_offsets != 0:
# This either means that the test sent multiple producers' traffic to
# the same topic, or that Redpanda showed a buggy behavior with
# idempotency: producer records should always land at the next offset
# after the last record they wrote.
raise RuntimeError(
f"{self.who_am_i()} possible idempotency bug: {self._status}")

return super().wait_node(node, timeout_sec=timeout_sec)

def wait_for_acks(self, count, timeout_sec, backoff_sec):
self._redpanda.wait_until(
lambda: self._status_thread.errored or self._status.acked >= count,
timeout_sec=timeout_sec,
backoff_sec=backoff_sec)
self._status_thread.raise_on_error()

def wait_for_offset_map(self):
# Producer worker aims to checkpoint every 5 seconds, so we should see this promptly.
self._redpanda.wait_until(lambda: self._status_thread.errored or all(
node.account.exists(f"valid_offsets_{self._topic}.json")
for node in self.nodes),
timeout_sec=15,
backoff_sec=1)
self._status_thread.raise_on_error()

def is_complete(self):
return self._status.acked >= self._msg_count

def start_node(self, node, clean=False):
if clean:
self.clean_node(node)

cmd = f"{TESTS_DIR}/kgo-verifier --brokers {self._redpanda.brokers()} --topic {self._topic} --msg_size {self._msg_size} --produce_msgs {self._msg_count} --rand_read_msgs 0 --seq_read=0 --client-name {self.who_am_i()}"

if self._batch_max_bytes is not None:
cmd = cmd + f' --batch_max_bytes {self._batch_max_bytes}'

if self._fake_timestamp_ms is not None:
cmd = cmd + f' --fake-timestamp-ms {self._fake_timestamp_ms}'

if self._use_transactions:
cmd = cmd + f' --use-transactions'

if self._msgs_per_transaction is not None:
cmd = cmd + f' --msgs-per-transaction {self._msgs_per_transaction}'

if self._transaction_abort_rate is not None:
cmd = cmd + f' --transaction-abort-rate {self._transaction_abort_rate}'

if self._rate_limit_bps is not None:
cmd = cmd + f' --produce-throughput-bps {self._rate_limit_bps}'

if self._key_set_cardinality is not None:
cmd += f" --key-set-cardinality {self._key_set_cardinality}"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ProduceStatus)
self._status_thread.start()


class KgoVerifierSeqConsumer(KgoVerifierService):
def __init__(
self,
@@ -466,14 +582,16 @@ def __init__(
nodes=None,
debug_logs=False,
trace_logs=False,
loop=True):
loop=True,
producer: Optional[KgoVerifierProducer] = None):
super(KgoVerifierSeqConsumer,
self).__init__(context, redpanda, topic, msg_size, nodes,
debug_logs, trace_logs)
self._max_msgs = max_msgs
self._max_throughput_mb = max_throughput_mb
self._status = ConsumerStatus()
self._loop = loop
self._producer = producer

@property
def consumer_status(self):
@@ -494,6 +612,36 @@ def start_node(self, node, clean=False):
self._status_thread = StatusThread(self, node, ConsumerStatus)
self._status_thread.start()

def wait_node(self, node, timeout_sec=None):
if self._producer:

def consumed_whole_log():
producer_done = self._producer._status.sent == self._producer._msg_count
if not producer_done:
self.logger.debug(
f"Producer {self._producer.who_am_i()} hasn't finished yet"
)
return False

consumed = self._status.validator.max_offsets_consumed
produced = self._producer._status.max_offsets_produced
if consumed != produced:
self.logger.debug(
f"Consumer {self.who_am_i()} hasn't read all produced data yet: {consumed=} {produced=}"
)
return False
return True

wait_until(
consumed_whole_log,
timeout_sec=timeout_sec,
backoff_sec=2,
err_msg=
f"Consumer hasn't read all produced data: consumed={self._status.validator.max_offsets_consumed} produced={self._producer._status.max_offsets_produced}"
)

return super().wait_node(node, timeout_sec=timeout_sec)

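To show where the new `producer` argument pays off, here is a hedged usage sketch (topic name, message size, and count are placeholders, not values from this PR): handing the producer to the sequential consumer makes its `wait()` block until `max_offsets_consumed` matches `max_offsets_produced`, i.e. the whole produced log has been read.

```python
# Hypothetical test snippet; all concrete values are placeholders.
producer = KgoVerifierProducer(test_context, redpanda, "test-topic",
                               msg_size=4096, msg_count=100_000)
consumer = KgoVerifierSeqConsumer(test_context, redpanda, "test-topic",
                                  msg_size=4096, loop=False,
                                  producer=producer)

producer.start()
consumer.start()

producer.wait()   # all messages acked
consumer.wait()   # returns only once the full produced log has been consumed
```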

class KgoVerifierRandomConsumer(KgoVerifierService):
def __init__(self,
@@ -572,17 +720,21 @@ def start_node(self, node, clean=False):

class ProduceStatus:
def __init__(self,
topic=None,
sent=0,
acked=0,
bad_offsets=0,
max_offsets_produced=dict(),
restarts=0,
latency=None,
active=False,
failed_transactions=0,
aborted_transaction_msgs=0):
self.topic = topic
self.sent = sent
self.acked = acked
self.bad_offsets = bad_offsets
self.max_offsets_produced = max_offsets_produced
self.restarts = restarts
if latency is None:
latency = {'p50': 0, 'p90': 0, 'p99': 0}
@@ -594,115 +746,3 @@ def __init__(self,
def __str__(self):
l = self.latency
return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {l['p50']}/{l['p90']}/{l['p99']}>"


class KgoVerifierProducer(KgoVerifierService):
def __init__(self,
context,
redpanda,
topic,
msg_size,
msg_count,
custom_node=None,
batch_max_bytes=None,
debug_logs=False,
trace_logs=False,
fake_timestamp_ms=None,
use_transactions=False,
transaction_abort_rate=None,
msgs_per_transaction=None,
rate_limit_bps=None,
key_set_cardinality=None):
super(KgoVerifierProducer,
self).__init__(context, redpanda, topic, msg_size, custom_node,
debug_logs, trace_logs)
self._msg_count = msg_count
self._status = ProduceStatus()
self._batch_max_bytes = batch_max_bytes
self._fake_timestamp_ms = fake_timestamp_ms
self._use_transactions = use_transactions
self._transaction_abort_rate = transaction_abort_rate
self._msgs_per_transaction = msgs_per_transaction
self._rate_limit_bps = rate_limit_bps
self._key_set_cardinality = key_set_cardinality

@property
def produce_status(self):
return self._status

def wait_node(self, node, timeout_sec=None):
if not self._status_thread:
return True

self.logger.debug(f"{self.who_am_i()} wait: awaiting message count")
try:
self._redpanda.wait_until(lambda: self._status_thread.errored or
self._status.acked >= self._msg_count,
timeout_sec=timeout_sec,
backoff_sec=self._status_thread.INTERVAL)
except:
self.stop_node(node)
raise

self._status_thread.raise_on_error()

if self._status.bad_offsets != 0:
# This either means that the test sent multiple producers' traffic to
# the same topic, or that Redpanda showed a buggy behavior with
# idempotency: producer records should always land at the next offset
# after the last record they wrote.
raise RuntimeError(
f"{self.who_am_i()} possible idempotency bug: {self._status}")

return super().wait_node(node, timeout_sec=timeout_sec)

def wait_for_acks(self, count, timeout_sec, backoff_sec):
self._redpanda.wait_until(
lambda: self._status_thread.errored or self._status.acked >= count,
timeout_sec=timeout_sec,
backoff_sec=backoff_sec)
self._status_thread.raise_on_error()

def wait_for_offset_map(self):
# Producer worker aims to checkpoint every 5 seconds, so we should see this promptly.
self._redpanda.wait_until(lambda: self._status_thread.errored or all(
node.account.exists(f"valid_offsets_{self._topic}.json")
for node in self.nodes),
timeout_sec=15,
backoff_sec=1)
self._status_thread.raise_on_error()

def is_complete(self):
return self._status.acked >= self._msg_count

def start_node(self, node, clean=False):
if clean:
self.clean_node(node)

cmd = f"{TESTS_DIR}/kgo-verifier --brokers {self._redpanda.brokers()} --topic {self._topic} --msg_size {self._msg_size} --produce_msgs {self._msg_count} --rand_read_msgs 0 --seq_read=0 --client-name {self.who_am_i()}"

if self._batch_max_bytes is not None:
cmd = cmd + f' --batch_max_bytes {self._batch_max_bytes}'

if self._fake_timestamp_ms is not None:
cmd = cmd + f' --fake-timestamp-ms {self._fake_timestamp_ms}'

if self._use_transactions:
cmd = cmd + f' --use-transactions'

if self._msgs_per_transaction is not None:
cmd = cmd + f' --msgs-per-transaction {self._msgs_per_transaction}'

if self._transaction_abort_rate is not None:
cmd = cmd + f' --transaction-abort-rate {self._transaction_abort_rate}'

if self._rate_limit_bps is not None:
cmd = cmd + f' --produce-throughput-bps {self._rate_limit_bps}'

if self._key_set_cardinality is not None:
cmd += f" --key-set-cardinality {self._key_set_cardinality}"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ProduceStatus)
self._status_thread.start()