From 7ca57077f2a4c3ed592d4a989c11729f1aced332 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sun, 28 May 2023 06:05:48 -0700 Subject: [PATCH] txn: step down on begin_tx & store_txn_offsets failures Transactions in the Kafka protocol are stateful: the processing of a request depends on the previous commands executed by the same or even a different producer. This makes it dangerous when replication fails with an indeterminate error such as a timeout, because the true state is unknown. Step down to resolve the uncertainty by replaying the log. --- src/v/kafka/server/group.cc | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 4cf7ac1c2164..640417a39d60 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1915,17 +1915,22 @@ group::begin_tx(cluster::begin_group_tx_request r) { auto reader = model::make_memory_record_batch_reader( std::move(batch.value())); - auto e = co_await _partition->raft()->replicate( + auto res = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); - if (!e) { + if (!res) { vlog( _ctx_txlog.warn, "Error \"{}\" on replicating pid:{} fencing batch", - e.error(), + res.error(), r.pid); + if ( + _partition->raft()->is_leader() + && _partition->raft()->term() == _term) { + co_await _partition->raft()->step_down("group begin_tx failed"); + } co_return make_begin_tx_reply(cluster::tx_errc::leader_not_found); } @@ -1936,9 +1941,9 @@ group::begin_tx(cluster::begin_group_tx_request r) { _volatile_txs[r.pid] = volatile_tx{.tx_seq = r.tx_seq}; } - auto res = _expiration_info.insert_or_assign( + auto [it, _] = _expiration_info.insert_or_assign( r.pid, expiration_info(r.timeout)); - try_arm(res.first->second.deadline()); + try_arm(it->second.deadline()); cluster::begin_group_tx_reply reply; reply.etag = _term; @@ -2271,6 +2276,12 @@ 
group::store_txn_offsets(txn_offset_commit_request r) { raft::replicate_options(raft::consistency_level::quorum_ack)); if (!e) { + if ( + _partition->raft()->is_leader() + && _partition->raft()->term() == _term) { + co_await _partition->raft()->step_down( + "group store_txn_offsets failed"); + } co_return txn_offset_commit_response( r, error_code::unknown_server_error); }