From 6250ee9cc7109ed663589ba5a199de48fe032092 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 26 May 2023 22:57:14 -0700 Subject: [PATCH 1/9] txn: fix incorrect log messages (cherry picked from commit 7debd0160cd1da35a4a35317618533c0bc152731) --- src/v/kafka/server/group.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index a4b474342102..2017ffbf8f92 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1683,14 +1683,14 @@ group::commit_tx(cluster::commit_group_tx_request r) { if (fence_it == _fence_pid_epoch.end()) { vlog( _ctx_txlog.warn, - "Can't prepare tx: fence with pid {} isn't set", + "Can't commit tx: fence with pid {} isn't set", r.pid); co_return make_commit_tx_reply(cluster::tx_errc::request_rejected); } if (r.pid.get_epoch() != fence_it->second) { vlog( _ctx_txlog.trace, - "Can't prepare tx with pid {} - the fence doesn't match {}", + "Can't commit tx with pid {} - the fence doesn't match {}", r.pid, fence_it->second); co_return make_commit_tx_reply(cluster::tx_errc::request_rejected); From a1aa4bd8bc965947ee0a1bb7028a080ce97323a3 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sat, 27 May 2023 23:08:48 -0700 Subject: [PATCH 2/9] txn: update abort_old_txes to use locks An execution of abort_old_txes could span multiple terms so the so the method could modify new state assuming it's the old state resulting in undefined behavior (cherry picked from commit f7fc026f3203da167acc0e4c95b86f2e7e082dec) --- src/v/kafka/server/group.cc | 15 +++++++++++---- src/v/kafka/server/group.h | 18 ++++++++++++++++-- src/v/kafka/server/group_manager.cc | 10 +++++----- src/v/kafka/server/group_manager.h | 13 ------------- 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 2017ffbf8f92..9b2aae9a4f1f 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -48,7 +48,7 @@ group::group( kafka::group_id id, group_state s, config::configuration& conf, - ss::lw_shared_ptr partition, + ss::lw_shared_ptr p, ss::sharded& tx_frontend, ss::sharded& feature_table, group_metadata_serializer serializer, @@ -60,7 +60,8 @@ group::group( , _num_members_joining(0) , _new_member_added(false) , _conf(conf) - , _partition(std::move(partition)) + , _p(p) + , _partition(p != nullptr ? 
p->partition : nullptr) , _probe(_members, _static_members, _offsets) , _ctxlog(klog, *this) , _ctx_txlog(cluster::txlog, *this) @@ -81,7 +82,7 @@ group::group( kafka::group_id id, group_metadata_value& md, config::configuration& conf, - ss::lw_shared_ptr partition, + ss::lw_shared_ptr p, ss::sharded& tx_frontend, ss::sharded& feature_table, group_metadata_serializer serializer, @@ -99,7 +100,8 @@ group::group( , _leader(md.leader) , _new_member_added(false) , _conf(conf) - , _partition(std::move(partition)) + , _p(p) + , _partition(p->partition) , _probe(_members, _static_members, _offsets) , _ctxlog(klog, *this) , _ctx_txlog(cluster::txlog, *this) @@ -1651,6 +1653,9 @@ void group::fail_offset_commit( } void group::reset_tx_state(model::term_id term) { + // must be invoked under catchup_lock.hold_write_lock() + // all other tx methods should use catchup_lock.hold_read_lock() + // to avoid modifying the state of the executing tx methods _term = term; _volatile_txs.clear(); } @@ -3186,6 +3191,8 @@ void group::maybe_rearm_timer() { } ss::future<> group::do_abort_old_txes() { + auto units = co_await _p->catchup_lock.hold_read_lock(); + std::vector pids; for (auto& [id, _] : _prepared_txs) { pids.push_back(id); diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 5b7139dbffcc..a73d506c9570 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -50,6 +50,19 @@ struct configuration; namespace kafka { struct group_log_group_metadata; +struct attached_partition { + bool loading; + ssx::semaphore sem{1, "k/group-mgr"}; + ss::abort_source as; + ss::lw_shared_ptr partition; + ss::basic_rwlock<> catchup_lock; + model::term_id term{-1}; + + explicit attached_partition(ss::lw_shared_ptr p) + : loading(true) + , partition(std::move(p)) {} +}; + /** * \defgroup kafka-groups Kafka group membership API * @@ -196,7 +209,7 @@ class group final : public ss::enable_lw_shared_from_this { kafka::group_id id, group_state s, config::configuration& conf, - ss::lw_shared_ptr partition, + ss::lw_shared_ptr, ss::sharded& tx_frontend, ss::sharded&, group_metadata_serializer, @@ -207,7 +220,7 @@ class group final : public ss::enable_lw_shared_from_this { kafka::group_id id, group_metadata_value& md, config::configuration& conf, - ss::lw_shared_ptr partition, + ss::lw_shared_ptr, ss::sharded& tx_frontend, ss::sharded&, group_metadata_serializer, @@ -898,6 +911,7 @@ class group final : public ss::enable_lw_shared_from_this { ss::timer _join_timer; bool _new_member_added; config::configuration& _conf; + ss::lw_shared_ptr _p; ss::lw_shared_ptr _partition; absl::node_hash_map< model::topic_partition, diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 43b810fa17e6..c53ba4b8e080 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -715,7 +715,7 @@ ss::future<> group_manager::do_recover_group( group_id, group_stm.get_metadata(), _conf, - p->partition, + p, _tx_frontend, _feature_table, _serializer_factory(), @@ -910,7 +910,7 @@ group::join_group_stages group_manager::join_group(join_group_request&& r) { return group::join_group_stages( make_join_error(r.data.member_id, error_code::not_coordinator)); } - auto p = it->second->partition; + auto p = it->second; group = ss::make_lw_shared( r.data.group_id, group_state::empty, @@ -1063,7 +1063,7 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) { r.data.group_id, group_state::empty, _conf, - p->partition, + p, _tx_frontend, _feature_table, 
_serializer_factory(), @@ -1149,7 +1149,7 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) { r.group_id, group_state::empty, _conf, - p->partition, + p, _tx_frontend, _feature_table, _serializer_factory(), @@ -1257,7 +1257,7 @@ group_manager::offset_commit(offset_commit_request&& r) { r.data.group_id, group_state::empty, _conf, - p->partition, + p, _tx_frontend, _feature_table, _serializer_factory(), diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index 6dba342f0702..e09e99446eff 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -203,19 +203,6 @@ class group_manager { void detach_partition(const model::ntp&); ss::future<> do_detach_partition(model::ntp); - struct attached_partition { - bool loading; - ssx::semaphore sem{1, "k/group-mgr"}; - ss::abort_source as; - ss::lw_shared_ptr partition; - ss::basic_rwlock<> catchup_lock; - model::term_id term{-1}; - - explicit attached_partition(ss::lw_shared_ptr p) - : loading(true) - , partition(std::move(p)) {} - }; - cluster::notification_id_type _leader_notify_handle; cluster::notification_id_type _topic_table_notify_handle; From 3fe916a99af8bf269d563a0748e947619ac6b3cb Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sun, 28 May 2023 00:02:49 -0700 Subject: [PATCH 3/9] txn: make group accept term Make group accept term to reduce scope of where reset_tx_state is used to easier track where the write lock is necessary (cherry picked from commit 93297d5d81c1cffaaa2a965abf94d180a79d55a9) --- src/v/kafka/server/group.cc | 4 ++++ src/v/kafka/server/group.h | 2 ++ src/v/kafka/server/group_manager.cc | 10 +++++----- src/v/kafka/server/tests/group_test.cc | 1 + 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 9b2aae9a4f1f..9eb5bb5996b7 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -49,6 +49,7 @@ group::group( group_state s, config::configuration& conf, ss::lw_shared_ptr p, + model::term_id term, ss::sharded& tx_frontend, ss::sharded& feature_table, group_metadata_serializer serializer, @@ -66,6 +67,7 @@ group::group( , _ctxlog(klog, *this) , _ctx_txlog(cluster::txlog, *this) , _md_serializer(std::move(serializer)) + , _term(term) , _enable_group_metrics(group_metrics) , _abort_interval_ms(config::shard_local_cfg() .abort_timed_out_transactions_interval_ms.value()) @@ -83,6 +85,7 @@ group::group( group_metadata_value& md, config::configuration& conf, ss::lw_shared_ptr p, + model::term_id term, ss::sharded& tx_frontend, ss::sharded& feature_table, group_metadata_serializer serializer, @@ -106,6 +109,7 @@ group::group( , _ctxlog(klog, *this) , _ctx_txlog(cluster::txlog, *this) , _md_serializer(std::move(serializer)) + , _term(term) , _enable_group_metrics(group_metrics) , _abort_interval_ms(config::shard_local_cfg() .abort_timed_out_transactions_interval_ms.value()) diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index a73d506c9570..23f8f5bd7ea6 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -210,6 +210,7 @@ class group final : public ss::enable_lw_shared_from_this { group_state s, config::configuration& conf, ss::lw_shared_ptr, + model::term_id, ss::sharded& tx_frontend, ss::sharded&, group_metadata_serializer, @@ -221,6 +222,7 @@ class group final : public ss::enable_lw_shared_from_this { group_metadata_value& md, config::configuration& conf, ss::lw_shared_ptr, + model::term_id, ss::sharded& tx_frontend, 
ss::sharded&, group_metadata_serializer, diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index c53ba4b8e080..9d0673439ab8 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -716,11 +716,11 @@ ss::future<> group_manager::do_recover_group( group_stm.get_metadata(), _conf, p, + term, _tx_frontend, _feature_table, _serializer_factory(), _enable_group_metrics); - group->reset_tx_state(term); _groups.emplace(group_id, group); group->reschedule_all_member_heartbeats(); } @@ -916,11 +916,11 @@ group::join_group_stages group_manager::join_group(join_group_request&& r) { group_state::empty, _conf, p, + it->second->term, _tx_frontend, _feature_table, _serializer_factory(), _enable_group_metrics); - group->reset_tx_state(it->second->term); _groups.emplace(r.data.group_id, group); _groups.rehash(0); is_new_group = true; @@ -1064,11 +1064,11 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) { group_state::empty, _conf, p, + p->term, _tx_frontend, _feature_table, _serializer_factory(), _enable_group_metrics); - group->reset_tx_state(p->term); _groups.emplace(r.data.group_id, group); _groups.rehash(0); } @@ -1150,11 +1150,11 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) { group_state::empty, _conf, p, + p->term, _tx_frontend, _feature_table, _serializer_factory(), _enable_group_metrics); - group->reset_tx_state(p->term); _groups.emplace(r.group_id, group); _groups.rehash(0); } @@ -1258,11 +1258,11 @@ group_manager::offset_commit(offset_commit_request&& r) { group_state::empty, _conf, p, + p->term, _tx_frontend, _feature_table, _serializer_factory(), _enable_group_metrics); - group->reset_tx_state(p->term); _groups.emplace(r.data.group_id, group); _groups.rehash(0); } else { diff --git a/src/v/kafka/server/tests/group_test.cc b/src/v/kafka/server/tests/group_test.cc index 8022229968b5..77d1992d9c71 100644 --- a/src/v/kafka/server/tests/group_test.cc +++ b/src/v/kafka/server/tests/group_test.cc @@ -52,6 +52,7 @@ static group get() { group_state::empty, conf, nullptr, + model::term_id(), fr, feature_table, make_consumer_offsets_serializer(), From f68ff7bb81074a11c8bdda7b59cb7fff6923fb57 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sun, 28 May 2023 00:20:11 -0700 Subject: [PATCH 4/9] txn: completely reset group state on term change When the consumer group log's term change we replay the whole log to reconstruct the state. We used to merge current and the replayed state but it's error prone. 
Reseting the whole txn state to have more deter- ministic behavior (cherry picked from commit 69c5392872cbc46e0c0d532ed80a8dabe6266825) --- src/v/kafka/server/group.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 9eb5bb5996b7..9f135f6387b6 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1662,6 +1662,10 @@ void group::reset_tx_state(model::term_id term) { // to avoid modifying the state of the executing tx methods _term = term; _volatile_txs.clear(); + _prepared_txs.clear(); + _expiration_info.clear(); + _tx_seqs.clear(); + _fence_pid_epoch.clear(); } void group::insert_prepared(prepared_tx tx) { From adaadce5cd706eaf079550f8f635529111602209 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Sun, 28 May 2023 06:05:48 -0700 Subject: [PATCH 5/9] txn: step down on begin_tx & store_txn_offsets failures Transactions in kafka protocol are stateful: the processing of the requests depends on the previous commands executed by the same or even different producer. It makes the situations when the replica- tion fails with the indecisive errors such as timeout dangerous because the true state is unknown. Stepping down to resolve uncertainty by replaying the log (cherry picked from commit 7ca57077f2a4c3ed592d4a989c11729f1aced332) --- src/v/kafka/server/group.cc | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 9f135f6387b6..6f84917a15c8 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -1892,17 +1892,22 @@ group::begin_tx(cluster::begin_group_tx_request r) { auto reader = model::make_memory_record_batch_reader( std::move(batch.value())); - auto e = co_await _partition->raft()->replicate( + auto res = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); - if (!e) { + if (!res) { vlog( _ctx_txlog.warn, "Error \"{}\" on replicating pid:{} fencing batch", - e.error(), + res.error(), r.pid); + if ( + _partition->raft()->is_leader() + && _partition->raft()->term() == _term) { + co_await _partition->raft()->step_down("group begin_tx failed"); + } co_return make_begin_tx_reply(cluster::tx_errc::leader_not_found); } @@ -1913,9 +1918,9 @@ group::begin_tx(cluster::begin_group_tx_request r) { _volatile_txs[r.pid] = volatile_tx{.tx_seq = r.tx_seq}; } - auto res = _expiration_info.insert_or_assign( + auto [it, _] = _expiration_info.insert_or_assign( r.pid, expiration_info(r.timeout)); - try_arm(res.first->second.deadline()); + try_arm(it->second.deadline()); cluster::begin_group_tx_reply reply; reply.etag = _term; @@ -2248,6 +2253,12 @@ group::store_txn_offsets(txn_offset_commit_request r) { raft::replicate_options(raft::consistency_level::quorum_ack)); if (!e) { + if ( + _partition->raft()->is_leader() + && _partition->raft()->term() == _term) { + co_await _partition->raft()->step_down( + "group store_txn_offsets failed"); + } co_return txn_offset_commit_response( r, error_code::unknown_server_error); } From f5b20172019cf9a7c4fea2bdf1d1367b44343865 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 15 Jun 2023 16:51:28 -0700 Subject: [PATCH 6/9] Revert "txn: update abort_old_txes to use locks" Fixes: - https://github.com/redpanda-data/redpanda/issues/11372 - https://github.com/redpanda-data/redpanda/issues/11452 - https://github.com/redpanda-data/redpanda/issues/11344 This reverts commit 
f7fc026f3203da167acc0e4c95b86f2e7e082dec. (cherry picked from commit 6d86fc813fe526112d8aaccc3fda612342df1aab) --- src/v/kafka/server/group.cc | 15 ++++----------- src/v/kafka/server/group.h | 18 ++---------------- src/v/kafka/server/group_manager.cc | 10 +++++----- src/v/kafka/server/group_manager.h | 13 +++++++++++++ 4 files changed, 24 insertions(+), 32 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 6f84917a15c8..1bac84b7788e 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -48,7 +48,7 @@ group::group( kafka::group_id id, group_state s, config::configuration& conf, - ss::lw_shared_ptr p, + ss::lw_shared_ptr partition, model::term_id term, ss::sharded& tx_frontend, ss::sharded& feature_table, @@ -61,8 +61,7 @@ group::group( , _num_members_joining(0) , _new_member_added(false) , _conf(conf) - , _p(p) - , _partition(p != nullptr ? p->partition : nullptr) + , _partition(std::move(partition)) , _probe(_members, _static_members, _offsets) , _ctxlog(klog, *this) , _ctx_txlog(cluster::txlog, *this) @@ -84,7 +83,7 @@ group::group( kafka::group_id id, group_metadata_value& md, config::configuration& conf, - ss::lw_shared_ptr p, + ss::lw_shared_ptr partition, model::term_id term, ss::sharded& tx_frontend, ss::sharded& feature_table, @@ -103,8 +102,7 @@ group::group( , _leader(md.leader) , _new_member_added(false) , _conf(conf) - , _p(p) - , _partition(p->partition) + , _partition(std::move(partition)) , _probe(_members, _static_members, _offsets) , _ctxlog(klog, *this) , _ctx_txlog(cluster::txlog, *this) @@ -1657,9 +1655,6 @@ void group::fail_offset_commit( } void group::reset_tx_state(model::term_id term) { - // must be invoked under catchup_lock.hold_write_lock() - // all other tx methods should use catchup_lock.hold_read_lock() - // to avoid modifying the state of the executing tx methods _term = term; _volatile_txs.clear(); _prepared_txs.clear(); @@ -3210,8 +3205,6 @@ void group::maybe_rearm_timer() { } ss::future<> group::do_abort_old_txes() { - auto units = co_await _p->catchup_lock.hold_read_lock(); - std::vector pids; for (auto& [id, _] : _prepared_txs) { pids.push_back(id); diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 23f8f5bd7ea6..575966031dc7 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -50,19 +50,6 @@ struct configuration; namespace kafka { struct group_log_group_metadata; -struct attached_partition { - bool loading; - ssx::semaphore sem{1, "k/group-mgr"}; - ss::abort_source as; - ss::lw_shared_ptr partition; - ss::basic_rwlock<> catchup_lock; - model::term_id term{-1}; - - explicit attached_partition(ss::lw_shared_ptr p) - : loading(true) - , partition(std::move(p)) {} -}; - /** * \defgroup kafka-groups Kafka group membership API * @@ -209,7 +196,7 @@ class group final : public ss::enable_lw_shared_from_this { kafka::group_id id, group_state s, config::configuration& conf, - ss::lw_shared_ptr, + ss::lw_shared_ptr partition, model::term_id, ss::sharded& tx_frontend, ss::sharded&, @@ -221,7 +208,7 @@ class group final : public ss::enable_lw_shared_from_this { kafka::group_id id, group_metadata_value& md, config::configuration& conf, - ss::lw_shared_ptr, + ss::lw_shared_ptr partition, model::term_id, ss::sharded& tx_frontend, ss::sharded&, @@ -913,7 +900,6 @@ class group final : public ss::enable_lw_shared_from_this { ss::timer _join_timer; bool _new_member_added; config::configuration& _conf; - ss::lw_shared_ptr _p; ss::lw_shared_ptr _partition; 
absl::node_hash_map< model::topic_partition, diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 9d0673439ab8..1f17cf9d7662 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -715,7 +715,7 @@ ss::future<> group_manager::do_recover_group( group_id, group_stm.get_metadata(), _conf, - p, + p->partition, term, _tx_frontend, _feature_table, @@ -910,7 +910,7 @@ group::join_group_stages group_manager::join_group(join_group_request&& r) { return group::join_group_stages( make_join_error(r.data.member_id, error_code::not_coordinator)); } - auto p = it->second; + auto p = it->second->partition; group = ss::make_lw_shared( r.data.group_id, group_state::empty, @@ -1063,7 +1063,7 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) { r.data.group_id, group_state::empty, _conf, - p, + p->partition, p->term, _tx_frontend, _feature_table, @@ -1149,7 +1149,7 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) { r.group_id, group_state::empty, _conf, - p, + p->partition, p->term, _tx_frontend, _feature_table, @@ -1257,7 +1257,7 @@ group_manager::offset_commit(offset_commit_request&& r) { r.data.group_id, group_state::empty, _conf, - p, + p->partition, p->term, _tx_frontend, _feature_table, diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index e09e99446eff..6dba342f0702 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -203,6 +203,19 @@ class group_manager { void detach_partition(const model::ntp&); ss::future<> do_detach_partition(model::ntp); + struct attached_partition { + bool loading; + ssx::semaphore sem{1, "k/group-mgr"}; + ss::abort_source as; + ss::lw_shared_ptr partition; + ss::basic_rwlock<> catchup_lock; + model::term_id term{-1}; + + explicit attached_partition(ss::lw_shared_ptr p) + : loading(true) + , partition(std::move(p)) {} + }; + cluster::notification_id_type _leader_notify_handle; cluster::notification_id_type _topic_table_notify_handle; From f8a398c94f370ebc1b9583b4d4a12456fb509f02 Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 7 Jul 2023 08:26:20 -0700 Subject: [PATCH 7/9] utils: add units-based attempt-lock interface to rwlock (cherry picked from commit 0dabdff4c3534cfeb53c6c0e9a5419b1c04edbf0) --- src/v/utils/rwlock.h | 82 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 src/v/utils/rwlock.h diff --git a/src/v/utils/rwlock.h b/src/v/utils/rwlock.h new file mode 100644 index 000000000000..4152a8eae223 --- /dev/null +++ b/src/v/utils/rwlock.h @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once +#include "seastarx.h" +#include "ssx/semaphore.h" + +#include + +namespace ssx { + +class rwlock; + +class rwlock_unit { + rwlock* _lock; + bool _is_write_lock{false}; + +public: + rwlock_unit(rwlock* lock, bool is_write_lock) noexcept + : _lock(lock) + , _is_write_lock(is_write_lock) {} + rwlock_unit(rwlock_unit&& token) noexcept + : _lock(token._lock) + , _is_write_lock(token._is_write_lock) { + token._lock = nullptr; + } + rwlock_unit(const rwlock_unit&) = delete; + rwlock_unit(const rwlock_unit&&) = delete; + rwlock_unit() = delete; + + rwlock_unit& operator=(rwlock_unit&& other) noexcept { + if (this != &other) { + other._lock = nullptr; + } + return *this; + } + + ~rwlock_unit() noexcept; +}; + +class rwlock : public ss::basic_rwlock<> { +public: + std::optional attempt_read_lock() { + bool locked = try_read_lock(); + + if (!locked) { + return std::nullopt; + } + + return rwlock_unit(this, false); + } + + std::optional attempt_write_lock() { + bool locked = try_write_lock(); + + if (!locked) { + return std::nullopt; + } + + return rwlock_unit(this, true); + } +}; + +inline rwlock_unit::~rwlock_unit() noexcept { + if (_lock) { + if (_is_write_lock) { + _lock->write_unlock(); + } else { + _lock->read_unlock(); + } + } +} + +} // namespace ssx From f8865d4f7761729987de6168649f45d4a6df8cfb Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 6 Jul 2023 13:34:26 -0700 Subject: [PATCH 8/9] test: add test for ssx::rwlock (cherry picked from commit 45675ccd8c4c843c1d9e0df0b21fab3727e3233a) --- src/v/utils/tests/CMakeLists.txt | 1 + src/v/utils/tests/rwlock_test.cc | 53 ++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 src/v/utils/tests/rwlock_test.cc diff --git a/src/v/utils/tests/CMakeLists.txt b/src/v/utils/tests/CMakeLists.txt index a06301726c98..acf2aa2d40f6 100644 --- a/src/v/utils/tests/CMakeLists.txt +++ b/src/v/utils/tests/CMakeLists.txt @@ -14,6 +14,7 @@ rp_test( waiter_queue_test.cc object_pool_test.cc delta_for_test.cc + rwlock_test.cc token_bucket_test.cc uuid_test.cc seastar_histogram_test.cc diff --git a/src/v/utils/tests/rwlock_test.cc b/src/v/utils/tests/rwlock_test.cc new file mode 100644 index 000000000000..d97c269f976a --- /dev/null +++ b/src/v/utils/tests/rwlock_test.cc @@ -0,0 +1,53 @@ +#include "seastarx.h" +#include "ssx/semaphore.h" +#include "utils/rwlock.h" + +#include +#include +#include + +SEASTAR_THREAD_TEST_CASE(test_rwlock_r_blocks_w) { + ssx::rwlock rwlock; + auto runit = rwlock.attempt_read_lock(); + BOOST_REQUIRE(runit); + auto wunit = rwlock.attempt_write_lock(); + BOOST_REQUIRE(!wunit); +} + +SEASTAR_THREAD_TEST_CASE(test_rwlock_w_blocks_r) { + ssx::rwlock rwlock; + auto wunit = rwlock.attempt_write_lock(); + BOOST_REQUIRE(wunit); + auto runit = rwlock.attempt_read_lock(); + BOOST_REQUIRE(!runit); +} + +SEASTAR_THREAD_TEST_CASE(test_rwlock_r_permits_r) { + ssx::rwlock rwlock; + auto unit1 = rwlock.attempt_read_lock(); + BOOST_REQUIRE(unit1); + auto unit2 = rwlock.attempt_read_lock(); + BOOST_REQUIRE(unit2); +} + +SEASTAR_THREAD_TEST_CASE(test_rwlock_w_blocks_w) { + ssx::rwlock rwlock; + auto unit1 = rwlock.attempt_write_lock(); + BOOST_REQUIRE(unit1); + auto unit2 = rwlock.attempt_write_lock(); + 
BOOST_REQUIRE(!unit2); +} + +void some_processing(ssx::rwlock_unit&& unit) { + ssx::rwlock_unit unit3(std::move(unit)); +} + +SEASTAR_THREAD_TEST_CASE(test_rwlock_unit_move) { + ssx::rwlock rwlock; + auto unit1 = rwlock.attempt_write_lock(); + BOOST_REQUIRE(unit1); + some_processing(std::move(unit1.value())); + + auto unit2 = rwlock.attempt_write_lock(); + BOOST_REQUIRE(unit2); +} From 15f575f87d6f4087117b73be29c8a2f79bb1996f Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Thu, 15 Jun 2023 19:25:22 -0700 Subject: [PATCH 9/9] txn: update abort_old_txes to use locks An execution of abort_old_txes could span multiple terms so the so the method could modify new state assuming it's the old state resulting in undefined behavior. This commit is the rewrite of the reverted f7fc026 in 11474. The pro- blem was caused by: - do_detach_partition got blocked - RP ignored blocked do_detach_partition and attempted next op leading double registration of the consumer groups ntp The op was blocked by a deadlock: - do_abort_old_txes was waiting for read lock while holding _gate - do_detach_partition was holding write lock while waiting to the gate to be closed This version doesn't wait for the read lock to become available and exit do_abort_old_txes releasing the _gate. It still isn't clear why RP ignored a blocked op (cherry picked from commit 0b9b9fb1b2bfddbe26582831cbe6584d6e1fa32f) --- src/v/kafka/server/group.cc | 9 ++++ src/v/kafka/server/group.h | 4 ++ src/v/kafka/server/group_manager.cc | 57 ++++++++++++++------------ src/v/kafka/server/group_manager.h | 7 +++- src/v/kafka/server/tests/group_test.cc | 1 + 5 files changed, 49 insertions(+), 29 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 1bac84b7788e..4e3c39f16e60 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -48,6 +48,7 @@ group::group( kafka::group_id id, group_state s, config::configuration& conf, + ss::lw_shared_ptr catchup_lock, ss::lw_shared_ptr partition, model::term_id term, ss::sharded& tx_frontend, @@ -61,6 +62,7 @@ group::group( , _num_members_joining(0) , _new_member_added(false) , _conf(conf) + , _catchup_lock(std::move(catchup_lock)) , _partition(std::move(partition)) , _probe(_members, _static_members, _offsets) , _ctxlog(klog, *this) @@ -83,6 +85,7 @@ group::group( kafka::group_id id, group_metadata_value& md, config::configuration& conf, + ss::lw_shared_ptr catchup_lock, ss::lw_shared_ptr partition, model::term_id term, ss::sharded& tx_frontend, @@ -102,6 +105,7 @@ group::group( , _leader(md.leader) , _new_member_added(false) , _conf(conf) + , _catchup_lock(std::move(catchup_lock)) , _partition(std::move(partition)) , _probe(_members, _static_members, _offsets) , _ctxlog(klog, *this) @@ -3205,6 +3209,11 @@ void group::maybe_rearm_timer() { } ss::future<> group::do_abort_old_txes() { + auto unit = _catchup_lock->attempt_read_lock(); + if (!unit) { + co_return; + } + std::vector pids; for (auto& [id, _] : _prepared_txs) { pids.push_back(id); diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 575966031dc7..1cdd4a5d6401 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -29,6 +29,7 @@ #include "model/timestamp.h" #include "seastarx.h" #include "utils/mutex.h" +#include "utils/rwlock.h" #include #include @@ -196,6 +197,7 @@ class group final : public ss::enable_lw_shared_from_this { kafka::group_id id, group_state s, config::configuration& conf, + ss::lw_shared_ptr catchup_lock, ss::lw_shared_ptr partition, 
model::term_id, ss::sharded& tx_frontend, @@ -208,6 +210,7 @@ class group final : public ss::enable_lw_shared_from_this { kafka::group_id id, group_metadata_value& md, config::configuration& conf, + ss::lw_shared_ptr catchup_lock, ss::lw_shared_ptr partition, model::term_id, ss::sharded& tx_frontend, @@ -900,6 +903,7 @@ class group final : public ss::enable_lw_shared_from_this { ss::timer _join_timer; bool _new_member_added; config::configuration& _conf; + ss::lw_shared_ptr _catchup_lock; ss::lw_shared_ptr _partition; absl::node_hash_map< model::topic_partition, diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 1f17cf9d7662..5c40e87450bf 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -401,7 +401,7 @@ ss::future<> group_manager::do_detach_partition(model::ntp ntp) { co_return; } auto p = it->second; - auto units = co_await p->catchup_lock.hold_write_lock(); + auto units = co_await p->catchup_lock->hold_write_lock(); // Becasue shutdown group is async operation we should run it after // rehash for groups map @@ -557,8 +557,7 @@ group_manager::gc_partition_state(ss::lw_shared_ptr p) { * since this operation is destructive for partitions group we hold a * catchup write lock */ - - auto units = co_await p->catchup_lock.hold_write_lock(); + auto units = co_await p->catchup_lock->hold_write_lock(); // Becasue shutdown group is async operation we should run it after rehash // for groups map @@ -615,8 +614,8 @@ ss::future<> group_manager::handle_partition_leader_change( * is rarely contended we take a writer lock only when leadership * changes (infrequent event) */ - return p->catchup_lock.hold_write_lock() - .then([this, term, timeout, p](ss::basic_rwlock<>::holder unit) { + return p->catchup_lock->hold_write_lock().then( + [this, term, timeout, p](ss::basic_rwlock<>::holder unit) { return inject_noop(p->partition, timeout) .then([this, term, timeout, p] { /* @@ -655,8 +654,7 @@ ss::future<> group_manager::handle_partition_leader_change( }); }) .finally([unit = std::move(unit)] {}); - }) - .finally([p] {}); + }); } /* @@ -715,6 +713,7 @@ ss::future<> group_manager::do_recover_group( group_id, group_stm.get_metadata(), _conf, + p->catchup_lock, p->partition, term, _tx_frontend, @@ -915,6 +914,7 @@ group::join_group_stages group_manager::join_group(join_group_request&& r) { r.data.group_id, group_state::empty, _conf, + it->second->catchup_lock, p, it->second->term, _tx_frontend, @@ -1031,7 +1031,7 @@ group_manager::leave_group(leave_group_request&& r) { ss::future group_manager::txn_offset_commit(txn_offset_commit_request&& r) { auto p = get_attached_partition(r.ntp); - if (!p || !p->catchup_lock.try_read_lock()) { + if (!p || !p->catchup_lock->try_read_lock()) { // transaction operations can't run in parallel with loading // state from the log (happens once per term change) vlog( @@ -1041,9 +1041,9 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) { txn_offset_commit_response( r, error_code::coordinator_load_in_progress)); } - p->catchup_lock.read_unlock(); + p->catchup_lock->read_unlock(); - return p->catchup_lock.hold_read_lock().then( + return p->catchup_lock->hold_read_lock().then( [this, p, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { // TODO: use correct key instead of offset_commit_api::key // check other txn places @@ -1056,13 +1056,14 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) { auto group = get_group(r.data.group_id); if (!group) { - // the 
group is not relying on Kafka for group management, - // so allow the commit + // the group is not relying on Kafka for group + // management, so allow the commit group = ss::make_lw_shared( r.data.group_id, group_state::empty, _conf, + p->catchup_lock, p->partition, p->term, _tx_frontend, @@ -1081,7 +1082,7 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) { ss::future group_manager::commit_tx(cluster::commit_group_tx_request&& r) { auto p = get_attached_partition(r.ntp); - if (!p || !p->catchup_lock.try_read_lock()) { + if (!p || !p->catchup_lock->try_read_lock()) { // transaction operations can't run in parallel with loading // state from the log (happens once per term change) vlog( @@ -1090,10 +1091,10 @@ group_manager::commit_tx(cluster::commit_group_tx_request&& r) { return ss::make_ready_future( make_commit_tx_reply(cluster::tx_errc::coordinator_load_in_progress)); } - p->catchup_lock.read_unlock(); + p->catchup_lock->read_unlock(); - return p->catchup_lock.hold_read_lock().then( - [this, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { + return p->catchup_lock->hold_read_lock().then( + [this, p, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { auto error = validate_group_status( r.ntp, r.group_id, offset_commit_api::key); if (error != error_code::none) { @@ -1120,7 +1121,7 @@ group_manager::commit_tx(cluster::commit_group_tx_request&& r) { ss::future group_manager::begin_tx(cluster::begin_group_tx_request&& r) { auto p = get_attached_partition(r.ntp); - if (!p || !p->catchup_lock.try_read_lock()) { + if (!p || !p->catchup_lock->try_read_lock()) { // transaction operations can't run in parallel with loading // state from the log (happens once per term change) vlog( @@ -1129,9 +1130,9 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) { return ss::make_ready_future( make_begin_tx_reply(cluster::tx_errc::coordinator_load_in_progress)); } - p->catchup_lock.read_unlock(); + p->catchup_lock->read_unlock(); - return p->catchup_lock.hold_read_lock().then( + return p->catchup_lock->hold_read_lock().then( [this, p, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { auto error = validate_group_status( r.ntp, r.group_id, offset_commit_api::key); @@ -1149,6 +1150,7 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) { r.group_id, group_state::empty, _conf, + p->catchup_lock, p->partition, p->term, _tx_frontend, @@ -1167,7 +1169,7 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) { ss::future group_manager::prepare_tx(cluster::prepare_group_tx_request&& r) { auto p = get_attached_partition(r.ntp); - if (!p || !p->catchup_lock.try_read_lock()) { + if (!p || !p->catchup_lock->try_read_lock()) { // transaction operations can't run in parallel with loading // state from the log (happens once per term change) vlog( @@ -1177,10 +1179,10 @@ group_manager::prepare_tx(cluster::prepare_group_tx_request&& r) { make_prepare_tx_reply( cluster::tx_errc::coordinator_load_in_progress)); } - p->catchup_lock.read_unlock(); + p->catchup_lock->read_unlock(); - return p->catchup_lock.hold_read_lock().then( - [this, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { + return p->catchup_lock->hold_read_lock().then( + [this, p, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { auto error = validate_group_status( r.ntp, r.group_id, offset_commit_api::key); if (error != error_code::none) { @@ -1205,7 +1207,7 @@ group_manager::prepare_tx(cluster::prepare_group_tx_request&& r) { ss::future 
group_manager::abort_tx(cluster::abort_group_tx_request&& r) { auto p = get_attached_partition(r.ntp); - if (!p || !p->catchup_lock.try_read_lock()) { + if (!p || !p->catchup_lock->try_read_lock()) { // transaction operations can't run in parallel with loading // state from the log (happens once per term change) vlog( @@ -1214,10 +1216,10 @@ group_manager::abort_tx(cluster::abort_group_tx_request&& r) { return ss::make_ready_future( make_abort_tx_reply(cluster::tx_errc::coordinator_load_in_progress)); } - p->catchup_lock.read_unlock(); + p->catchup_lock->read_unlock(); - return p->catchup_lock.hold_read_lock().then( - [this, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { + return p->catchup_lock->hold_read_lock().then( + [this, p, r = std::move(r)](ss::basic_rwlock<>::holder unit) mutable { auto error = validate_group_status( r.ntp, r.group_id, offset_commit_api::key); if (error != error_code::none) { @@ -1257,6 +1259,7 @@ group_manager::offset_commit(offset_commit_request&& r) { r.data.group_id, group_state::empty, _conf, + p->catchup_lock, p->partition, p->term, _tx_frontend, diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index 6dba342f0702..5e1022f16953 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -32,6 +32,7 @@ #include "raft/group_manager.h" #include "seastarx.h" #include "ssx/semaphore.h" +#include "utils/rwlock.h" #include #include @@ -208,12 +209,14 @@ class group_manager { ssx::semaphore sem{1, "k/group-mgr"}; ss::abort_source as; ss::lw_shared_ptr partition; - ss::basic_rwlock<> catchup_lock; + ss::lw_shared_ptr catchup_lock; model::term_id term{-1}; explicit attached_partition(ss::lw_shared_ptr p) : loading(true) - , partition(std::move(p)) {} + , partition(std::move(p)) { + catchup_lock = ss::make_lw_shared(); + } }; cluster::notification_id_type _leader_notify_handle; diff --git a/src/v/kafka/server/tests/group_test.cc b/src/v/kafka/server/tests/group_test.cc index 77d1992d9c71..9c40ef18b62b 100644 --- a/src/v/kafka/server/tests/group_test.cc +++ b/src/v/kafka/server/tests/group_test.cc @@ -52,6 +52,7 @@ static group get() { group_state::empty, conf, nullptr, + nullptr, model::term_id(), fr, feature_table,
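
Taken together, patches 2, 4, 6 and 9 converge on one locking protocol around the per-partition catchup lock: the term-change path takes the write lock and rebuilds the group's transactional state from scratch, every transactional RPC takes the read lock (answering coordinator_load_in_progress immediately while recovery is in flight), and the background expiry fiber only attempts the read lock so it can never block while holding the group's gate. The sketch below is illustrative only and not the Redpanda implementation: std::shared_mutex stands in for the seastar rwlock / ssx::rwlock, the group state is reduced to two maps, and all names are invented for the example.

    #include <map>
    #include <mutex>
    #include <shared_mutex>

    // Reduced stand-ins for the real types; in the actual code catchup_lock is
    // an ss::basic_rwlock<> wrapped by ssx::rwlock.
    struct group_tx_state {
        long term = -1;
        std::map<long, long> fence_pid_epoch; // pid -> fencing epoch
        std::map<long, long> prepared_txs;    // pid -> tx_seq

        // patch 4: a term change replays the whole log, so the previous tx
        // state is dropped entirely instead of being merged with stale entries
        void reset_tx_state(long new_term) {
            term = new_term;
            fence_pid_epoch.clear();
            prepared_txs.clear();
        }
    };

    struct attached_partition {
        std::shared_mutex catchup_lock;
        group_tx_state group;
    };

    enum class tx_errc { none, coordinator_load_in_progress };

    // leadership/term change: exclusive access while the state is rebuilt
    // from the consumer-offsets log
    void on_term_change(attached_partition& p, long term) {
        std::unique_lock wlock(p.catchup_lock);
        p.group.reset_tx_state(term);
        // ... recover groups from the log under the same write lock ...
    }

    // transactional RPC path (begin/commit/abort/txn_offset_commit): shared
    // access, and an immediate "retry later" answer while recovery runs
    tx_errc begin_tx(attached_partition& p, long pid, long epoch) {
        std::shared_lock rlock(p.catchup_lock, std::try_to_lock);
        if (!rlock.owns_lock()) {
            return tx_errc::coordinator_load_in_progress;
        }
        p.group.fence_pid_epoch[pid] = epoch;
        return tx_errc::none;
    }

    // background expiry (patch 9): only attempt the read lock and skip the
    // round if it is unavailable; blocking here while holding the group's
    // gate is what deadlocked against do_detach_partition, which holds the
    // write lock while waiting for that gate to close
    void abort_old_txes(attached_partition& p) {
        std::shared_lock rlock(p.catchup_lock, std::try_to_lock);
        if (!rlock.owns_lock()) {
            return;
        }
        // ... abort the expired pids found in p.group.prepared_txs ...
    }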
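
Patch 5's step-down rule can also be shown in isolation: when replication of the fencing or offsets batch fails with an indecisive error such as a timeout, the true on-log state is unknown, so if the node still leads the same term it resigns and lets the next leader replay the log before serving further transactional requests. The stub below is a hedged sketch with an invented raft handle, not the real consensus API.

    #include <cstdio>
    #include <system_error>

    // Stub consensus handle; the real code goes through _partition->raft().
    struct raft_stub {
        long current_term = 7;
        bool leader = true;

        bool is_leader() const { return leader; }
        long term() const { return current_term; }
        void step_down(const char* reason) {
            std::printf("stepping down: %s\n", reason);
            leader = false;
        }
        // pretend the quorum-ack replication of the fence batch timed out
        std::error_code replicate_fence_batch(long /*pid*/, long /*tx_seq*/) {
            return std::make_error_code(std::errc::timed_out);
        }
    };

    enum class begin_tx_result { ok, leader_not_found };

    // An indecisive error means the batch may or may not have been applied,
    // so the in-memory state can no longer be trusted; stepping down forces a
    // log replay (and a reset_tx_state) before transactions are served again.
    begin_tx_result
    begin_tx(raft_stub& raft, long expected_term, long pid, long tx_seq) {
        if (auto ec = raft.replicate_fence_batch(pid, tx_seq); ec) {
            if (raft.is_leader() && raft.term() == expected_term) {
                raft.step_down("group begin_tx failed");
            }
            return begin_tx_result::leader_not_found;
        }
        return begin_tx_result::ok;
    }

    int main() {
        raft_stub raft;
        return begin_tx(raft, 7, 42, 0) == begin_tx_result::ok ? 0 : 1;
    }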