diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 33702c3144e37..88e052ffd9327 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -56,7 +56,7 @@ public class MockConsumer implements Consumer { private final Map> partitions; private final SubscriptionState subscriptions; private final Map beginningOffsets; - private final Map> endOffsets; + private final Map endOffsets; private final Map committed; private final Queue pollTasks; private final Set paused; @@ -359,14 +359,7 @@ public synchronized void seekToEnd(Collection partitions) { } public synchronized void updateEndOffsets(final Map newOffsets) { - for (final Map.Entry entry : newOffsets.entrySet()) { - List offsets = endOffsets.get(entry.getKey()); - if (offsets == null) { - offsets = new ArrayList<>(); - } - offsets.add(entry.getValue()); - endOffsets.put(entry.getKey(), offsets); - } + endOffsets.putAll(newOffsets); } @Override @@ -439,7 +432,7 @@ public synchronized Map endOffsets(Collection result = new HashMap<>(); for (TopicPartition tp : partitions) { - Long endOffset = getEndOffset(endOffsets.get(tp)); + Long endOffset = endOffsets.get(tp); if (endOffset == null) throw new IllegalStateException("The partition " + tp + " does not have an end offset."); result.put(tp, endOffset); @@ -510,7 +503,7 @@ private void resetOffsetPosition(TopicPartition tp) { if (offset == null) throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning"); } else if (strategy == OffsetResetStrategy.LATEST) { - offset = getEndOffset(endOffsets.get(tp)); + offset = endOffsets.get(tp); if (offset == null) throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end"); } else { @@ -519,13 +512,6 @@ private void resetOffsetPosition(TopicPartition tp) { seek(tp, offset); } - private Long getEndOffset(List offsets) { - if (offsets == null || offsets.isEmpty()) { - return null; - } - return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0); - } - @Override public List partitionsFor(String topic, Duration timeout) { return partitionsFor(topic); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index a7a5f665c37b1..00295432c3a34 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -18,14 +18,15 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; +import org.junit.Assert; import org.junit.Test; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.Collection; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -119,4 +120,19 @@ public void shouldNotClearRecordsForPausedPartitions() { assertThat(recordsSecondPoll.count(), is(1)); } + @Test + public void endOffsetsShouldBeIdempotent() { + TopicPartition partition = new TopicPartition("test", 0); + consumer.updateEndOffsets(Collections.singletonMap(partition, 10L)); + // consumer.endOffsets should NOT change the value of end offsets + Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition)); + Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition)); + Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition)); + consumer.updateEndOffsets(Collections.singletonMap(partition, 11L)); + // consumer.endOffsets should NOT change the value of end offsets + Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition)); + Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition)); + Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition)); + } + }