Skip to content

Commit

Permalink
KAFKA-9686 MockConsumer#endOffsets should be idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Mar 10, 2020
1 parent dcfb641 commit e74b24c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final Map<String, List<PartitionInfo>> partitions;
private final SubscriptionState subscriptions;
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, List<Long>> endOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final Map<TopicPartition, OffsetAndMetadata> committed;
private final Queue<Runnable> pollTasks;
private final Set<TopicPartition> paused;
Expand Down Expand Up @@ -359,14 +359,7 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
}

public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
List<Long> offsets = endOffsets.get(entry.getKey());
if (offsets == null) {
offsets = new ArrayList<>();
}
offsets.add(entry.getValue());
endOffsets.put(entry.getKey(), offsets);
}
endOffsets.putAll(newOffsets);
}

@Override
Expand Down Expand Up @@ -439,7 +432,7 @@ public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartiti
}
Map<TopicPartition, Long> result = new HashMap<>();
for (TopicPartition tp : partitions) {
Long endOffset = getEndOffset(endOffsets.get(tp));
Long endOffset = endOffsets.get(tp);
if (endOffset == null)
throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
result.put(tp, endOffset);
Expand Down Expand Up @@ -510,7 +503,7 @@ private void resetOffsetPosition(TopicPartition tp) {
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
} else if (strategy == OffsetResetStrategy.LATEST) {
offset = getEndOffset(endOffsets.get(tp));
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
} else {
Expand All @@ -519,13 +512,6 @@ private void resetOffsetPosition(TopicPartition tp) {
seek(tp, offset);
}

private Long getEndOffset(List<Long> offsets) {
if (offsets == null || offsets.isEmpty()) {
return null;
}
return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
}

@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
return partitionsFor(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Assert;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -119,4 +120,19 @@ public void shouldNotClearRecordsForPausedPartitions() {
assertThat(recordsSecondPoll.count(), is(1));
}

@Test
public void endOffsetsShouldBeIdempotent() {
TopicPartition partition = new TopicPartition("test", 0);
consumer.updateEndOffsets(Collections.singletonMap(partition, 10L));
// consumer.endOffsets should NOT change the value of end offsets
Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
consumer.updateEndOffsets(Collections.singletonMap(partition, 11L));
// consumer.endOffsets should NOT change the value of end offsets
Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
}

}

0 comments on commit e74b24c

Please sign in to comment.