Skip to content

Commit

Permalink
tests: added test triggering snapshot delivery with idempotency
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <[email protected]>
  • Loading branch information
mmaslankaprv committed Apr 21, 2023
1 parent b9906c7 commit fad8797
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions tests/rptest/tests/idempotency_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.clients.default import DefaultClient
from rptest.clients.types import TopicSpec
from rptest.services.action_injector import ActionConfig, random_process_kills
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import SISettings
from rptest.tests.prealloc_nodes import PreallocNodesTest

from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.rpk import RpkTool
Expand Down Expand Up @@ -47,3 +55,54 @@ def test_idempotency_compacted_topic(self):
value="value1".encode('utf-8'),
callback=on_delivery)
producer.flush()


class IdempotencySnapshotDelivery(PreallocNodesTest):
def __init__(self, test_context):
extra_rp_conf = {"enable_leader_balancer": False}

si_settings = SISettings(test_context,
log_segment_size=1024 * 1024,
fast_uploads=True)
super(IdempotencySnapshotDelivery,
self).__init__(test_context=test_context,
extra_rp_conf=extra_rp_conf,
node_prealloc_count=1,
si_settings=si_settings)

@cluster(num_nodes=4)
def test_recovery_after_snapshot_is_delivered(self):
segment_bytes = 1024 * 1024
msg_size = 128
rate_limit = 10 * (1024 * 1024) if not self.debug_mode else 1024 * 1024
msg_cnt = int(15 * rate_limit / msg_size)

topic = TopicSpec(partition_count=1,
replication_factor=3,
segment_bytes=segment_bytes,
retention_bytes=2 * segment_bytes,
redpanda_remote_read=True,
redpanda_remote_write=True)

# create topic with small segments and short retention
DefaultClient(self.redpanda).create_topic(topic)

producer = KgoVerifierProducer(self.test_context,
self.redpanda,
topic.name,
msg_size,
msg_cnt,
custom_node=self.preallocated_nodes,
rate_limit_bps=rate_limit)

producer.start(clean=False)

pkill_config = ActionConfig(cluster_start_lead_time_sec=10,
min_time_between_actions_sec=10,
max_time_between_actions_sec=20)
with random_process_kills(self.redpanda, pkill_config) as ctx:
wait_until(lambda: producer.produce_status.acked >= msg_cnt, 240,
1)
producer.stop()

assert producer.produce_status.bad_offsets == 0, "Producer bad offsets detected"

0 comments on commit fad8797

Please sign in to comment.