diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 32610b583c74c..4514d8df23862 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -220,15 +220,14 @@ boolean allPartitionsBuffered() { void close() { clear(); - - streamTime = RecordQueue.UNKNOWN; } void clear() { - nonEmptyQueuesByTime.clear(); - totalBuffered = 0; for (final RecordQueue queue : partitionQueues.values()) { queue.clear(); } + nonEmptyQueuesByTime.clear(); + totalBuffered = 0; + streamTime = RecordQueue.UNKNOWN; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 0beade90ec457..dff2f44822225 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -47,7 +47,7 @@ public class RecordQueue { private final ArrayDeque> fifoQueue; private StampedRecord headRecord = null; - private long partitionTime; + private long partitionTime = UNKNOWN; private final Sensor droppedRecordsSensor; @@ -74,7 +74,6 @@ public class RecordQueue { droppedRecordsSensor ); this.log = logContext.logger(RecordQueue.class); - setPartitionTime(UNKNOWN); } void setPartitionTime(final long partitionTime) { @@ -167,7 +166,7 @@ public Long headRecordOffset() { public void clear() { fifoQueue.clear(); headRecord = null; - setPartitionTime(UNKNOWN); + partitionTime = UNKNOWN; } private void updateHead() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index d38c52dcb8d91..40e64515fa1b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -392,7 +392,7 @@ public void shouldEmptyPartitionsOnClear() { group.clear(); assertThat(group.numBuffered(), equalTo(0)); - assertThat(group.streamTime(), equalTo(3L)); + assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN)); assertThat(group.nextRecord(new PartitionGroup.RecordInfo()), equalTo(null)); assertThat(group.partitionTimestamp(partition1), equalTo(RecordQueue.UNKNOWN));