diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 7cd21f84ae318..b5e4ae404e791 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -451,15 +451,24 @@ public void restore() { } private void maybeUpdateLimitOffsetsForStandbyChangelogs() { - // for standby changelogs, if the interval has elapsed and there are buffered records not applicable, - // we can try to update the limit offset next time. - if (updateOffsetIntervalMs < time.milliseconds() - lastUpdateOffsetTime) { - final Set standbyChangelogs = changelogs.values().stream() - .filter(metadata -> metadata.stateManager.taskType() == Task.TaskType.STANDBY) - .collect(Collectors.toSet()); - for (final ChangelogMetadata metadata : standbyChangelogs) { - if (!metadata.bufferedRecords().isEmpty()) { - updateLimitOffsets(); + // we only consider updating the limit offset for standbys if we are not restoring active tasks + if (state == ChangelogReaderState.STANDBY_UPDATING && + updateOffsetIntervalMs < time.milliseconds() - lastUpdateOffsetTime) { + + // when the interval has elapsed we should try to update the limit offset for standbys reading from + // a source changelog with the new committed offset, unless there are no buffered records since + // we only need the limit when processing new records + // for other changelog partitions we do not need to update limit offset at all since we never need to + // check when it completes based on limit offset anyways: the end offset would keep increasing and the + // standby never need to stop + final Set changelogsWithLimitOffsets = changelogs.entrySet().stream() + .filter(entry -> entry.getValue().stateManager.taskType() == Task.TaskType.STANDBY && + entry.getValue().stateManager.changelogAsSource(entry.getKey())) + .map(Map.Entry::getKey).collect(Collectors.toSet()); + + for (final TopicPartition partition : changelogsWithLimitOffsets) { + if (!changelogs.get(partition).bufferedRecords().isEmpty()) { + updateLimitOffsetsForStandbyChangelogs(committedOffsetForChangelogs(changelogsWithLimitOffsets)); break; } } @@ -575,23 +584,6 @@ private Map endOffsetForChangelogs(final Set changelogsWithLimitOffsets = changelogs.entrySet().stream() - .filter(entry -> entry.getValue().stateManager.taskType() == Task.TaskType.STANDBY && - entry.getValue().stateManager.changelogAsSource(entry.getKey())) - .map(Map.Entry::getKey).collect(Collectors.toSet()); - - updateLimitOffsetsForStandbyChangelogs(committedOffsetForChangelogs(changelogsWithLimitOffsets)); - } - private void updateLimitOffsetsForStandbyChangelogs(final Map committedOffsets) { for (final ChangelogMetadata metadata : changelogs.values()) { final TopicPartition partition = metadata.storeMetadata.changelogPartition(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index e9784eaea10b0..e033535cd4d51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -627,6 +627,58 @@ public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() { assertTrue(changelogReader.changelogMetadata(tp).bufferedRecords().isEmpty()); } + @Test + public void shouldNotUpdateLimitForNonSourceStandbyChangelog() { + EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(false).anyTimes(); + EasyMock.replay(standbyStateManager, storeMetadata, store); + + final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + @Override + public Map committed(final Set partitions) { + throw new AssertionError("Should not try to fetch committed offsets"); + } + }; + + final Properties properties = new Properties(); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties)); + final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback); + changelogReader.setMainConsumer(consumer); + changelogReader.transitToUpdateStandby(); + + consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); + changelogReader.register(tp, standbyStateManager); + assertNull(changelogReader.changelogMetadata(tp).endOffset()); + assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); + + // if there's no records fetchable, nothings gets restored + changelogReader.restore(); + assertNull(callback.restoreTopicPartition); + assertNull(callback.storeNameCalledStates.get(RESTORE_START)); + assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); + assertNull(changelogReader.changelogMetadata(tp).endOffset()); + assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored()); + + consumer.addRecord(new ConsumerRecord<>(topicName, 0, 5L, "key".getBytes(), "value".getBytes())); + consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes())); + consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes())); + // null key should be ignored + consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, null, "value".getBytes())); + consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes())); + consumer.addRecord(new ConsumerRecord<>(topicName, 0, 10L, "key".getBytes(), "value".getBytes())); + consumer.addRecord(new ConsumerRecord<>(topicName, 0, 11L, "key".getBytes(), "value".getBytes())); + + // we should be able to restore to the log end offsets since there's no limit + changelogReader.restore(); + assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); + assertNull(changelogReader.changelogMetadata(tp).endOffset()); + assertEquals(6L, changelogReader.changelogMetadata(tp).totalRestored()); + assertEquals(0, changelogReader.changelogMetadata(tp).bufferedRecords().size()); + assertEquals(0, changelogReader.changelogMetadata(tp).bufferedLimitIndex()); + assertNull(callback.storeNameCalledStates.get(RESTORE_END)); + assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH)); + } + @Test public void shouldRestoreToLimitInStandbyState() { EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); @@ -650,6 +702,9 @@ public Map committed(final Set committed(final Set