From 4297c2dd1d26068eeaae1118f53c5bc2d256a8b9 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 16:21:21 -0700 Subject: [PATCH 01/19] it's ok to delay an ack so no need to add conditions --- src/v/cluster/tx_gateway_frontend.cc | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 50744b65acf2..19860221ae5a 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -1533,32 +1533,18 @@ tx_gateway_frontend::do_commit_tm_tx( throw; } - if (!is_transaction_ga()) { - outcome->set_value(tx_errc::none); - } - auto changed_tx = co_await stm->mark_tx_prepared(expected_term, tx.id); if (!changed_tx.has_value()) { - if (changed_tx.error() == tm_stm::op_status::not_leader) { - if (is_transaction_ga()) { - outcome->set_value(tx_errc::not_coordinator); - } - co_return tx_errc::not_coordinator; - } - if (is_transaction_ga()) { - outcome->set_value(tx_errc::unknown_server_error); - } + outcome->set_value(tx_errc::unknown_server_error); co_return tx_errc::unknown_server_error; } - if (is_transaction_ga()) { - // We can reduce the number of disk operation if we will not write - // preparing state on disk. But after it we should ans to client when we - // sure that tx will be recommited after fail. We can guarantee it only - // if we ans after marking tx prepared. Becase after fail tx will be - // recommited again and client will see expected bechavior. - outcome->set_value(tx_errc::none); - } + // We can reduce the number of disk operation if we will not write + // preparing state on disk. But after it we should ans to client when we + // sure that tx will be recommited after fail. We can guarantee it only + // if we ans after marking tx prepared. Becase after fail tx will be + // recommited again and client will see expected bechavior. + outcome->set_value(tx_errc::none); tx = changed_tx.value(); From df42b11e6508e958323104c9782e73e9f9aac7fe Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 16:25:15 -0700 Subject: [PATCH 02/19] action already performed --- src/v/cluster/tx_gateway_frontend.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 19860221ae5a..19ae87303a47 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -1514,10 +1514,6 @@ tx_gateway_frontend::do_commit_tm_tx( auto aborting_tx = co_await stm->mark_tx_killed( expected_term, tx.id); if (!aborting_tx.has_value()) { - if (aborting_tx.error() == tm_stm::op_status::not_leader) { - outcome->set_value(tx_errc::not_coordinator); - co_return tx_errc::not_coordinator; - } outcome->set_value(tx_errc::invalid_txn_state); co_return tx_errc::invalid_txn_state; } From 6a59047949d8d9928fe64ceb68b87c1b90775ff6 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 16:41:11 -0700 Subject: [PATCH 03/19] with writing fence on each tx this bug fix isn't needed --- src/v/cluster/tm_stm.cc | 54 +++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/src/v/cluster/tm_stm.cc b/src/v/cluster/tm_stm.cc index 6ba85c9f3e70..453492223e53 100644 --- a/src/v/cluster/tm_stm.cc +++ b/src/v/cluster/tm_stm.cc @@ -442,22 +442,25 @@ ss::future tm_stm::add_partitions( if (ptx->second.status != tm_transaction::tx_status::ongoing) { co_return tm_stm::op_status::unknown; } - bool just_started = ptx->second.partitions.size() == 0 + + if (!is_transaction_ga()) { + bool just_started = ptx->second.partitions.size() == 0 && ptx->second.groups.size() == 0; - if (just_started) { - tm_transaction tx = ptx->second; - for (auto& partition : partitions) { - tx.partitions.push_back(partition); - } - tx.last_update_ts = clock_type::now(); - auto r = co_await update_tx(tx, tx.etag); + if (just_started) { + tm_transaction tx = ptx->second; + for (auto& partition : partitions) { + tx.partitions.push_back(partition); + } + tx.last_update_ts = clock_type::now(); + auto r = co_await update_tx(tx, tx.etag); - if (!r.has_value()) { - co_return tm_stm::op_status::unknown; + if (!r.has_value()) { + co_return tm_stm::op_status::unknown; + } + _mem_txes[tx_id] = tx; + co_return tm_stm::op_status::success; } - _mem_txes[tx_id] = tx; - co_return tm_stm::op_status::success; } for (auto& partition : partitions) { @@ -479,21 +482,24 @@ ss::future tm_stm::add_group( if (ptx->second.status != tm_transaction::tx_status::ongoing) { co_return tm_stm::op_status::unknown; } - bool just_started = ptx->second.partitions.size() == 0 - && ptx->second.groups.size() == 0; - if (just_started) { - tm_transaction tx = ptx->second; - tx.groups.push_back( - tm_transaction::tx_group{.group_id = group_id, .etag = term}); - tx.last_update_ts = clock_type::now(); - auto r = co_await update_tx(tx, tx.etag); + if (!is_transaction_ga()) { + bool just_started = ptx->second.partitions.size() == 0 + && ptx->second.groups.size() == 0; + + if (just_started) { + tm_transaction tx = ptx->second; + tx.groups.push_back( + tm_transaction::tx_group{.group_id = group_id, .etag = term}); + tx.last_update_ts = clock_type::now(); + auto r = co_await update_tx(tx, tx.etag); - if (!r.has_value()) { - co_return tm_stm::op_status::unknown; + if (!r.has_value()) { + co_return tm_stm::op_status::unknown; + } + _mem_txes[tx_id] = tx; + co_return tm_stm::op_status::success; } - _mem_txes[tx_id] = tx; - co_return tm_stm::op_status::success; } ptx->second.groups.push_back( From 54215cd84ec0c0470615157e2986c67b5b14daa3 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 16:47:07 -0700 Subject: [PATCH 04/19] remove obsolete comment --- src/v/cluster/rm_stm.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index bdb4d96c288c..dc71c8d89b7f 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -382,11 +382,6 @@ ss::future> rm_stm::do_begin_tx( auto [_, insert_tx_seq] = _log_state.tx_seqs.emplace(pid, tx_seq); insert &= insert_tx_seq; if (!insert) { - // TODO: https://app.clubhouse.io/vectorized/story/2194 - // tm_stm forgot that it had already begun a transaction - // (it may happen when it crashes) - // it's ok we fail this request, a client will abort a - // tx bumping its producer id's epoch vlog( _ctx_log.error, "there is already an ongoing transaction within {} session", From 7d06d73190625254bcbfbfc927295a4a45363c93 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 18:20:49 -0700 Subject: [PATCH 05/19] make replicate_tx reject tx writes if tx isn't active --- src/v/cluster/rm_stm.cc | 7 ++++++- src/v/cluster/rm_stm.h | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index dc71c8d89b7f..95b26f59ba1e 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1107,7 +1107,11 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { co_return errc::invalid_producer_epoch; } - if (!is_transaction_ga()) { + if (is_transaction_ga()) { + if (!_log_state.tx_seqs.contains(bid.pid)) { + co_return errc::invalid_producer_epoch; + } + } else { if (!_mem_state.expected.contains(bid.pid)) { // there is an inflight abort // or this partition lost leadership => can't continue tx since @@ -1199,6 +1203,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { set_seq(bid, new_offset); + // TODO: check what happens on re-election if (!_mem_state.tx_start.contains(bid.pid)) { auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); _mem_state.tx_start.emplace(bid.pid, base_offset); diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 9f43e139e65e..3b9601af78d0 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -393,6 +393,7 @@ class rm_stm final : public persisted_stm { absl::flat_hash_map tx_seqs; + // number of batches replicated withing a transaction absl::flat_hash_map inflight; }; From 983e2ad817da0dd068b056842255a288b43e3c50 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 19:11:53 -0700 Subject: [PATCH 06/19] adds stepdown to resolve unknown situations --- src/v/cluster/rm_stm.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 95b26f59ba1e..bdb5fc79896c 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1188,6 +1188,9 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { // an error during replication, preventin tx from progress _mem_state.expected.erase(bid.pid); } + if (_c->is_leader() && _c->term() == synced_term) { + co_await _c->step_down(); + } co_return r.error(); } From 480ac28f552f3c3f50c9aa95e08f84eb0c468c75 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 19:15:29 -0700 Subject: [PATCH 07/19] add wait to rely on log_state.ongoing_* instead of tx_start/s --- src/v/cluster/rm_stm.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index bdb5fc79896c..5e2b36be9dd6 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1193,6 +1193,13 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { } co_return r.error(); } + if (!co_await wait_no_throw( + model::offset(r.value().last_offset()), _sync_timeout)) { + if (_c->is_leader() && _c->term() == synced_term) { + co_await _c->step_down(); + } + co_return tx_errc::timeout; + } auto expiration_it = _mem_state.expiration.find(bid.pid); if (expiration_it == _mem_state.expiration.end()) { From f75b3904fbc9545851a66663fb8b7bd956a48392 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 20:08:44 -0700 Subject: [PATCH 08/19] put accessing old fields behind flag --- src/v/cluster/rm_stm.cc | 86 ++++++++++++++++++++++++----------------- src/v/cluster/rm_stm.h | 17 ++++---- 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 5e2b36be9dd6..f7147ebba17b 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -396,8 +396,6 @@ ss::future> rm_stm::do_begin_tx( // strictly after all records are written it means that it // won't be retrying old writes and we may reset the seq cache _log_state.seq_table.erase(pid); - _log_state.inflight[pid] = 0; - _mem_state.inflight[pid] = 0; co_return synced_term; } @@ -473,17 +471,22 @@ ss::future rm_stm::do_prepare_tx( co_return tx_errc::request_rejected; } - auto expected_it = _mem_state.expected.find(pid); - if (expected_it == _mem_state.expected.end()) { - // impossible situation, a transaction coordinator tries - // to prepare a transaction which wasn't started - vlog(_ctx_log.error, "Can't prepare pid:{} - unknown session", pid); - co_return tx_errc::request_rejected; - } - - if (expected_it->second != tx_seq) { - // current prepare_tx call is stale, rejecting - co_return tx_errc::request_rejected; + if (is_transaction_ga()) { + if (!_log_state.tx_seqs.contains(pid)) { + co_return errc::invalid_producer_epoch; + } + } else { + auto expected_it = _mem_state.expected.find(pid); + if (expected_it == _mem_state.expected.end()) { + // impossible situation, a transaction coordinator tries + // to prepare a transaction which wasn't started + vlog(_ctx_log.error, "Can't prepare pid:{} - unknown session", pid); + co_return tx_errc::request_rejected; + } + if (expected_it->second != tx_seq) { + // current prepare_tx call is stale, rejecting + co_return tx_errc::request_rejected; + } } auto marker = prepare_marker{ @@ -982,12 +985,14 @@ ss::future> rm_stm::get_transactions() { ans.emplace(id, tx_info); } - for (auto& [id, offset] : _mem_state.tx_start) { - transaction_info tx_info; - tx_info.lso_bound = offset; - tx_info.status = get_tx_status(id); - tx_info.info = get_expiration_info(id); - ans.emplace(id, tx_info); + if (!is_transaction_ga()) { + for (auto& [id, offset] : _mem_state.tx_start) { + transaction_info tx_info; + tx_info.lso_bound = offset; + tx_info.status = get_tx_status(id); + tx_info.info = get_expiration_info(id); + ans.emplace(id, tx_info); + } } for (auto& [id, offset] : _log_state.ongoing_map) { @@ -1144,7 +1149,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { } } - if (_mem_state.inflight[bid.pid] > 0 || _log_state.inflight[bid.pid] > 0) { + if (_log_state.inflight[bid.pid] > 0) { // this isn't the first attempt in the tx we should try dedupe auto cached_offset = known_seq(bid); if (cached_offset) { @@ -1173,10 +1178,10 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { reset_seq(bid); } - _mem_state.inflight[bid.pid] = _mem_state.inflight[bid.pid] + 1; - - if (!_mem_state.tx_start.contains(bid.pid)) { - _mem_state.estimated.emplace(bid.pid, _insync_offset); + if (!is_transaction_ga()) { + if (!_mem_state.tx_start.contains(bid.pid)) { + _mem_state.estimated.emplace(bid.pid, _insync_offset); + } } auto r = co_await _c->replicate( @@ -1213,12 +1218,14 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { set_seq(bid, new_offset); - // TODO: check what happens on re-election - if (!_mem_state.tx_start.contains(bid.pid)) { - auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); - _mem_state.tx_start.emplace(bid.pid, base_offset); - _mem_state.tx_starts.insert(base_offset); - _mem_state.estimated.erase(bid.pid); + if (!is_transaction_ga()) { + // TODO: check what happens on re-election + if (!_mem_state.tx_start.contains(bid.pid)) { + auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); + _mem_state.tx_start.emplace(bid.pid, base_offset); + _mem_state.tx_starts.insert(base_offset); + _mem_state.estimated.erase(bid.pid); + } } co_return kafka_result{.last_offset = new_offset}; @@ -1468,9 +1475,11 @@ model::offset rm_stm::last_stable_offset() { first_tx_start = *_log_state.ongoing_set.begin(); } - if (!_mem_state.tx_starts.empty()) { - first_tx_start = std::min( - first_tx_start, *_mem_state.tx_starts.begin()); + if (!is_transaction_ga()) { + if (!_mem_state.tx_starts.empty()) { + first_tx_start = std::min( + first_tx_start, *_mem_state.tx_starts.begin()); + } } for (auto& entry : _mem_state.estimated) { @@ -1640,8 +1649,10 @@ ss::future<> rm_stm::do_abort_old_txes() { for (auto& [k, _] : _mem_state.estimated) { pids.push_back(k); } - for (auto& [k, _] : _mem_state.tx_start) { - pids.push_back(k); + if (!is_transaction_ga()) { + for (auto& [k, _] : _mem_state.tx_start) { + pids.push_back(k); + } } for (auto& [k, _] : _log_state.ongoing_map) { pids.push_back(k); @@ -1908,6 +1919,7 @@ ss::future<> rm_stm::apply_control( if (crt == model::control_record_type::tx_abort) { _log_state.prepared.erase(pid); _log_state.tx_seqs.erase(pid); + _log_state.inflight.erase(pid); auto offset_it = _log_state.ongoing_map.find(pid); if (offset_it != _log_state.ongoing_map.end()) { // make a list @@ -1927,6 +1939,7 @@ ss::future<> rm_stm::apply_control( } else if (crt == model::control_record_type::tx_commit) { _log_state.prepared.erase(pid); _log_state.tx_seqs.erase(pid); + _log_state.inflight.erase(pid); auto offset_it = _log_state.ongoing_map.find(pid); if (offset_it != _log_state.ongoing_map.end()) { _log_state.ongoing_set.erase(offset_it->second.first); @@ -1992,6 +2005,9 @@ void rm_stm::apply_data(model::batch_identity bid, model::offset last_offset) { _log_state.ongoing_set.insert(base_offset); _mem_state.estimated.erase(bid.pid); } + if (!_log_state.inflight.contains(bid.pid)) { + _log_state.inflight[bid.pid] = 0; + } _log_state.inflight[bid.pid]++; } } diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 3b9601af78d0..8ea8e7f78fe8 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -403,25 +403,26 @@ class rm_stm final : public persisted_stm { // with this approach a combination of mem_state and log_state is // always up to date model::term_id term; + // before we replicate the first batch of a transaction we don't know + // its offset but we must prevent read_comitted fetch from getting it + // so we use last seen offset to estimate it + absl::flat_hash_map estimated; + absl::flat_hash_map + expiration; + model::offset last_end_tx{-1}; + + // FIELDS TO GO AFTER GA // a map from producer_identity (a session) to the first offset of // the current transaction in this session absl::flat_hash_map tx_start; // a heap of the first offsets of all ongoing transactions absl::btree_set tx_starts; - // before we replicate the first batch of a transaction we don't know - // its offset but we must prevent read_comitted fetch from getting it - // so we use last seen offset to estimate it - absl::flat_hash_map estimated; // a set of ongoing sessions. we use it to prevent some client protocol // errors like the transactional writes outside of a transaction absl::flat_hash_map expected; // `preparing` helps to identify failed prepare requests and use them to // filter out stale abort requests absl::flat_hash_map preparing; - absl::flat_hash_map - expiration; - model::offset last_end_tx{-1}; - absl::flat_hash_map inflight; void forget(model::producer_identity pid) { expected.erase(pid); From d845b4f05547a00ae5a52373acb7aa70a08b0144 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Tue, 11 Oct 2022 20:15:41 -0700 Subject: [PATCH 09/19] add comment --- src/v/cluster/rm_stm.cc | 44 ++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index f7147ebba17b..2d47b32097b1 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -560,8 +560,30 @@ ss::future rm_stm::do_commit_tx( } } + // checking fencing from prepared phase + auto fence_it = _log_state.fence_pid_epoch.find(pid.get_id()); + if (fence_it == _log_state.fence_pid_epoch.end()) { + // begin_tx should have set a fence + co_return tx_errc::request_rejected; + } + if (pid.get_epoch() != fence_it->second) { + vlog( + _ctx_log.error, + "Can't commit pid:{} - fenced out by epoch {}", + pid, + fence_it->second); + co_return tx_errc::fenced; + } + std::optional tx_seq_for_pid; - if (!is_transaction_ga()) { + if (is_transaction_ga()) { + // We can't validate a request because the is_transaction_ga + // switch may happen mid tx execution and in this case we risk + // to deadlock the system; please uncomment in 23.1 release + // if (!_log_state.tx_seqs.contains(bid.pid)) { + // co_return errc::invalid_producer_epoch; + // } + } else { auto preparing_it = _mem_state.preparing.find(pid); if (preparing_it != _mem_state.preparing.end()) { @@ -598,27 +620,13 @@ ss::future rm_stm::do_commit_tx( } } - _mem_state.expected.erase(pid); - _mem_state.preparing.erase(pid); - // checking fencing from prepared phase - auto fence_it = _log_state.fence_pid_epoch.find(pid.get_id()); - if (fence_it == _log_state.fence_pid_epoch.end()) { - // begin_tx should have set a fence - co_return tx_errc::request_rejected; - } - if (pid.get_epoch() != fence_it->second) { - vlog( - _ctx_log.error, - "Can't commit pid:{} - fenced out by epoch {}", - pid, - fence_it->second); - co_return tx_errc::fenced; - } - if (!tx_seq_for_pid) { tx_seq_for_pid = get_tx_seq(pid); } + _mem_state.expected.erase(pid); + _mem_state.preparing.erase(pid); + if (!tx_seq_for_pid) { vlog( _ctx_log.trace, From 5e39dcb747434fb51d950580c8177fc0b8569ebd Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 12:33:33 -0700 Subject: [PATCH 10/19] refactoring --- src/v/cluster/rm_stm.cc | 56 ++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 2d47b32097b1..ef265cc9ce6b 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -306,37 +306,35 @@ ss::future> rm_stm::do_begin_tx( } auto synced_term = _insync_term; - // checking / setting pid fencing auto fence_it = _log_state.fence_pid_epoch.find(pid.get_id()); - auto is_new_pid = fence_it == _log_state.fence_pid_epoch.end(); - if (is_new_pid || pid.get_epoch() > fence_it->second) { - if (!is_new_pid) { - auto old_pid = model::producer_identity{ - pid.get_id(), fence_it->second}; - // there is a fence, it might be that tm_stm failed, forget about - // an ongoing transaction, assigned next pid for the same tx.id and - // started a new transaction without aborting the previous one. - // - // at the same time it's possible that it already aborted the old - // tx before starting this. do_abort_tx is idempotent so calling it - // just in case to proactivly abort the tx instead of waiting for - // the timeout - // - // moreover do_abort_tx is co-idempotent with do_commit_tx so if a - // tx was committed calling do_abort_tx will do nothing - auto ar = co_await do_abort_tx( - old_pid, std::nullopt, _sync_timeout); - if (ar == tx_errc::stale) { - co_return tx_errc::stale; - } - if (ar != tx_errc::none) { - co_return tx_errc::unknown_server_error; - } + if (fence_it == _log_state.fence_pid_epoch.end()) { + // intentionally empty + } else if (pid.get_epoch() > fence_it->second) { + auto old_pid = model::producer_identity{ + pid.get_id(), fence_it->second}; + // there is a fence, it might be that tm_stm failed, forget about + // an ongoing transaction, assigned next pid for the same tx.id and + // started a new transaction without aborting the previous one. + // + // at the same time it's possible that it already aborted the old + // tx before starting this. do_abort_tx is idempotent so calling it + // just in case to proactivly abort the tx instead of waiting for + // the timeout + // + // moreover do_abort_tx is co-idempotent with do_commit_tx so if a + // tx was committed calling do_abort_tx will do nothing + auto ar = co_await do_abort_tx( + old_pid, std::nullopt, _sync_timeout); + if (ar == tx_errc::stale) { + co_return tx_errc::stale; + } + if (ar != tx_errc::none) { + co_return tx_errc::unknown_server_error; + } - if (is_known_session(old_pid)) { - // can't begin a transaction while previous tx is in progress - co_return tx_errc::unknown_server_error; - } + if (is_known_session(old_pid)) { + // can't begin a transaction while previous tx is in progress + co_return tx_errc::unknown_server_error; } } else if (pid.get_epoch() < fence_it->second) { vlog( From 14b66b2efd8e50e7e559fb849a913fb059c79212 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 12:41:23 -0700 Subject: [PATCH 11/19] fix apply_fence --- src/v/cluster/rm_stm.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index ef265cc9ce6b..0a155a75a1fd 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1851,9 +1851,9 @@ void rm_stm::apply_fence(model::record_batch&& b) { version, rm_stm::fence_control_record_version); + std::optional tx_seq{}; if (version == rm_stm::fence_control_record_version) { - auto tx_seq = reflection::adl{}.from(val_reader); - _log_state.tx_seqs.try_emplace(bid.pid, tx_seq); + tx_seq = reflection::adl{}.from(val_reader); } auto key_buf = record.release_key(); @@ -1876,11 +1876,13 @@ void rm_stm::apply_fence(model::record_batch&& b) { auto [fence_it, _] = _log_state.fence_pid_epoch.try_emplace( bid.pid.get_id(), bid.pid.get_epoch()); - if (fence_it->second < bid.pid.get_epoch()) { + if (fence_it->second <= bid.pid.get_epoch()) { fence_it->second = bid.pid.get_epoch(); + _log_state.inflight[bid.pid] = 0; + if (version == rm_stm::fence_control_record_version) { + _log_state.tx_seqs[bid.pid] = tx_seq.value(); + } } - - _log_state.inflight[bid.pid] = 0; } ss::future<> rm_stm::apply(model::record_batch b) { From f34ab5f12d0d0e704c5958aa92d66e4753934085 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 13:03:15 -0700 Subject: [PATCH 12/19] fix begin --- src/v/cluster/rm_stm.cc | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 0a155a75a1fd..28eca95636eb 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -369,22 +369,32 @@ ss::future> rm_stm::do_begin_tx( co_return tx_errc::unknown_server_error; } - bool insert = true; + if (_c->term() != synced_term) { + vlog(_ctx_log.warn, "term changeg from {} to {} during fencing pid {}", synced_term, _c->term(), pid); + co_return tx_errc::unknown_server_error; + } + + auto tx_seq_it = _log_state.tx_seqs.find(pid); + if (tx_seq_it == _log_state.tx_seqs.end()) { + vlog(_ctx_log.error, "tx_seqs should be updated after fencing pid {}", pid); + co_return tx_errc::unknown_server_error; + } + if (tx_seq_it->second != tx_seq) { + vlog(_ctx_log.error, "expected tx_seq={} for pid {} got {}", tx_seq, pid, tx_seq_it->second); + co_return tx_errc::unknown_server_error; + } + if (!is_transaction_ga()) { // We do not need it after transaction_ga. Because we do not have // prepare state anymore auto [_, inserted] = _mem_state.expected.emplace(pid, tx_seq); - insert &= inserted; - } - - auto [_, insert_tx_seq] = _log_state.tx_seqs.emplace(pid, tx_seq); - insert &= insert_tx_seq; - if (!insert) { - vlog( - _ctx_log.error, - "there is already an ongoing transaction within {} session", - pid); - co_return tx_errc::unknown_server_error; + if (!inserted) { + vlog( + _ctx_log.error, + "there is already an ongoing transaction within {} session", + pid); + co_return tx_errc::unknown_server_error; + } } track_tx(pid, transaction_timeout_ms); From ad5684a8db9e66da3a1790eefc392451af5fe5bd Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 13:07:46 -0700 Subject: [PATCH 13/19] fix prepare --- src/v/cluster/rm_stm.cc | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 28eca95636eb..e6ca73b50559 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -465,6 +465,13 @@ ss::future rm_stm::do_prepare_tx( co_return tx_errc::fenced; } + if (_log_state.tx_seqs.contains(pid)) { + if (_log_state.tx_seqs[pid] != tx_seq) { + vlog(_ctx_log.warn, "expectd tx_seq {} doesn't match gived {} for pid {}", _log_state.tx_seqs[pid], tx_seq, pid); + } + co_return errc::invalid_producer_epoch; + } + if (synced_term != etag) { vlog( _ctx_log.warn, @@ -479,11 +486,7 @@ ss::future rm_stm::do_prepare_tx( co_return tx_errc::request_rejected; } - if (is_transaction_ga()) { - if (!_log_state.tx_seqs.contains(pid)) { - co_return errc::invalid_producer_epoch; - } - } else { + if (!is_transaction_ga()) { auto expected_it = _mem_state.expected.find(pid); if (expected_it == _mem_state.expected.end()) { // impossible situation, a transaction coordinator tries From 51f0ab3085e01194997748f3bcb614f205653c7e Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 13:37:11 -0700 Subject: [PATCH 14/19] fix replicate_tx --- src/v/cluster/rm_stm.cc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index e6ca73b50559..df491363adb4 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1195,12 +1195,8 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { } else { // this is the first attempt in the tx, reset dedupe cache reset_seq(bid); - } - if (!is_transaction_ga()) { - if (!_mem_state.tx_start.contains(bid.pid)) { - _mem_state.estimated.emplace(bid.pid, _insync_offset); - } + _mem_state.estimated[bid.pid] = _insync_offset; } auto r = co_await _c->replicate( From f6474ebb5f0b09294dd5b49323ac4aabee6b23b7 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 14:17:15 -0700 Subject: [PATCH 15/19] serialize transaction_timeout_ms --- src/v/cluster/rm_stm.cc | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index df491363adb4..26c6a32e4b76 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -57,13 +57,13 @@ static model::record_batch make_fence_batch(model::producer_identity pid) { } static model::record_batch -make_fence_batch(model::producer_identity pid, model::tx_seq tx_seq) { +make_fence_batch(model::producer_identity pid, model::tx_seq tx_seq, std::chrono::milliseconds transaction_timeout_ms) { iobuf key; auto pid_id = pid.id; reflection::serialize(key, model::record_batch_type::tx_fence, pid_id); iobuf value; - reflection::serialize(value, rm_stm::fence_control_record_version, tx_seq); + reflection::serialize(value, rm_stm::fence_control_record_version, tx_seq, transaction_timeout_ms); storage::record_batch_builder builder( model::record_batch_type::tx_fence, model::offset(0)); @@ -346,11 +346,12 @@ ss::future> rm_stm::do_begin_tx( } std::optional batch; - if (!is_transaction_ga()) { - batch = make_fence_batch(pid); + if (is_transaction_ga()) { + batch = make_fence_batch(pid, tx_seq, transaction_timeout_ms); } else { - batch = make_fence_batch(pid, tx_seq); + batch = make_fence_batch(pid); } + auto reader = model::make_memory_record_batch_reader(std::move(*batch)); auto r = co_await _c->replicate( synced_term, @@ -1861,8 +1862,10 @@ void rm_stm::apply_fence(model::record_batch&& b) { rm_stm::fence_control_record_version); std::optional tx_seq{}; + std::optional transaction_timeout_ms; if (version == rm_stm::fence_control_record_version) { tx_seq = reflection::adl{}.from(val_reader); + transaction_timeout_ms = reflection::adl{}.from(val_reader); } auto key_buf = record.release_key(); From a205866bbe85e6dc7a84caf1d692080a3a497de4 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 15:00:39 -0700 Subject: [PATCH 16/19] move expiration from mem to log --- src/v/cluster/rm_stm.cc | 30 ++++++++++++++++-------------- src/v/cluster/rm_stm.h | 10 ++++++---- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 26c6a32e4b76..d3391d61cdb1 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -826,8 +826,8 @@ ss::future rm_stm::do_abort_tx( // // If it happens to be the first case then Redpanda rejects a // client's tx. - auto expiration_it = _mem_state.expiration.find(pid); - if (expiration_it != _mem_state.expiration.end()) { + auto expiration_it = _log_state.expiration.find(pid); + if (expiration_it != _log_state.expiration.end()) { expiration_it->second.is_expiration_requested = true; } // spawing abort in the background and returning an error to @@ -976,8 +976,8 @@ rm_stm::get_tx_status(model::producer_identity pid) const { std::optional rm_stm::get_expiration_info(model::producer_identity pid) const { - auto it = _mem_state.expiration.find(pid); - if (it == _mem_state.expiration.end()) { + auto it = _log_state.expiration.find(pid); + if (it == _log_state.expiration.end()) { return std::nullopt; } @@ -1043,7 +1043,7 @@ rm_stm::do_mark_expired(model::producer_identity pid) { // We should delete information about expiration for pid, because inside // try_abort_old_tx it checks is tx expired or not. - _mem_state.expiration.erase(pid); + _log_state.expiration.erase(pid); co_await do_try_abort_old_tx(pid); co_return std::error_code(tx_errc::none); } @@ -1222,8 +1222,8 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { co_return tx_errc::timeout; } - auto expiration_it = _mem_state.expiration.find(bid.pid); - if (expiration_it == _mem_state.expiration.end()) { + auto expiration_it = _log_state.expiration.find(bid.pid); + if (expiration_it == _log_state.expiration.end()) { co_return errc::generic_tx_error; } expiration_it->second.last_update = clock_type::now(); @@ -1633,14 +1633,14 @@ void rm_stm::track_tx( if (_gate.is_closed()) { return; } - _mem_state.expiration[pid] = expiration_info{ + _log_state.expiration[pid] = expiration_info{ .timeout = transaction_timeout_ms, .last_update = clock_type::now(), .is_expiration_requested = false}; if (!_is_autoabort_enabled) { return; } - auto deadline = _mem_state.expiration[pid].deadline(); + auto deadline = _log_state.expiration[pid].deadline(); try_arm(deadline); } @@ -1675,8 +1675,8 @@ ss::future<> rm_stm::do_abort_old_txes() { } absl::btree_set expired; for (auto pid : pids) { - auto expiration_it = _mem_state.expiration.find(pid); - if (expiration_it != _mem_state.expiration.end()) { + auto expiration_it = _log_state.expiration.find(pid); + if (expiration_it != _log_state.expiration.end()) { if (!expiration_it->second.is_expired(clock_type::now())) { continue; } @@ -1689,7 +1689,7 @@ ss::future<> rm_stm::do_abort_old_txes() { } std::optional earliest_deadline; - for (auto& [pid, expiration] : _mem_state.expiration) { + for (auto& [pid, expiration] : _log_state.expiration) { if (!is_known_session(pid)) { continue; } @@ -1731,8 +1731,8 @@ ss::future<> rm_stm::do_try_abort_old_tx(model::producer_identity pid) { co_return; } - auto expiration_it = _mem_state.expiration.find(pid); - if (expiration_it != _mem_state.expiration.end()) { + auto expiration_it = _log_state.expiration.find(pid); + if (expiration_it != _log_state.expiration.end()) { if (!expiration_it->second.is_expired(clock_type::now())) { co_return; } @@ -1949,6 +1949,7 @@ ss::future<> rm_stm::apply_control( } _mem_state.forget(pid); + _log_state.forget(pid); if ( _log_state.aborted.size() > _abort_index_segment_size @@ -1967,6 +1968,7 @@ ss::future<> rm_stm::apply_control( } _mem_state.forget(pid); + _log_state.forget(pid); } co_return; diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 8ea8e7f78fe8..b9f2cfa8f457 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -392,9 +392,14 @@ class rm_stm final : public persisted_stm { absl::flat_hash_map seq_table; absl::flat_hash_map tx_seqs; - // number of batches replicated withing a transaction absl::flat_hash_map inflight; + absl::flat_hash_map + expiration; + + void forget(model::producer_identity pid) { + expiration.erase(pid); + } }; struct mem_state { @@ -407,8 +412,6 @@ class rm_stm final : public persisted_stm { // its offset but we must prevent read_comitted fetch from getting it // so we use last seen offset to estimate it absl::flat_hash_map estimated; - absl::flat_hash_map - expiration; model::offset last_end_tx{-1}; // FIELDS TO GO AFTER GA @@ -428,7 +431,6 @@ class rm_stm final : public persisted_stm { expected.erase(pid); estimated.erase(pid); preparing.erase(pid); - expiration.erase(pid); auto tx_start_it = tx_start.find(pid); if (tx_start_it != tx_start.end()) { tx_starts.erase(tx_start_it->second); From 128bbffc94b42c2d4fe8091fc163336985ac8803 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 15:11:09 -0700 Subject: [PATCH 17/19] fix expiration --- src/v/cluster/rm_stm.cc | 35 ++++++++++++++++++++--------------- src/v/cluster/rm_stm.h | 4 ---- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index d3391d61cdb1..7a203becbbf5 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -396,9 +396,9 @@ ss::future> rm_stm::do_begin_tx( pid); co_return tx_errc::unknown_server_error; } - } - track_tx(pid, transaction_timeout_ms); + track_tx(pid, transaction_timeout_ms); + } // a client may start new transaction only when the previous // tx is committed. since a client commits a transacitons @@ -1200,6 +1200,13 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { _mem_state.estimated[bid.pid] = _insync_offset; } + auto expiration_it = _log_state.expiration.find(bid.pid); + if (expiration_it == _log_state.expiration.end()) { + co_return errc::generic_tx_error; + } + expiration_it->second.last_update = clock_type::now(); + expiration_it->second.is_expiration_requested = false; + auto r = co_await _c->replicate( synced_term, std::move(br), @@ -1222,13 +1229,6 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { co_return tx_errc::timeout; } - auto expiration_it = _log_state.expiration.find(bid.pid); - if (expiration_it == _log_state.expiration.end()) { - co_return errc::generic_tx_error; - } - expiration_it->second.last_update = clock_type::now(); - expiration_it->second.is_expiration_requested = false; - auto old_offset = r.value().last_offset; auto new_offset = from_log_offset(old_offset); @@ -1633,10 +1633,13 @@ void rm_stm::track_tx( if (_gate.is_closed()) { return; } - _log_state.expiration[pid] = expiration_info{ - .timeout = transaction_timeout_ms, - .last_update = clock_type::now(), - .is_expiration_requested = false}; + + if (!_log_state.expiration.contains(pid)) { + _log_state.expiration[pid] = expiration_info{ + .timeout = transaction_timeout_ms, + .last_update = clock_type::now(), + .is_expiration_requested = false}; + } if (!_is_autoabort_enabled) { return; } @@ -1891,8 +1894,10 @@ void rm_stm::apply_fence(model::record_batch&& b) { if (fence_it->second <= bid.pid.get_epoch()) { fence_it->second = bid.pid.get_epoch(); _log_state.inflight[bid.pid] = 0; + _log_state.expiration.erase(pid); if (version == rm_stm::fence_control_record_version) { _log_state.tx_seqs[bid.pid] = tx_seq.value(); + track_tx(bid.pid, transaction_timeout_ms.value()); } } } @@ -1949,7 +1954,7 @@ ss::future<> rm_stm::apply_control( } _mem_state.forget(pid); - _log_state.forget(pid); + expiration.erase(pid); if ( _log_state.aborted.size() > _abort_index_segment_size @@ -1968,7 +1973,7 @@ ss::future<> rm_stm::apply_control( } _mem_state.forget(pid); - _log_state.forget(pid); + expiration.erase(pid); } co_return; diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index b9f2cfa8f457..cd9e0f35d667 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -396,10 +396,6 @@ class rm_stm final : public persisted_stm { absl::flat_hash_map inflight; absl::flat_hash_map expiration; - - void forget(model::producer_identity pid) { - expiration.erase(pid); - } }; struct mem_state { From 515103779dd84b6350ded2604ca02323eaa8f43e Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 15:14:49 -0700 Subject: [PATCH 18/19] fix replicate_tx --- src/v/cluster/rm_stm.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 7a203becbbf5..18de56686de0 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1161,8 +1161,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { if (_mem_state.estimated.contains(bid.pid)) { // we received second produce request while the first is still - // being processed. this is highly unlikely situation because - // we replicate with ack=1 and it should be fast + // being processed. vlog( _ctx_log.warn, "Too frequent produce with same pid:{}", bid.pid); co_return errc::generic_tx_error; @@ -1234,13 +1233,13 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) { set_seq(bid, new_offset); + _mem_state.estimated.erase(bid.pid); + if (!is_transaction_ga()) { - // TODO: check what happens on re-election if (!_mem_state.tx_start.contains(bid.pid)) { auto base_offset = model::offset(old_offset() - (bid.record_count - 1)); _mem_state.tx_start.emplace(bid.pid, base_offset); _mem_state.tx_starts.insert(base_offset); - _mem_state.estimated.erase(bid.pid); } } From 6c8f249e57b0197c099c79ef5b4248e4d3db0b2e Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Wed, 12 Oct 2022 15:24:09 -0700 Subject: [PATCH 19/19] fix commit --- src/v/cluster/rm_stm.cc | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 18de56686de0..660ceca3cdc9 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -572,7 +572,6 @@ ss::future rm_stm::do_commit_tx( } } - // checking fencing from prepared phase auto fence_it = _log_state.fence_pid_epoch.find(pid.get_id()); if (fence_it == _log_state.fence_pid_epoch.end()) { // begin_tx should have set a fence @@ -589,12 +588,11 @@ ss::future rm_stm::do_commit_tx( std::optional tx_seq_for_pid; if (is_transaction_ga()) { - // We can't validate a request because the is_transaction_ga - // switch may happen mid tx execution and in this case we risk - // to deadlock the system; please uncomment in 23.1 release - // if (!_log_state.tx_seqs.contains(bid.pid)) { - // co_return errc::invalid_producer_epoch; - // } + if (_log_state.tx_seqs.contains(bid.pid)) { + if (tx_seq != _log_state.tx_seqs[bid.pid]) { + co_return errc::request_rejected; + } + } } else { auto preparing_it = _mem_state.preparing.find(pid);