Skip to content

Commit

Permalink
unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
abbccdda committed Feb 28, 2020
1 parent 9e51267 commit 8821bfa
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.TestUtils.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -134,7 +136,10 @@ public class ConsumerCoordinatorTest {

private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
private final MockPartitionAssignor partitionAssignor;
private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor;
private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor;
private final List<ConsumerPartitionAssignor> assignors;
private final Map<String, MockPartitionAssignor> assignorMap;
private MockClient client;
private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
Expand All @@ -153,8 +158,18 @@ public class ConsumerCoordinatorTest {

public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) {
this.protocol = protocol;

this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol));
this.assignors = Collections.singletonList(partitionAssignor);
this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol),
new KafkaException("Kaboom for assignment!"),
"throw-on-assignment-assignor");
this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol),
new IllegalStateException("Illegal state for assignment!"),
"throw-fatal-error-on-assignment-assignor");
this.assignors = Arrays.asList(partitionAssignor, throwOnAssignmentAssignor, throwFatalErrorOnAssignmentAssignor);
this.assignorMap = mkMap(mkEntry(partitionAssignor.name(), partitionAssignor),
mkEntry(throwOnAssignmentAssignor.name(), throwOnAssignmentAssignor),
mkEntry(throwFatalErrorOnAssignmentAssignor.name(), throwFatalErrorOnAssignmentAssignor));
}

@Parameterized.Parameters(name = "rebalance protocol = {0}")
Expand Down Expand Up @@ -338,14 +353,11 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {

for (int i = 0; i < numRequests; i++) {
Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(i));
coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
responses.incrementAndGet();
Throwable cause = exception.getCause();
assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()),
cause instanceof DisconnectException);
}
coordinator.commitOffsetsAsync(offsets, (offsets1, exception) -> {
responses.incrementAndGet();
Throwable cause = exception.getCause();
assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()),
cause instanceof DisconnectException);
});
}

Expand Down Expand Up @@ -465,6 +477,151 @@ public void testUnsubscribeWithValidGeneration() {
assertEquals(0, rebalanceListener.revokedCount);
}

@Test
public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
super.onPartitionsRevoked(partitions);
throw new KafkaException("Kaboom on revoke!");
}
};

if (protocol == COOPERATIVE) {
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom on revoke!", null);
} else {
// Eager protocol doesn't revoke partitions.
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}
}

@Test
public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}

@Test
public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
partitionAssignor.name(), "Kaboom on partition assign!", null);
}

@Test
public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
super.onPartitionsRevoked(partitions);
throw new IllegalStateException("Illegal state on partition revoke!");
}
};

if (protocol == COOPERATIVE) {
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(),
"User rebalance callback throws an error", "Illegal state on partition revoke!");
} else {
// Eager protocol doesn't revoke partitions.
verifyOnCallbackExceptions(throwOnRevokeListener,
throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
}
}

@Test
public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new KafkaException("Kaboom on partition assign!");
}
};
verifyOnCallbackExceptions(throwOnAssignListener,
throwFatalErrorOnAssignmentAssignor.name(),
"User rebalance callback throws an error", "Illegal state for assignment!");
}

@Test
public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
super.onPartitionsAssigned(partitions);
throw new IllegalStateException("Illegal state on partition assign!");
}
};

verifyOnCallbackExceptions(throwOnAssignListener,
partitionAssignor.name(), "User rebalance callback throws an error",
"Illegal state on partition assign!");
}

private void verifyOnCallbackExceptions(final MockRebalanceListener rebalanceListener,
final String assignorName,
final String exceptionMessage,
final String causeMessage) {
subscriptions.subscribe(singleton(topic1), rebalanceListener);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0])));
subscriptions.assignFromSubscribed(singleton(t2p));

if (exceptionMessage != null) {
final Exception exception = assertThrows(KafkaException.class,
() -> coordinator.onJoinComplete(1, "memberId", assignorName, buffer));
assertEquals(exceptionMessage, exception.getMessage());
if (causeMessage != null) {
assertEquals(causeMessage, exception.getCause().getMessage());
}
}

// Eager doesn't trigger on partition revoke.
assertEquals(protocol == COOPERATIVE ? 1 : 0, rebalanceListener.revokedCount);
assertEquals(0, rebalanceListener.lostCount);
assertEquals(1, rebalanceListener.assignedCount);
if (assignorMap.containsKey(assignorName)) {
assertEquals(1, assignorMap.get(assignorName).numAssignment());
} else {
throw new IllegalArgumentException("Unknown assignor name: " + assignorName);
}
}

@Test
public void testUnsubscribeWithInvalidGeneration() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
Expand All @@ -25,10 +26,13 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor {

private final List<RebalanceProtocol> supportedProtocols;

private int numAssignment;

private Map<String, List<TopicPartition>> result = null;

MockPartitionAssignor(final List<RebalanceProtocol> supportedProtocols) {
this.supportedProtocols = supportedProtocols;
numAssignment = 0;
}

@Override
Expand Down Expand Up @@ -57,4 +61,12 @@ public void prepare(Map<String, List<TopicPartition>> result) {
this.result = result;
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
numAssignment += 1;
}

int numAssignment() {
return numAssignment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;

import java.util.List;

/**
* A mock assignor which throws for {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor#onAssignment}.
*/
public class ThrowOnAssignmentAssignor extends MockPartitionAssignor {

private final RuntimeException bookeepedException;
private final String name;

ThrowOnAssignmentAssignor(final List<RebalanceProtocol> supportedProtocols,
final RuntimeException bookeepedException,
final String name) {
super(supportedProtocols);
this.bookeepedException = bookeepedException;
this.name = name;
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
super.onAssignment(assignment, metadata);
throw bookeepedException;
}

@Override
public String name() {
return name;
}
}
Loading

0 comments on commit 8821bfa

Please sign in to comment.