k/group_manager: return not_coordinator quickly in tx operations
group_manager::attached_partition::catchup_lock can get blocked for
extended periods of time. For example, in the following scenario:
1. consumer_offsets partition leader gets isolated
2. some group operation acquires a read lock and tries to replicate a
  batch to the consumer_offsets partition. This operation hangs for an
  indefinite period of time.
3. the consumer_offsets leader steps down
4. group state cleanup gets triggered, tries to acquire a write lock,
  hangs until (2) finishes

Meanwhile, clients trying to perform any tx group operations will get
coordinator_load_in_progress errors and blindly retry, without even
trying to find the real coordinator.

Check for leadership without the read lock first to prevent that (this
is essentially a "double-check" pattern, as we have to check a second
time under the lock).
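The control flow this commit adds can be sketched as follows. This is a minimal single-threaded mock, not the actual Redpanda code: `mock_rwlock`, `mock_partition`, and the free function `begin_tx` are illustrative stand-ins, and `ss::rwlock` semantics are reduced to a reader count plus a writer flag.

```cpp
#include <cassert>

// Illustrative stand-in for ss::rwlock, single-threaded on purpose.
struct mock_rwlock {
    int  readers = 0;
    bool writer = false;
    bool try_read_lock() {
        if (writer) { return false; }
        ++readers;
        return true;
    }
    void read_unlock() { --readers; }
    void write_lock() { writer = true; }  // e.g. group state cleanup
    void write_unlock() { writer = false; }
};

// Illustrative stand-in for group_manager::attached_partition.
struct mock_partition {
    bool leader = true;
    mock_rwlock catchup_lock;
};

enum class errc { success, not_coordinator, coordinator_load_in_progress };

errc begin_tx(mock_partition& p) {
    // Fast path added by this commit: check leadership BEFORE touching
    // the lock, so a non-leader replies not_coordinator and the client
    // goes looking for the real coordinator instead of blindly retrying.
    if (!p.leader) {
        return errc::not_coordinator;
    }
    if (!p.catchup_lock.try_read_lock()) {
        // Lock is held (state being loaded, or cleanup holds the write
        // lock): tell the client to retry later.
        return errc::coordinator_load_in_progress;
    }
    // ... the replication path re-checks leadership under the lock,
    // which is what makes this a "double-check" pattern ...
    p.catchup_lock.read_unlock();
    return errc::success;
}
```

With the fast path in place, a request arriving while the lock is held by cleanup on a non-leader returns not_coordinator instead of coordinator_load_in_progress.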
ztlpn committed Sep 3, 2024
1 parent 0c5c702 commit 440ed2c
Showing 1 changed file with 20 additions and 4 deletions: src/v/kafka/server/group_manager.cc
@@ -1273,7 +1273,11 @@ group_manager::leave_group(leave_group_request&& r) {
 ss::future<txn_offset_commit_response>
 group_manager::txn_offset_commit(txn_offset_commit_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<txn_offset_commit_response>(
+          txn_offset_commit_response(r, error_code::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
@@ -1324,7 +1328,11 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) {
 ss::future<cluster::commit_group_tx_reply>
 group_manager::commit_tx(cluster::commit_group_tx_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<cluster::commit_group_tx_reply>(
+          make_commit_tx_reply(cluster::tx::errc::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
@@ -1364,7 +1372,11 @@ group_manager::commit_tx(cluster::commit_group_tx_request&& r) {
 ss::future<cluster::begin_group_tx_reply>
 group_manager::begin_tx(cluster::begin_group_tx_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<cluster::begin_group_tx_reply>(
+          make_begin_tx_reply(cluster::tx::errc::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
@@ -1412,7 +1424,11 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) {
 ss::future<cluster::abort_group_tx_reply>
 group_manager::abort_tx(cluster::abort_group_tx_request&& r) {
     auto p = get_attached_partition(r.ntp);
-    if (!p || !p->catchup_lock->try_read_lock()) {
+    if (!p || !p->partition->is_leader()) {
+        return ss::make_ready_future<cluster::abort_group_tx_reply>(
+          make_abort_tx_reply(cluster::tx::errc::not_coordinator));
+    }
+    if (!p->catchup_lock->try_read_lock()) {
         // transaction operations can't run in parallel with loading
         // state from the log (happens once per term change)
         vlog(
