Skip to content

Commit

Permalink
KAFKA-16815: Handle FencedInstanceId in HB response (apache#16047)
Browse files Browse the repository at this point in the history
Handle FencedInstanceIdException that a consumer may receive in the heartbeat response. This will be the case when a static consumer is removed from the group by and admin client, and another member joins with the same group.instance.id (allowed in). The first member will receive a FencedInstanceId on its next heartbeat. The expectation is that this should be handled as a fatal error.

There are no actual changes in logic with this PR, given that without being handled, the FencedInstanceId was being treated as an "unexpected error", which are all treated as fatal errors, so the outcome remains the same. But we're introducing this small change just for accuracy in the logic and the logs: FencedInstanceId is expected during heartbeat, a log line is shown describing the situation and why it happened (and it's treated as a fatal error, just like it was before this PR).

This PR also improves the test to ensure that the error propagated to the app thread matches the one received in the HB.

Reviewers: Andrew Schofield <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
lianetm authored and TaiJuWu committed Jun 8, 2024
1 parent dbe81b4 commit 41d0321
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,15 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
case UNRELEASED_INSTANCE_ID:
logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",
membershipManager.groupInstanceId().orElse("null"), errorMessage);
handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
handleFatalFailure(error.exception(errorMessage));
break;

case FENCED_INSTANCE_ID:
logger.error("GroupHeartbeatRequest failed due to fenced instance id {}: {}. " +
"This is expected in the case that the member was removed from the group " +
"by an admin client, and another member joined using the same group instance id.",
membershipManager.groupInstanceId().orElse("null"), errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;

case INVALID_REQUEST:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.HeartbeatState;
import org.apache.kafka.clients.consumer.internals.MembershipManager.LocalAssignment;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -483,8 +485,7 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
break;
default:
if (isFatal) {
// The memberStateManager should have stopped heartbeat at this point
ensureFatalError();
ensureFatalError(error);
} else {
verify(backgroundEventHandler, never()).add(any());
assertNextHeartbeatTiming(0);
Expand Down Expand Up @@ -781,9 +782,15 @@ private void mockStableMember() {
assertEquals(MemberState.STABLE, membershipManager.state());
}

private void ensureFatalError() {
private void ensureFatalError(Errors expectedError) {
verify(membershipManager).transitionToFatal();
verify(backgroundEventHandler).add(any());

final ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(expectedError.exception().getClass(), errorEvent.error(),
"The fatal error propagated to the app thread does not match the error received in the heartbeat response.");

ensureHeartbeatStopped();
}

Expand All @@ -808,6 +815,7 @@ private static Collection<Arguments> errorProvider() {
Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true),
Arguments.of(Errors.UNSUPPORTED_VERSION, true),
Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true),
Arguments.of(Errors.FENCED_INSTANCE_ID, true),
Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true));
}

Expand Down

0 comments on commit 41d0321

Please sign in to comment.