Skip to content

Commit

Permalink
txn: step down on begin_tx & store_txn_offsets failures
Browse files Browse the repository at this point in the history
Transactions in kafka protocol are stateful: the processing of the
requests depends on the previous commands executed by the same or
even different producer. It makes the situations when the replica-
tion fails with the indecisive errors such as timeout dangerous
because the true state is unknown.

Stepping down to resolve uncertainty by replaying the log
  • Loading branch information
rystsov committed Jun 8, 2023
1 parent 69c5392 commit 7ca5707
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1915,17 +1915,22 @@ group::begin_tx(cluster::begin_group_tx_request r) {

auto reader = model::make_memory_record_batch_reader(
std::move(batch.value()));
auto e = co_await _partition->raft()->replicate(
auto res = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (!res) {
vlog(
_ctx_txlog.warn,
"Error \"{}\" on replicating pid:{} fencing batch",
e.error(),
res.error(),
r.pid);
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group begin_tx failed");
}
co_return make_begin_tx_reply(cluster::tx_errc::leader_not_found);
}

Expand All @@ -1936,9 +1941,9 @@ group::begin_tx(cluster::begin_group_tx_request r) {
_volatile_txs[r.pid] = volatile_tx{.tx_seq = r.tx_seq};
}

auto res = _expiration_info.insert_or_assign(
auto [it, _] = _expiration_info.insert_or_assign(
r.pid, expiration_info(r.timeout));
try_arm(res.first->second.deadline());
try_arm(it->second.deadline());

cluster::begin_group_tx_reply reply;
reply.etag = _term;
Expand Down Expand Up @@ -2271,6 +2276,12 @@ group::store_txn_offsets(txn_offset_commit_request r) {
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down(
"group store_txn_offsets failed");
}
co_return txn_offset_commit_response(
r, error_code::unknown_server_error);
}
Expand Down

0 comments on commit 7ca5707

Please sign in to comment.