ducky: add draining transactions tests
ZeDRoman authored and Denis Rystsov committed Sep 21, 2023
1 parent 1fae7be commit 491cb7f
Showing 3 changed files with 377 additions and 0 deletions.
25 changes: 25 additions & 0 deletions tests/rptest/services/admin.py
@@ -690,6 +690,31 @@ def describe_tx_registry(self, node=None):
        info["tx_mapping"] = mapping
        return info

    def start_tx_draining(self,
                          tm_partition,
                          repartitioning_id,
                          tx_ids,
                          tx_ranges,
                          node=None):
        """
        Mark the given transactional ids and hash ranges as draining
        on the specified tx manager partition.
        """
        path = f"transactions/draining_transactions?tm_partition={tm_partition}"
        # Request body: the ids plus [from, to] hash ranges to drain.
        body = dict(repartitioning_id=repartitioning_id,
                    transactional_ids=tx_ids,
                    hash_ranges=[{
                        "from": r[0],
                        "to": r[1]
                    } for r in tx_ranges])
        return self._request('post', path, json=body, node=node)

    def get_draining_transactions(self, tm_partition, node=None):
        """
        Get the draining transactions for the given tm_partition
        """
        path = f"transactions/draining_transactions?tm_partition={tm_partition}"
        return self._request('get', path, node=node).json()

    def set_partition_replicas(self,
                               topic,
                               partition,
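For orientation, here is a minimal sketch of how a test might drive the two new Admin helpers above. The `redpanda` fixture, the transactional ids, and the hash-range bounds are illustrative assumptions, not part of this commit:

# Hypothetical usage of the new draining-transactions Admin helpers.
# Assumes `redpanda` is a started rptest RedpandaService instance.
from rptest.services.admin import Admin

admin = Admin(redpanda)

# Ask tm partition 0 to drain two transactional ids and one hash range
# (the ids and the [0, 1000] bounds are made-up example values).
admin.start_tx_draining(tm_partition=0,
                        repartitioning_id=1,
                        tx_ids=["tx-0", "tx-1"],
                        tx_ranges=[(0, 1000)])

# Poll the same partition to observe the draining state reported back.
state = admin.get_draining_transactions(tm_partition=0)
print(state)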
92 changes: 92 additions & 0 deletions tests/rptest/tests/transactions_test.py
@@ -17,6 +17,7 @@
import uuid
import random

from ducktape.mark import parametrize
from ducktape.utils.util import wait_until
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.admin import Admin
@@ -797,6 +798,97 @@ def check_pids_overflow_test(self):
        assert num_consumed == should_be_consumed


class GATransaction_MixedVersionsTest(RedpandaTest):
    def consume(self, consumer, max_records=10, timeout_s=2):
        def consume_records():
            records = consumer.consume(max_records, timeout_s)

            if records:
                return True, records
            else:
                # No records yet: tell wait_until_result to keep retrying.
                return False

        return wait_until_result(consume_records,
                                 timeout_sec=30,
                                 backoff_sec=2,
                                 err_msg="Cannot consume data")

    def check_consume(self, topic_name, max_tx):
        consumer = ck.Consumer({
            'bootstrap.servers': self.redpanda.brokers(),
            'group.id': f"consumer-{uuid.uuid4()}",
            'auto.offset.reset': 'earliest',
        })

        consumer.subscribe([topic_name])

        num_consumed = 0
        prev_rec = bytes("0", 'UTF-8')

        while num_consumed < max_tx:
            self.redpanda.logger.debug(
                f"Consumed {num_consumed} of {max_tx} expected records")
            records = self.consume(consumer)

            for record in records:
                # Keys were produced as consecutive integers, so they must
                # be observed strictly in order.
                assert prev_rec == record.key()
                prev_rec = bytes(str(int(prev_rec) + 1), 'UTF-8')

            num_consumed += len(records)

        assert num_consumed == max_tx

    @cluster(num_nodes=2, log_allow_list=RESTART_LOG_ALLOW_LIST)
    def check_parsing_test(self):
        self.redpanda = make_redpanda_service(
            self.test_context,
            1,
            extra_rp_conf={
                "enable_leader_balancer": False,
                "enable_auto_rebalance_on_node_add": False,
            },
            environment={
                "__REDPANDA_LATEST_LOGICAL_VERSION": 5,
                "__REDPANDA_EARLIEST_LOGICAL_VERSION": 5
            })

        self.redpanda.start()

        spec = TopicSpec(partition_count=1, replication_factor=1)
        self._client = DefaultClient(self.redpanda)
        self.client().create_topic(spec)
        topic_name = spec.name

        producer = ck.Producer({
            'bootstrap.servers': self.redpanda.brokers(),
            'transactional.id': '123',
            'transaction.timeout.ms': 10000,
        })

        producer.init_transactions()

        def on_del(err, msg):
            assert err is None

        max_tx = 100
        for i in range(max_tx):
            producer.begin_transaction()
            # args: topic, value, key, partition, delivery callback
            producer.produce(topic_name, str(i), str(i), 0, on_del)
            producer.commit_transaction()
            producer.flush()

        self.check_consume(topic_name, max_tx)

        # Bump the latest logical version, roll the cluster, then verify
        # the previously produced data is still consumable.
        self.redpanda.set_environment({
            "__REDPANDA_LATEST_LOGICAL_VERSION": 6,
            "__REDPANDA_EARLIEST_LOGICAL_VERSION": 5
        })
        for n in self.redpanda.nodes:
            self.redpanda.restart_nodes(n, stop_timeout=60)

        self.check_consume(topic_name, max_tx)


class GATransaction_v22_1_UpgradeTest(RedpandaTest):
    topics = (TopicSpec(partition_count=1, replication_factor=3), )

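As a side note, the mixed-version flow above hinges on the logical-version environment overrides: pin both bounds, write transactional data, raise the latest bound, and roll the nodes one at a time. A condensed sketch of just that upgrade step, assuming a started `redpanda` service as in the test:

# Condensed sketch of the version-bump-and-roll step exercised above.
redpanda.set_environment({
    "__REDPANDA_LATEST_LOGICAL_VERSION": 6,   # advertise a newer latest version
    "__REDPANDA_EARLIEST_LOGICAL_VERSION": 5  # stay joinable with version-5 peers
})
for node in redpanda.nodes:
    # Restart one node at a time so the cluster stays available throughout.
    redpanda.restart_nodes(node, stop_timeout=60)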
