Skip to content

Commit

Permalink
Merge pull request #12116 from rystsov/issue-11947
Browse files Browse the repository at this point in the history
txns: fix list txns failure when it's used before find_coordinator
  • Loading branch information
rystsov authored Aug 3, 2023
2 parents 5851af5 + 85ad993 commit ad733f5
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
46 changes: 41 additions & 5 deletions src/v/cluster/tx_registry_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,49 @@ tx_registry_frontend::tx_registry_frontend(
config::shard_local_cfg().metadata_dissemination_retry_delay_ms.value()) {
}

ss::future<bool> tx_registry_frontend::ensure_tx_topic_exists() {
if (_metadata_cache.local().contains(model::tx_manager_nt)) {
co_return true;
}

if (!co_await try_create_tx_topic()) {
vlog(clusterlog.warn, "failed to create {}", model::tx_manager_nt);
co_return false;
}

auto retries = _metadata_dissemination_retries;
auto delay_ms = _metadata_dissemination_retry_delay_ms;
std::optional<std::string> error;
while (!_as.abort_requested() && 0 < retries--) {
auto is_cache_filled = _metadata_cache.local().contains(
model::tx_manager_nt);
if (unlikely(!is_cache_filled)) {
error = vformat(
fmt::runtime("can't find {} in the metadata_cache cache"),
model::tx_manager_nt);
vlog(
clusterlog.trace,
"waiting for {} to fill metadata_cache cache, retries left: {}",
model::tx_manager_nt,
retries);
co_await sleep_abortable(delay_ms, _as);
continue;
}
co_return true;
}

if (error) {
vlog(clusterlog.warn, "{}", error.value());
}

co_return false;
}

ss::future<find_coordinator_reply> tx_registry_frontend::find_coordinator(
kafka::transactional_id tid, model::timeout_clock::duration timeout) {
if (!_metadata_cache.local().contains(model::tx_manager_nt)) {
if (!co_await try_create_tx_topic()) {
vlog(
clusterlog.warn,
"can't find {} in the metadata cache",
model::tx_manager_nt);
vlog(clusterlog.warn, "failed to create {}", model::tx_manager_nt);
co_return find_coordinator_reply(
std::nullopt, std::nullopt, errc::topic_not_exists);
}
Expand Down Expand Up @@ -316,7 +351,8 @@ ss::future<bool> tx_registry_frontend::try_create_tx_topic() {
return _controller->get_topics_frontend()
.local()
.autocreate_topics(
{std::move(topic)}, config::shard_local_cfg().create_topic_timeout_ms())
{std::move(topic)},
config::shard_local_cfg().create_topic_timeout_ms() * partitions_amount)
.then([](std::vector<cluster::topic_result> res) {
vassert(res.size() == 1, "expected exactly one result");
if (res[0].ec == cluster::errc::topic_already_exists) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/tx_registry_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class tx_registry_frontend {
ss::sharded<cluster::tx_coordinator_mapper>&,
ss::sharded<features::feature_table>&);

ss::future<bool> ensure_tx_topic_exists();

ss::future<find_coordinator_reply>
find_coordinator(kafka::transactional_id, model::timeout_clock::duration);

Expand Down
11 changes: 11 additions & 0 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "cluster/id_allocator_frontend.h"
#include "cluster/topics_frontend.h"
#include "cluster/tx_registry_frontend.h"
#include "config/broker_authn_endpoint.h"
#include "config/configuration.h"
#include "config/node_config.h"
Expand Down Expand Up @@ -1566,6 +1567,16 @@ list_transactions_handler::handle(request_context ctx, ss::smp_service_group) {
return true;
};

auto& tx_registry = ctx.tx_registry_frontend();
if (!co_await tx_registry.ensure_tx_topic_exists()) {
vlog(
klog.error,
"Can not return list of transactions. Failed to create {}",
model::tx_manager_nt);
response.data.error_code = kafka::error_code::unknown_server_error;
co_return co_await ctx.respond(std::move(response));
}

auto& tx_frontend = ctx.tx_gateway_frontend();
auto txs = co_await tx_frontend.get_all_transactions();
if (txs.has_value()) {
Expand Down
8 changes: 7 additions & 1 deletion tests/rptest/tests/transaction_kafka_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def __init__(self, test_context):
"tx_timeout_delay_ms": 10000000,
"abort_timed_out_transactions_interval_ms":
10000000,
'enable_leader_balancer': False
"enable_leader_balancer": False,
"transaction_coordinator_partitions": 4
})

self.kafka_cli = KafkaCliTools(self.redpanda, "3.0.0")
Expand Down Expand Up @@ -136,6 +137,11 @@ def test_describe_transactions(self):
tpoic_partition = f"{topic}-{partition}"
assert tpoic_partition in expected_partitions

@cluster(num_nodes=3)
def test_empty_list_transactions(self):
txs_info = self.kafka_cli.list_transactions()
assert len(txs_info) == 0

@cluster(num_nodes=3)
def test_list_transactions(self):
producer1 = ck.Producer({
Expand Down

0 comments on commit ad733f5

Please sign in to comment.