Skip to content

Commit

Permalink
unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
abbccdda committed Mar 6, 2020
1 parent 12f2349 commit 84ccecb
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private KafkaException offsetsException;
private AtomicBoolean wakeup;
private boolean closed;
private boolean shouldRebalance;

public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
Expand All @@ -79,6 +80,7 @@ public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.pollException = null;
this.wakeup = new AtomicBoolean(false);
this.committed = new HashMap<>();
this.shouldRebalance = false;
}

@Override
Expand Down Expand Up @@ -356,21 +358,10 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST);
}

// needed for cases where you make a second call to endOffsets
public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
innerUpdateEndOffsets(newOffsets, false);
}

public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
innerUpdateEndOffsets(newOffsets, true);
}

private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
final boolean replace) {

for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
List<Long> offsets = endOffsets.get(entry.getKey());
if (replace || offsets == null) {
if (offsets == null) {
offsets = new ArrayList<>();
}
offsets.add(entry.getValue());
Expand Down Expand Up @@ -568,6 +559,15 @@ public ConsumerGroupMetadata groupMetadata() {

@Override
public void enforceRebalance() {
shouldRebalance = true;
}

public boolean shouldRebalance() {
return shouldRebalance;
}

public void resetShouldRebalance() {
shouldRebalance = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
Expand Down Expand Up @@ -807,6 +808,67 @@ public void shouldShutdownTaskManagerOnClose() {
EasyMock.verify(taskManager);
}

@Test
public void shouldNotReturnDataAfterTaskMigrated() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);

internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class);

EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2);

final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);

consumer.subscribe(Collections.singletonList(topic1), new MockRebalanceListener());
consumer.rebalance(Collections.singletonList(t1p1));
consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L));
consumer.seekToEnd(Collections.singletonList(t1p1));

final ChangelogReader changelogReader = new MockChangelogReader() {

@Override
public void restore() {
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0]));
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0]));

throw new TaskMigratedException(
"Changelog restore found task migrated", new RuntimeException("restore task migrated"));
}
};

taskManager.handleLostAll();

EasyMock.replay(taskManager, internalTopologyBuilder);

final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
null,
null,
null,
consumer,
consumer,
changelogReader,
null,
taskManager,
streamsMetrics,
internalTopologyBuilder,
CLIENT_ID,
new LogContext(""),
new AtomicInteger()
).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));

final IllegalStateException thrown = assertThrows(
IllegalStateException.class, thread::run);

EasyMock.verify(taskManager);

// The Mock consumer shall throw as the assignment has been wiped out, but records are assigned.
assertEquals("No current assignment for partition topic1-1", thrown.getMessage());
assertFalse(consumer.shouldRebalance());
}

@Test
public void shouldShutdownTaskManagerOnCloseWithoutStart() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
Expand Down

0 comments on commit 84ccecb

Please sign in to comment.