Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.1.x] Fix txn consume group issues leading to undefined behavior #12006

Merged
42 changes: 35 additions & 7 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ group::group(
kafka::group_id id,
group_state s,
config::configuration& conf,
ss::lw_shared_ptr<ssx::rwlock> catchup_lock,
ss::lw_shared_ptr<cluster::partition> partition,
model::term_id term,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>& feature_table,
group_metadata_serializer serializer,
Expand All @@ -60,11 +62,13 @@ group::group(
, _num_members_joining(0)
, _new_member_added(false)
, _conf(conf)
, _catchup_lock(std::move(catchup_lock))
, _partition(std::move(partition))
, _probe(_members, _static_members, _offsets)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
Expand All @@ -81,7 +85,9 @@ group::group(
kafka::group_id id,
group_metadata_value& md,
config::configuration& conf,
ss::lw_shared_ptr<ssx::rwlock> catchup_lock,
ss::lw_shared_ptr<cluster::partition> partition,
model::term_id term,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>& feature_table,
group_metadata_serializer serializer,
Expand All @@ -99,11 +105,13 @@ group::group(
, _leader(md.leader)
, _new_member_added(false)
, _conf(conf)
, _catchup_lock(std::move(catchup_lock))
, _partition(std::move(partition))
, _probe(_members, _static_members, _offsets)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
Expand Down Expand Up @@ -1653,6 +1661,10 @@ void group::fail_offset_commit(
void group::reset_tx_state(model::term_id term) {
    // A new raft term invalidates every transaction this group was
    // tracking: remember the term, then wipe all per-transaction state
    // (the clears are independent of one another, so order is arbitrary).
    _term = term;
    _fence_pid_epoch.clear();
    _tx_seqs.clear();
    _expiration_info.clear();
    _prepared_txs.clear();
    _volatile_txs.clear();
}

void group::insert_prepared(prepared_tx tx) {
Expand Down Expand Up @@ -1683,14 +1695,14 @@ group::commit_tx(cluster::commit_group_tx_request r) {
if (fence_it == _fence_pid_epoch.end()) {
vlog(
_ctx_txlog.warn,
"Can't prepare tx: fence with pid {} isn't set",
"Can't commit tx: fence with pid {} isn't set",
r.pid);
co_return make_commit_tx_reply(cluster::tx_errc::request_rejected);
}
if (r.pid.get_epoch() != fence_it->second) {
vlog(
_ctx_txlog.trace,
"Can't prepare tx with pid {} - the fence doesn't match {}",
"Can't commit tx with pid {} - the fence doesn't match {}",
r.pid,
fence_it->second);
co_return make_commit_tx_reply(cluster::tx_errc::request_rejected);
Expand Down Expand Up @@ -1879,17 +1891,22 @@ group::begin_tx(cluster::begin_group_tx_request r) {

auto reader = model::make_memory_record_batch_reader(
std::move(batch.value()));
auto e = co_await _partition->raft()->replicate(
auto res = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (!res) {
vlog(
_ctx_txlog.warn,
"Error \"{}\" on replicating pid:{} fencing batch",
e.error(),
res.error(),
r.pid);
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group begin_tx failed");
}
co_return make_begin_tx_reply(cluster::tx_errc::leader_not_found);
}

Expand All @@ -1900,9 +1917,9 @@ group::begin_tx(cluster::begin_group_tx_request r) {
_volatile_txs[r.pid] = volatile_tx{.tx_seq = r.tx_seq};
}

auto res = _expiration_info.insert_or_assign(
auto [it, _] = _expiration_info.insert_or_assign(
r.pid, expiration_info(r.timeout));
try_arm(res.first->second.deadline());
try_arm(it->second.deadline());

cluster::begin_group_tx_reply reply;
reply.etag = _term;
Expand Down Expand Up @@ -2235,6 +2252,12 @@ group::store_txn_offsets(txn_offset_commit_request r) {
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down(
"group store_txn_offsets failed");
}
co_return txn_offset_commit_response(
r, error_code::unknown_server_error);
}
Expand Down Expand Up @@ -3186,6 +3209,11 @@ void group::maybe_rearm_timer() {
}

ss::future<> group::do_abort_old_txes() {
auto unit = _catchup_lock->attempt_read_lock();
if (!unit) {
co_return;
}

std::vector<model::producer_identity> pids;
for (auto& [id, _] : _prepared_txs) {
pids.push_back(id);
Expand Down
6 changes: 6 additions & 0 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "model/timestamp.h"
#include "seastarx.h"
#include "utils/mutex.h"
#include "utils/rwlock.h"

#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
Expand Down Expand Up @@ -196,7 +197,9 @@ class group final : public ss::enable_lw_shared_from_this<group> {
kafka::group_id id,
group_state s,
config::configuration& conf,
ss::lw_shared_ptr<ssx::rwlock> catchup_lock,
ss::lw_shared_ptr<cluster::partition> partition,
model::term_id,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>&,
group_metadata_serializer,
Expand All @@ -207,7 +210,9 @@ class group final : public ss::enable_lw_shared_from_this<group> {
kafka::group_id id,
group_metadata_value& md,
config::configuration& conf,
ss::lw_shared_ptr<ssx::rwlock> catchup_lock,
ss::lw_shared_ptr<cluster::partition> partition,
model::term_id,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>&,
group_metadata_serializer,
Expand Down Expand Up @@ -898,6 +903,7 @@ class group final : public ss::enable_lw_shared_from_this<group> {
ss::timer<clock_type> _join_timer;
bool _new_member_added;
config::configuration& _conf;
ss::lw_shared_ptr<ssx::rwlock> _catchup_lock;
ss::lw_shared_ptr<cluster::partition> _partition;
absl::node_hash_map<
model::topic_partition,
Expand Down
Loading