KAFKA-9620: Do not throw in the middle of consumer user callbacks (#8187)

One way of fixing it forward.

Reviewers: Guozhang Wang <[email protected]>
Boyang Chen authored Feb 28, 2020
1 parent 294b629 commit ede0730
Showing 7 changed files with 336 additions and 47 deletions.
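The change applies a collect-then-rethrow pattern in ConsumerCoordinator.onJoinComplete: each user callback runs in its own try/catch, the first failure is stored, and it is rethrown only after the revocation callback, the assignor's onAssignment, and the assignment callback have all run. Below is a minimal, illustrative sketch of that pattern on its own, with made-up class and method names; it is not code from this commit.

```java
import org.apache.kafka.common.KafkaException;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only -- not part of this commit. It shows the
// collect-then-rethrow idea: run every callback, remember the first
// exception, and surface it after all callbacks have had their chance to run.
public class DeferredCallbackRunner {

    public void runAll(final List<Runnable> userCallbacks) {
        final AtomicReference<Exception> firstException = new AtomicReference<>(null);

        for (final Runnable callback : userCallbacks) {
            try {
                callback.run();
            } catch (Exception e) {
                // Keep only the first failure; later callbacks still execute.
                firstException.compareAndSet(null, e);
            }
        }

        final Exception e = firstException.get();
        if (e != null) {
            // Rethrow after everything has run, preserving a KafkaException's
            // type rather than wrapping it a second time.
            if (e instanceof KafkaException) {
                throw (KafkaException) e;
            }
            throw new KafkaException("User callback throws an error", e);
        }
    }
}
```

The same shape appears twice in the onJoinComplete hunk that follows: the assignor's onAssignment call gains a try/catch, and the final throw re-throws a KafkaException as-is instead of wrapping it again.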
ConsumerCoordinator.java
@@ -333,7 +333,7 @@ protected void onJoinComplete(int generation,
ByteBuffer assignmentBuffer) {
log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

- // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
+ // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
assignmentSnapshot = null;

@@ -377,12 +377,12 @@ protected void onJoinComplete(int generation,
);

if (!revokedPartitions.isEmpty()) {
- // revoke partitions that were previously owned but no longer assigned;
+ // Revoke partitions that were previously owned but no longer assigned;
// note that we should only change the assignment (or update the assignor's state)
// AFTER we've triggered the revoke callback
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

- // if revoked any partitions, need to re-join the group afterwards
+ // If revoked any partitions, need to re-join the group afterwards
log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
requestRejoin();
}
@@ -392,21 +392,32 @@ protected void onJoinComplete(int generation,
// were not explicitly requested, so we update the joined subscription here.
maybeUpdateJoinedSubscription(assignedPartitions);

- // give the assignor a chance to update internal state based on the received assignment
+ // Give the assignor a chance to update internal state based on the received assignment
groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
- assignor.onAssignment(assignment, groupMetadata);

- // reschedule the auto commit starting from now
+ // Catch any exception here to make sure we could complete the user callback.
+ try {
+     assignor.onAssignment(assignment, groupMetadata);
+ } catch (Exception e) {
+     firstException.compareAndSet(null, e);
+ }
+
+ // Reschedule the auto commit starting from now
if (autoCommitEnabled)
this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);

subscriptions.assignFromSubscribed(assignedPartitions);

- // add partitions that were not previously owned but are now assigned
+ // Add partitions that were not previously owned but are now assigned
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

- if (firstException.get() != null)
-     throw new KafkaException("User rebalance callback throws an error", firstException.get());
+ if (firstException.get() != null) {
+     if (firstException.get() instanceof KafkaException) {
+         throw (KafkaException) firstException.get();
+     } else {
+         throw new KafkaException("User rebalance callback throws an error", firstException.get());
+     }
+ }
}

void maybeUpdateSubscriptionMetadata() {
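From an application's point of view, the effect of the hunk above is that a throwing ConsumerRebalanceListener no longer cuts the rebalance short: the remaining callbacks still run and the first exception is rethrown to the caller of poll(). A hedged usage sketch follows; the topic, group id and broker address are made up, and the snippet is not part of this commit.

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class RebalanceCallbackExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "example-group");              // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("example-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    throw new KafkaException("Kaboom on revoke!");
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // With the fix, a failure in one rebalance callback no longer
                    // prevents the remaining callbacks from running.
                }
            });

            try {
                consumer.poll(Duration.ofMillis(100));
            } catch (KafkaException e) {
                // The first callback exception is rethrown from poll() once the
                // remaining callbacks and assignor.onAssignment() have run.
            }
        }
    }
}
```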
ConsumerCoordinatorTest.java
@@ -106,6 +106,8 @@
import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.TestUtils.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -134,7 +136,10 @@ public class ConsumerCoordinatorTest {

private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
private final MockPartitionAssignor partitionAssignor;
private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor;
private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor;
private final List<ConsumerPartitionAssignor> assignors;
private final Map<String, MockPartitionAssignor> assignorMap;
private MockClient client;
private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
@@ -153,8 +158,18 @@ public class ConsumerCoordinatorTest {

public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) {
this.protocol = protocol;

this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol));
- this.assignors = Collections.singletonList(partitionAssignor);
+ this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol),
+     new KafkaException("Kaboom for assignment!"),
+     "throw-on-assignment-assignor");
+ this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol),
+     new IllegalStateException("Illegal state for assignment!"),
+     "throw-fatal-error-on-assignment-assignor");
+ this.assignors = Arrays.asList(partitionAssignor, throwOnAssignmentAssignor, throwFatalErrorOnAssignmentAssignor);
+ this.assignorMap = mkMap(mkEntry(partitionAssignor.name(), partitionAssignor),
+     mkEntry(throwOnAssignmentAssignor.name(), throwOnAssignmentAssignor),
+     mkEntry(throwFatalErrorOnAssignmentAssignor.name(), throwFatalErrorOnAssignmentAssignor));
}

@Parameterized.Parameters(name = "rebalance protocol = {0}")
@@ -338,14 +353,11 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {

for (int i = 0; i < numRequests; i++) {
Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(i));
- coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {
-     @Override
-     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-         responses.incrementAndGet();
-         Throwable cause = exception.getCause();
-         assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()),
-             cause instanceof DisconnectException);
-     }
- });
+ coordinator.commitOffsetsAsync(offsets, (offsets1, exception) -> {
+     responses.incrementAndGet();
+     Throwable cause = exception.getCause();
+     assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()),
+         cause instanceof DisconnectException);
+ });
}

@@ -465,6 +477,134 @@ public void testUnsubscribeWithValidGeneration() {
assertEquals(0, rebalanceListener.revokedCount);
}

@Test
public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() {
MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
super.onPartitionsRevoked(partitions);
throw new KafkaException("Kaboom on revoke!");
}
};

if (protocol == COOPERATIVE) {
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom on revoke!", null);
} else {
// Eager protocol doesn't revoke partitions.
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}
}

@Test
public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() {
MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}

@Test
public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() {
MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
partitionAssignor.name(), "Kaboom on partition assign!", null);
}

@Test
public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() {
MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
super.onPartitionsRevoked(partitions);
throw new IllegalStateException("Illegal state on partition revoke!");
}
};

if (protocol == COOPERATIVE) {
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(),
"User rebalance callback throws an error", "Illegal state on partition revoke!");
} else {
// Eager protocol doesn't revoke partitions.
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}
}

@Test
public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() {
MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};
verifyOnCallbackExceptions(throwOnAssignListener,
throwFatalErrorOnAssignmentAssignor.name(),
"User rebalance callback throws an error", "Illegal state for assignment!");
}

@Test
public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() {
MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new IllegalStateException("Illegal state on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
partitionAssignor.name(), "User rebalance callback throws an error",
"Illegal state on partition assign!");
}

private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceListener,
final String assignorName,
final String exceptionMessage,
final String causeMessage) {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

subscriptions.subscribe(singleton(topic1), rebalanceListener);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0])));
subscriptions.assignFromSubscribed(singleton(t2p));

if (exceptionMessage != null) {
final Exception exception = assertThrows(KafkaException.class,
() -> coordinator.onJoinComplete(1, "memberId", assignorName, buffer));
assertEquals(exceptionMessage, exception.getMessage());
if (causeMessage != null) {
assertEquals(causeMessage, exception.getCause().getMessage());
}
}

// Eager doesn't trigger on partition revoke.
assertEquals(protocol == COOPERATIVE ? 1 : 0, rebalanceListener.revokedCount);
assertEquals(0, rebalanceListener.lostCount);
assertEquals(1, rebalanceListener.assignedCount);
assertTrue("Unknown assignor name: " + assignorName,
assignorMap.containsKey(assignorName));
assertEquals(1, assignorMap.get(assignorName).numAssignment());
}

@Test
public void testUnsubscribeWithInvalidGeneration() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
MockPartitionAssignor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
@@ -25,10 +26,13 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor {

private final List<RebalanceProtocol> supportedProtocols;

private int numAssignment;

private Map<String, List<TopicPartition>> result = null;

MockPartitionAssignor(final List<RebalanceProtocol> supportedProtocols) {
this.supportedProtocols = supportedProtocols;
numAssignment = 0;
}

@Override
@@ -57,4 +61,12 @@ public void prepare(Map<String, List<TopicPartition>> result) {
this.result = result;
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
numAssignment += 1;
}

int numAssignment() {
return numAssignment;
}
}
ThrowOnAssignmentAssignor.java (new file)
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;

import java.util.List;

/**
* A mock assignor which throws for {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#onAssignment}.
*/
public class ThrowOnAssignmentAssignor extends MockPartitionAssignor {

private final RuntimeException bookeepedException;
private final String name;

ThrowOnAssignmentAssignor(final List<RebalanceProtocol> supportedProtocols,
final RuntimeException bookeepedException,
final String name) {
super(supportedProtocols);
this.bookeepedException = bookeepedException;
this.name = name;
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
super.onAssignment(assignment, metadata);
throw bookeepedException;
}

@Override
public String name() {
return name;
}
}
StateManagerUtil.java
@@ -84,7 +84,7 @@ static void wipeStateStores(final Logger log, final ProcessorStateManager stateMgr
Utils.delete(stateMgr.baseDir());
} catch (final IOException fatalException) {
// since it is only called under dirty close, we always swallow the exception
log.warn("Failed to wiping state stores for task {}", stateMgr.taskId());
log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException);
}
}
