Skip to content

Commit

Permalink
ducky: add max_transactions_per_coordinator tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Aug 16, 2023
1 parent ac155de commit 149e746
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/v/utils/rwlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ class rwlock_unit {
rwlock* _lock;
bool _is_write_lock{false};

public:
rwlock_unit(rwlock* lock, bool is_write_lock) noexcept
: _lock(lock)
, _is_write_lock(is_write_lock) {}

friend class rwlock;

public:
rwlock_unit(rwlock_unit&& token) noexcept
: _lock(token._lock)
, _is_write_lock(token._is_write_lock) {
Expand All @@ -36,6 +39,8 @@ class rwlock_unit {
rwlock_unit(const rwlock_unit&&) = delete;
rwlock_unit() = delete;

explicit operator bool() const noexcept { return _lock != nullptr; }

rwlock_unit& operator=(rwlock_unit&& other) noexcept {
if (this != &other) {
this->_lock = other._lock;
Expand Down
12 changes: 12 additions & 0 deletions src/v/utils/tests/rwlock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,15 @@ SEASTAR_THREAD_TEST_CASE(test_rwlock_unit_move) {
auto unit2 = rwlock.attempt_write_lock();
BOOST_REQUIRE(unit2);
}

SEASTAR_THREAD_TEST_CASE(test_rwlock_assign) {
ssx::rwlock rwlock;
auto unit1 = rwlock.attempt_write_lock();
BOOST_REQUIRE(unit1);
BOOST_REQUIRE(unit1.value());

ssx::rwlock_unit unit2 = std::move(unit1.value());

BOOST_REQUIRE(!unit1.value());
BOOST_REQUIRE(unit2);
}
74 changes: 74 additions & 0 deletions tests/rptest/tests/tx_overflow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2022 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec

from rptest.tests.redpanda_test import RedpandaTest
from confluent_kafka.cimpl import KafkaException, KafkaError
from confluent_kafka import Producer
from rptest.clients.rpk import RpkTool


class TxOverflowTest(RedpandaTest):
topics = [TopicSpec(partition_count=1, replication_factor=3)]

def __init__(self, test_context):
extra_rp_conf = {
"enable_leader_balancer": False,
"partition_autobalancing_mode": "off",
"transaction_coordinator_partitions": 1
}

super(TxOverflowTest, self).__init__(test_context=test_context,
extra_rp_conf=extra_rp_conf,
log_level="trace")

def set_max_transactions_per_coordinator(self, n):
rpk = RpkTool(self.redpanda)
rpk.cluster_config_set("max_transactions_per_coordinator", str(n))

@cluster(num_nodes=3)
def underflow_test(self):
producers = []
for i in range(20):
p = Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': f"tx_{i}",
})
p.init_transactions()
producers.append(p)

for p in producers:
p.begin_transaction()
p.produce(self.topic, "key", "value", partition=0)
p.commit_transaction()

@cluster(num_nodes=3)
def overflow_test(self):
self.set_max_transactions_per_coordinator(10)

producers = []
for i in range(20):
p = Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': f"tx_{i}",
})
p.init_transactions()
producers.append(p)

oldest_producer = producers[0]
oldest_producer.begin_transaction()
oldest_producer.produce(self.topic, "key", "value", partition=0)
try:
oldest_producer.commit_transaction()
assert False, ""
except KafkaException as e:
assert e.args[0].code(
) == KafkaError.INVALID_PRODUCER_ID_MAPPING, f"observed code={e.args[0].code()}"

0 comments on commit 149e746

Please sign in to comment.