Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9686 MockConsumer#endOffsets should be idempotent #8255

Merged
merged 1 commit into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final Map<String, List<PartitionInfo>> partitions;
private final SubscriptionState subscriptions;
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, List<Long>> endOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final Map<TopicPartition, OffsetAndMetadata> committed;
private final Queue<Runnable> pollTasks;
private final Set<TopicPartition> paused;
Expand Down Expand Up @@ -359,14 +359,7 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
}

public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
List<Long> offsets = endOffsets.get(entry.getKey());
if (offsets == null) {
offsets = new ArrayList<>();
}
offsets.add(entry.getValue());
endOffsets.put(entry.getKey(), offsets);
}
endOffsets.putAll(newOffsets);
}

@Override
Expand Down Expand Up @@ -439,7 +432,7 @@ public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartiti
}
Map<TopicPartition, Long> result = new HashMap<>();
for (TopicPartition tp : partitions) {
Long endOffset = getEndOffset(endOffsets.get(tp));
Long endOffset = endOffsets.get(tp);
if (endOffset == null)
throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
result.put(tp, endOffset);
Expand Down Expand Up @@ -510,7 +503,7 @@ private void resetOffsetPosition(TopicPartition tp) {
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
} else if (strategy == OffsetResetStrategy.LATEST) {
offset = getEndOffset(endOffsets.get(tp));
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
} else {
Expand All @@ -519,13 +512,6 @@ private void resetOffsetPosition(TopicPartition tp) {
seek(tp, offset);
}

private Long getEndOffset(List<Long> offsets) {
if (offsets == null || offsets.isEmpty()) {
return null;
}
return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
}

@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
return partitionsFor(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Assert;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -119,4 +120,19 @@ public void shouldNotClearRecordsForPausedPartitions() {
assertThat(recordsSecondPoll.count(), is(1));
}

@Test
public void endOffsetsShouldBeIdempotent() {
TopicPartition partition = new TopicPartition("test", 0);
consumer.updateEndOffsets(Collections.singletonMap(partition, 10L));
// consumer.endOffsets should NOT change the value of end offsets
Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(10L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
consumer.updateEndOffsets(Collections.singletonMap(partition, 11L));
// consumer.endOffsets should NOT change the value of end offsets
Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
Assert.assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
}

}