Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9620: Do not throw in the middle of consumer user callbacks #8187

Merged
merged 6 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ protected void onJoinComplete(int generation,
ByteBuffer assignmentBuffer) {
log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

// only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
// Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
assignmentSnapshot = null;

Expand Down Expand Up @@ -377,12 +377,12 @@ protected void onJoinComplete(int generation,
);

if (!revokedPartitions.isEmpty()) {
// revoke partitions that were previously owned but no longer assigned;
// Revoke partitions that were previously owned but no longer assigned;
// note that we should only change the assignment (or update the assignor's state)
// AFTER we've triggered the revoke callback
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

// if revoked any partitions, need to re-join the group afterwards
// If revoked any partitions, need to re-join the group afterwards
log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
requestRejoin();
}
Expand All @@ -392,21 +392,32 @@ protected void onJoinComplete(int generation,
// were not explicitly requested, so we update the joined subscription here.
maybeUpdateJoinedSubscription(assignedPartitions);

// give the assignor a chance to update internal state based on the received assignment
// Give the assignor a chance to update internal state based on the received assignment
groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
assignor.onAssignment(assignment, groupMetadata);

// reschedule the auto commit starting from now
// Catch any exception here to make sure we could complete the user callback.
try {
assignor.onAssignment(assignment, groupMetadata);
} catch (Exception e) {
firstException.compareAndSet(null, e);
}

// Reschedule the auto commit starting from now
if (autoCommitEnabled)
this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);

subscriptions.assignFromSubscribed(assignedPartitions);

// add partitions that were not previously owned but are now assigned
// Add partitions that were not previously owned but are now assigned
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

if (firstException.get() != null)
throw new KafkaException("User rebalance callback throws an error", firstException.get());
if (firstException.get() != null) {
if (firstException.get() instanceof KafkaException) {
throw (KafkaException) firstException.get();
} else {
throw new KafkaException("User rebalance callback throws an error", firstException.get());
}
}
}

void maybeUpdateSubscriptionMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.TestUtils.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -134,7 +136,10 @@ public class ConsumerCoordinatorTest {

private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
private final MockPartitionAssignor partitionAssignor;
private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor;
private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor;
private final List<ConsumerPartitionAssignor> assignors;
private final Map<String, MockPartitionAssignor> assignorMap;
private MockClient client;
private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
Expand All @@ -153,8 +158,18 @@ public class ConsumerCoordinatorTest {

public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) {
this.protocol = protocol;

this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol));
this.assignors = Collections.singletonList(partitionAssignor);
this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol),
new KafkaException("Kaboom for assignment!"),
"throw-on-assignment-assignor");
this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol),
new IllegalStateException("Illegal state for assignment!"),
"throw-fatal-error-on-assignment-assignor");
this.assignors = Arrays.asList(partitionAssignor, throwOnAssignmentAssignor, throwFatalErrorOnAssignmentAssignor);
this.assignorMap = mkMap(mkEntry(partitionAssignor.name(), partitionAssignor),
mkEntry(throwOnAssignmentAssignor.name(), throwOnAssignmentAssignor),
mkEntry(throwFatalErrorOnAssignmentAssignor.name(), throwFatalErrorOnAssignmentAssignor));
}

@Parameterized.Parameters(name = "rebalance protocol = {0}")
Expand Down Expand Up @@ -338,14 +353,11 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {

for (int i = 0; i < numRequests; i++) {
Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(i));
coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
responses.incrementAndGet();
Throwable cause = exception.getCause();
assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()),
cause instanceof DisconnectException);
}
coordinator.commitOffsetsAsync(offsets, (offsets1, exception) -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a side cleanup

responses.incrementAndGet();
Throwable cause = exception.getCause();
assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()),
cause instanceof DisconnectException);
});
}

Expand Down Expand Up @@ -465,6 +477,151 @@ public void testUnsubscribeWithValidGeneration() {
assertEquals(0, rebalanceListener.revokedCount);
}

@Test
public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
super.onPartitionsRevoked(partitions);
throw new KafkaException("Kaboom on revoke!");
}
};

if (protocol == COOPERATIVE) {
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom on revoke!", null);
} else {
// Eager protocol doesn't revoke partitions.
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}
}

@Test
public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}

@Test
public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
partitionAssignor.name(), "Kaboom on partition assign!", null);
}

@Test
public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
super.onPartitionsRevoked(partitions);
throw new IllegalStateException("Illegal state on partition revoke!");
}
};

if (protocol == COOPERATIVE) {
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(),
"User rebalance callback throws an error", "Illegal state on partition revoke!");
} else {
// Eager protocol doesn't revoke partitions.
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}
}

@Test
public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};
verifyOnCallbackExceptions(throwOnAssignListener,
throwFatalErrorOnAssignmentAssignor.name(),
"User rebalance callback throws an error", "Illegal state for assignment!");
}

@Test
public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new IllegalStateException("Illegal state on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
partitionAssignor.name(), "User rebalance callback throws an error",
"Illegal state on partition assign!");
}

private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceListener,
final String assignorName,
final String exceptionMessage,
final String causeMessage) {
subscriptions.subscribe(singleton(topic1), rebalanceListener);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0])));
subscriptions.assignFromSubscribed(singleton(t2p));

if (exceptionMessage != null) {
final Exception exception = assertThrows(KafkaException.class,
() -> coordinator.onJoinComplete(1, "memberId", assignorName, buffer));
assertEquals(exceptionMessage, exception.getMessage());
if (causeMessage != null) {
assertEquals(causeMessage, exception.getCause().getMessage());
}
}

// Eager doesn't trigger on partition revoke.
assertEquals(protocol == COOPERATIVE ? 1 : 0, rebalanceListener.revokedCount);
assertEquals(0, rebalanceListener.lostCount);
assertEquals(1, rebalanceListener.assignedCount);
if (assignorMap.containsKey(assignorName)) {
assertEquals(1, assignorMap.get(assignorName).numAssignment());
} else {
throw new IllegalArgumentException("Unknown assignor name: " + assignorName);
}
}

@Test
public void testUnsubscribeWithInvalidGeneration() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
Expand All @@ -25,10 +26,13 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor {

private final List<RebalanceProtocol> supportedProtocols;

private int numAssignment;

private Map<String, List<TopicPartition>> result = null;

MockPartitionAssignor(final List<RebalanceProtocol> supportedProtocols) {
this.supportedProtocols = supportedProtocols;
numAssignment = 0;
}

@Override
Expand Down Expand Up @@ -57,4 +61,12 @@ public void prepare(Map<String, List<TopicPartition>> result) {
this.result = result;
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
numAssignment += 1;
}

int numAssignment() {
return numAssignment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;

import java.util.List;

/**
* A mock assignor which throws for {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#onAssignment}.
*/
public class ThrowOnAssignmentAssignor extends MockPartitionAssignor {

private final RuntimeException bookeepedException;
private final String name;

ThrowOnAssignmentAssignor(final List<RebalanceProtocol> supportedProtocols,
final RuntimeException bookeepedException,
final String name) {
super(supportedProtocols);
this.bookeepedException = bookeepedException;
this.name = name;
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
super.onAssignment(assignment, metadata);
throw bookeepedException;
}

@Override
public String name() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ static void wipeStateStores(final Logger log, final ProcessorStateManager stateM
Utils.delete(stateMgr.baseDir());
} catch (final IOException fatalException) {
// since it is only called under dirty close, we always swallow the exception
log.warn("Failed to wiping state stores for task {}", stateMgr.taskId());
log.warn("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException);
Copy link
Contributor Author

@abbccdda abbccdda Feb 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a logging to log the IO exception.

}
}

Expand Down
Loading