diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 6724ea9bf3ba0..187fc515e526a 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -38,6 +38,7 @@
+
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index c524238f62341..538fcd2ecc31a 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -96,6 +96,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.security.PrivilegedAction;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -122,6 +123,8 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -158,6 +161,8 @@ public class RemoteLogManager implements Closeable {
 
     private final RemoteLogMetadataManager remoteLogMetadataManager;
 
+    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
+    private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition();
     private final RLMQuotaManager rlmCopyQuotaManager;
     private final RLMQuotaManager rlmFetchQuotaManager;
 
@@ -244,6 +249,13 @@ private void removeMetrics() {
         remoteStorageReaderThreadPool.removeMetrics();
     }
 
+    /**
+     * Returns how long an RLM task waits for copy quota before re-checking it; kept as a method so tests can override it with a shorter value
+     */
+    Duration quotaTimeout() {
+        return Duration.ofSeconds(1);
+    }
+
     RLMQuotaManager createRLMCopyQuotaManager() {
         return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$,
             "Tracking copy byte-rate for Remote Log Manager", time);
@@ -757,6 +769,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                                     isCancelled(), isLeader());
                             return;
                         }
+
+                        copyQuotaManagerLock.lock();
+                        try {
+                            while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                                logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+                                // If the thread gets interrupted while waiting, the InterruptedException is thrown
+                                // back to the caller. It's important to note that the task being executed is already
+                                // cancelled before the executing thread is interrupted. The caller is responsible
+                                // for handling the exception gracefully by checking if the task is already cancelled.
+                                boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
+                            }
+                            rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
+                            // Signal waiting threads to check the quota again
+                            copyQuotaManagerLockCondition.signalAll();
+                        } finally {
+                            copyQuotaManagerLock.unlock();
+                        }
                         copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
                     }
                 }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 667e213b81d08..b4a36183aed88 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -20,6 +20,7 @@
 import kafka.cluster.EndPoint;
 import kafka.cluster.Partition;
 import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.RLMQuotaManager;
 import kafka.log.remote.quota.RLMQuotaManagerConfig;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
@@ -86,6 +87,7 @@
 import org.mockito.InOrder;
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
+import org.opentest4j.AssertionFailedError;
 import scala.Option;
 import scala.collection.JavaConverters;
 
@@ -100,6 +102,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -144,6 +147,7 @@
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -151,6 +155,7 @@
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
@@ -185,6 +190,7 @@ public class RemoteLogManagerTest {
 
     private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
     private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
+    private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
     private RemoteLogManagerConfig remoteLogManagerConfig = null;
 
     private BrokerTopicStats brokerTopicStats = null;
@@ -228,6 +234,12 @@ public RemoteStorageManager createRemoteStorageManager() {
             public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                 return remoteLogMetadataManager;
             }
+            public RLMQuotaManager createRLMCopyQuotaManager() {
+                return rlmCopyQuotaManager;
+            }
+            public Duration quotaTimeout() {
+                return Duration.ofMillis(100);
+            }
             @Override
             long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
                 return 0L;
@@ -2730,6 +2742,204 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMShutdownDuringQuotaExceededScenario() throws Exception {
+        remoteLogManager.startup();
+        setupRLMTask(true);
+        remoteLogManager.onLeadershipChange(
+            Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
+        // Ensure the copy operation is waiting for quota to be available
+        TestUtils.waitForCondition(() -> {
+            verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
+            return true;
+        }, "Quota exceeded check did not happen");
+        // Verify RLM is able to shut down
+        assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close());
+    }
+
+    // Helper that stubs a two-segment log (one copyable segment plus the active one) and returns a leader RLMTask; quotaExceeded is what the mocked copy quota manager reports
+    private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded) throws RemoteStorageException, IOException {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(mockLog.parentDir()).thenReturn("dir1");
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that is eligible for copy
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+        return task;
+    }
+
+    @Test
+    public void testCopyThrottling() throws Exception {
+        long oldestSegmentStartOffset = 0L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 3 log segments
+        LogSegment segmentToCopy = mock(LogSegment.class);
+        LogSegment segmentToThrottle = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // set up the segment that will be copied
+        when(segmentToCopy.log()).thenReturn(fileRecords);
+        when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset);
+        when(segmentToCopy.readNextOffset()).thenReturn(100L);
+
+        // set up the segment that will not be copied because of hitting quota
+        when(segmentToThrottle.log()).thenReturn(fileRecords);
+        when(segmentToThrottle.baseOffset()).thenReturn(100L);
+        when(segmentToThrottle.readNextOffset()).thenReturn(150L);
+
+        // set up the active segment
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.baseOffset()).thenReturn(150L);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy, segmentToThrottle, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldestSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldestSegmentStartOffset, txnFile);
+        when(segmentToCopy.timeIndex()).thenReturn(timeIdx);
+        when(segmentToCopy.offsetIndex()).thenReturn(idx);
+        when(segmentToCopy.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        // After the first call, isQuotaExceeded should return true
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+
+        // Verify that the copy operation times out, since the second segment cannot be copied due to quota being exceeded
+        assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+        // Verify the highest offset in remote storage is updated corresponding to the only segment that was copied
+        ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+        verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+        List<Long> capturedValues = capture.getAllValues();
+        // Verify the highest offset in remote storage was -1L before the copy
+        assertEquals(-1L, capturedValues.get(0).longValue());
+        // Verify it was updated to 99L after the copy
+        assertEquals(99L, capturedValues.get(1).longValue());
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);