From f081f3bd7619194e567daaa53b52216b75925adf Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 7 Mar 2020 08:22:55 -0800 Subject: [PATCH] MINOR: Reset `streamTime` on clear (#8250) Reviewers: Guozhang Wang --- .../kafka/streams/processor/internals/PartitionGroup.java | 7 +++---- .../kafka/streams/processor/internals/RecordQueue.java | 5 ++--- .../streams/processor/internals/PartitionGroupTest.java | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 32610b583c74c..4514d8df23862 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -220,15 +220,14 @@ boolean allPartitionsBuffered() { void close() { clear(); - - streamTime = RecordQueue.UNKNOWN; } void clear() { - nonEmptyQueuesByTime.clear(); - totalBuffered = 0; for (final RecordQueue queue : partitionQueues.values()) { queue.clear(); } + nonEmptyQueuesByTime.clear(); + totalBuffered = 0; + streamTime = RecordQueue.UNKNOWN; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 0beade90ec457..dff2f44822225 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -47,7 +47,7 @@ public class RecordQueue { private final ArrayDeque> fifoQueue; private StampedRecord headRecord = null; - private long partitionTime; + private long partitionTime = UNKNOWN; private final Sensor droppedRecordsSensor; @@ -74,7 +74,6 @@ public class RecordQueue { droppedRecordsSensor ); this.log = logContext.logger(RecordQueue.class); - setPartitionTime(UNKNOWN); } void setPartitionTime(final long partitionTime) { @@ -167,7 +166,7 @@ public Long headRecordOffset() { public void clear() { fifoQueue.clear(); headRecord = null; - setPartitionTime(UNKNOWN); + partitionTime = UNKNOWN; } private void updateHead() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index d38c52dcb8d91..40e64515fa1b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -392,7 +392,7 @@ public void shouldEmptyPartitionsOnClear() { group.clear(); assertThat(group.numBuffered(), equalTo(0)); - assertThat(group.streamTime(), equalTo(3L)); + assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN)); assertThat(group.nextRecord(new PartitionGroup.RecordInfo()), equalTo(null)); assertThat(group.partitionTimestamp(partition1), equalTo(RecordQueue.UNKNOWN));