diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 18341757a89f7..62e5f56adc56e 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -605,6 +605,7 @@ boolean isLeader() {
         // the task's run() method.
         private volatile Optional copiedOffsetOption = Optional.empty();
         private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = false;
+        private volatile Optional<String> logDirectory = Optional.empty();
 
         public void convertToLeader(int leaderEpochVal) {
             if (leaderEpochVal < 0) {
@@ -815,6 +816,13 @@ public void run() {
                 }
 
                 UnifiedLog log = unifiedLogOptional.get();
+                // On the first run after an intra-broker log dir alteration completes, make sure the task state is reset. (KAFKA-16711)
+                if (!log.parentDir().equals(logDirectory.orElse(null))) {
+                    copiedOffsetOption = Optional.empty();
+                    isLogStartOffsetUpdatedOnBecomingLeader = false;
+                    logDirectory = Optional.of(log.parentDir());
+                }
+
                 if (isLeader()) {
                     // Copy log segments to remote storage
                     copyLogSegmentsToRemote(log);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index fdee0ef72a8b5..5dd6fbe34cc46 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -203,7 +203,7 @@ public List<EpochEntry> read() {
     };
 
     private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
-    private final UnifiedLog mockLog = mock(UnifiedLog.class);
+    private UnifiedLog mockLog = mock(UnifiedLog.class);
 
     @BeforeEach
     void setUp() throws Exception {
@@ -661,6 +661,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
         long nextSegmentStartOffset = 150L;
         int segmentCount = 3;
         when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.parentDir()).thenReturn("dir1");
 
         // leader epoch preparation
         checkpoint.write(totalEpochEntries);
@@ -729,7 +730,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
         Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
         Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
         List<RemoteLogSegmentMetadata> list = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
-        // return 3 metadata and then return 0 to simulate all segments are deleted
+        // return the metadataList 3 times, then return an empty list to simulate all segments are deleted
         when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(list.iterator()).thenReturn(Collections.emptyIterator());
         when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(list.iterator()).thenReturn(list.iterator());
         when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(list.iterator());
@@ -772,12 +773,126 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc
                 safeLongYammerMetricValue("RemoteLogSizeBytes")));
     }
 
+    @Test
+    void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        int segmentCount = 3;
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.parentDir()).thenReturn("dir1");
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
+            .thenReturn(Optional.of(0L))
+            .thenReturn(Optional.of(nextSegmentStartOffset - 1));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as their base offsets
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+        when(mockLog.logEndOffset()).thenReturn(500L);
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", 100L);
+        logProps.put("retention.ms", -1L);
+        LogConfig logConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(logConfig);
+
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
+        CountDownLatch copyLogSegmentLatch = new CountDownLatch(1);
+        doAnswer(ans -> {
+            // waiting for verification
+            copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);
+            return Optional.empty();
+        }).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+
+        Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
+        List<RemoteLogSegmentMetadata> metadataList = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenReturn(metadataList.iterator()).thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 2)).thenReturn(metadataList.iterator());
+
+        // leadership change to the log in dir1
+        remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.emptySet(), topicIds);
+
+        TestUtils.waitForCondition(() -> {
+            ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
+            return 0L == argument.getValue();
+        }, "Timed out waiting for updateHighestOffsetInRemoteStorage(0) to be invoked for the dir1 log");
+
+        UnifiedLog oldMockLog = mockLog;
+        Mockito.clearInvocations(oldMockLog);
+        // simulate that altering the log dir has completed: the partition leader stays on the same broker but in a different log dir (dir2)
+        mockLog = mock(UnifiedLog.class);
+        when(mockLog.parentDir()).thenReturn("dir2");
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.config()).thenReturn(logConfig);
+        when(mockLog.logEndOffset()).thenReturn(500L);
+
+        remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.emptySet(), topicIds);
+
+        // after copyLogSegmentData completes for the log in dir1, updateHighestOffsetInRemoteStorage will be triggered with the new offset,
+        // even though the leader replica has moved to the log in dir2
+        copyLogSegmentLatch.countDown();
+        TestUtils.waitForCondition(() -> {
+            ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
+            verify(oldMockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
+            return nextSegmentStartOffset - 1 == argument.getValue();
+        }, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) to be invoked for the dir1 log");
+
+        // On the next run of RLMTask, the log in dir2 will be picked up and updateHighestOffsetInRemoteStorage will be invoked with the expected offset
+        TestUtils.waitForCondition(() -> {
+            ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
+            return nextSegmentStartOffset - 1 == argument.getValue();
+        }, "Timed out waiting for updateHighestOffsetInRemoteStorage(149) to be invoked for the dir2 log");
+
+    }
+
     @Test
     void testRemoteLogManagerRemoteMetrics() throws Exception {
         long oldestSegmentStartOffset = 0L;
         long olderSegmentStartOffset = 75L;
         long nextSegmentStartOffset = 150L;
         when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.parentDir()).thenReturn("dir1");
 
         // leader epoch preparation
         checkpoint.write(totalEpochEntries);
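
For context, below is a minimal standalone sketch of the reset-on-logDir-change guard added in RLMTask.run() above. Only the three fields and the comparison against the log's parentDir() come from the diff; the class and method names (TaskStateSketch, maybeResetState) are hypothetical and serve purely to illustrate the pattern in isolation, not the actual RLMTask implementation.

    import java.util.Optional;

    // Hypothetical holder mirroring the per-task state that the KAFKA-16711 guard resets.
    class TaskStateSketch {
        private volatile Optional<Long> copiedOffsetOption = Optional.empty();
        private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = false;
        private volatile Optional<String> logDirectory = Optional.empty();

        // Called at the start of each run with the current log's parent directory.
        // If the partition moved to another log dir since the last run, the cached
        // copied-offset state belongs to the old UnifiedLog instance and must be cleared.
        void maybeResetState(String currentParentDir) {
            if (!currentParentDir.equals(logDirectory.orElse(null))) {
                copiedOffsetOption = Optional.empty();
                isLogStartOffsetUpdatedOnBecomingLeader = false;
                logDirectory = Optional.of(currentParentDir);
            }
        }
    }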