From 0b4fcbb16d0fbab67df906bdfe0bb7b880503e22 Mon Sep 17 00:00:00 2001 From: Abhijeet Kumar Date: Wed, 12 Jun 2024 06:27:02 +0530 Subject: [PATCH] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820) - Added the integration of the quota manager to throttle copy requests to the remote storage. Reference KIP-956 - Added unit-tests for the copy throttling logic. Reviewers: Satish Duggana , Luke Chen , Kamal Chandraprakash --- checkstyle/import-control-core.xml | 1 + .../kafka/log/remote/RemoteLogManager.java | 29 +++ .../log/remote/RemoteLogManagerTest.java | 210 ++++++++++++++++++ 3 files changed, 240 insertions(+) diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index ed6c53a322ba3..a30de55e415c6 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -38,6 +38,7 @@ + diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 43c03190767fb..b920a962afca5 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -97,6 +97,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.security.PrivilegedAction; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -123,6 +124,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -160,6 +163,8 @@ public class RemoteLogManager implements Closeable { private final RemoteLogMetadataManager remoteLogMetadataManager; + private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true); + private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition(); private final RLMQuotaManager rlmCopyQuotaManager; private final RLMQuotaManager rlmFetchQuotaManager; @@ -250,6 +255,13 @@ private void removeMetrics() { remoteStorageReaderThreadPool.removeMetrics(); } + /** + * Returns the timeout for the RLM Tasks to wait for the quota to be available + */ + Duration quotaTimeout() { + return Duration.ofSeconds(1); + } + RLMQuotaManager createRLMCopyQuotaManager() { return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", time); @@ -763,6 +775,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException isCancelled(), isLeader()); return; } + + copyQuotaManagerLock.lock(); + try { + while (rlmCopyQuotaManager.isQuotaExceeded()) { + logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available."); + // If the thread gets interrupted while waiting, the InterruptedException is thrown + // back to the caller. It's important to note that the task being executed is already + // cancelled before the executing thread is interrupted. The caller is responsible + // for handling the exception gracefully by checking if the task is already cancelled. + boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS); + } + rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes()); + // Signal waiting threads to check the quota again + copyQuotaManagerLockCondition.signalAll(); + } finally { + copyQuotaManagerLock.unlock(); + } copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset); } } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 83fc5966b49dc..4c4976f060d11 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -21,6 +21,7 @@ import kafka.cluster.EndPoint; import kafka.cluster.Partition; import kafka.log.UnifiedLog; +import kafka.log.remote.quota.RLMQuotaManager; import kafka.log.remote.quota.RLMQuotaManagerConfig; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; @@ -87,6 +88,7 @@ import org.mockito.InOrder; import org.mockito.MockedConstruction; import org.mockito.Mockito; +import org.opentest4j.AssertionFailedError; import scala.Option; import scala.collection.JavaConverters; @@ -101,6 +103,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -146,6 +149,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; @@ -153,6 +157,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -187,6 +192,7 @@ public class RemoteLogManagerTest { private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); + private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); private RemoteLogManagerConfig remoteLogManagerConfig = null; private BrokerTopicStats brokerTopicStats = null; @@ -230,6 +236,12 @@ public RemoteStorageManager createRemoteStorageManager() { public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } + public RLMQuotaManager createRLMCopyQuotaManager() { + return rlmCopyQuotaManager; + } + public Duration quotaTimeout() { + return Duration.ofMillis(100); + } @Override long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { return 0L; @@ -2735,6 +2747,204 @@ public void testEpochEntriesAsByteBuffer() throws Exception { } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testCopyQuota(boolean quotaExceeded) throws Exception { + RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded); + + if (quotaExceeded) { + // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded + assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog))); + + // Verify the highest offset in remote storage is updated only once + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture()); + // Verify the highest offset in remote storage was -1L before the copy started + assertEquals(-1L, capture.getValue()); + } else { + // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability + assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog)); + + // Verify quota check was performed + verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded(); + // Verify bytes to copy was recorded with the quota manager + verify(rlmCopyQuotaManager, times(1)).record(10); + + // Verify the highest offset in remote storage is updated + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); + List capturedValues = capture.getAllValues(); + // Verify the highest offset in remote storage was -1L before the copy + assertEquals(-1L, capturedValues.get(0).longValue()); + // Verify it was updated to 149L after the copy + assertEquals(149L, capturedValues.get(1).longValue()); + } + } + + @Test + public void testRLMShutdownDuringQuotaExceededScenario() throws Exception { + remoteLogManager.startup(); + setupRLMTask(true); + remoteLogManager.onLeadershipChange( + Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds); + // Ensure the copy operation is waiting for quota to be available + TestUtils.waitForCondition(() -> { + verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded(); + return true; + }, "Quota exceeded check did not happen"); + // Verify RLM is able to shut down + assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close()); + } + + // helper method to set up a RemoteLogManager.RLMTask for testing copy quota behaviour + private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded) throws RemoteStorageException, IOException { + long oldSegmentStartOffset = 0L; + long nextSegmentStartOffset = 150L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + + // leader epoch preparation + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(mockLog.parentDir()).thenReturn("dir1"); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + + // create 2 log segments, with 0 and 150 as log start offset + LogSegment oldSegment = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + File tempFile = TestUtils.tempFile(); + FileRecords fileRecords = mock(FileRecords.class); + when(fileRecords.file()).thenReturn(tempFile); + when(fileRecords.sizeInBytes()).thenReturn(10); + + // Set up the segment that is eligible for copy + when(oldSegment.log()).thenReturn(fileRecords); + when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); + when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset); + + // set up the active segment + when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset); + when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment))); + + File mockProducerSnapshotIndex = TestUtils.tempFile(); + ProducerStateManager mockStateManager = mock(ProducerStateManager.class); + when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex)); + + when(mockLog.producerStateManager()).thenReturn(mockStateManager); + when(mockLog.lastStableOffset()).thenReturn(250L); + + File tempDir = TestUtils.tempDirectory(); + OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get(); + TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get(); + File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, ""); + txnFile.createNewFile(); + TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile); + when(oldSegment.timeIndex()).thenReturn(timeIdx); + when(oldSegment.offsetIndex()).thenReturn(idx); + when(oldSegment.txnIndex()).thenReturn(txnIndex); + + CompletableFuture dummyFuture = new CompletableFuture<>(); + dummyFuture.complete(null); + when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture); + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture); + when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty()); + + when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded); + doNothing().when(rlmCopyQuotaManager).record(anyInt()); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(2); + return task; + } + + @Test + public void testCopyThrottling() throws Exception { + long oldestSegmentStartOffset = 0L; + + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + + // leader epoch preparation + checkpoint.write(totalEpochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); + + // create 3 log segments + LogSegment segmentToCopy = mock(LogSegment.class); + LogSegment segmentToThrottle = mock(LogSegment.class); + LogSegment activeSegment = mock(LogSegment.class); + + File tempFile = TestUtils.tempFile(); + FileRecords fileRecords = mock(FileRecords.class); + when(fileRecords.file()).thenReturn(tempFile); + when(fileRecords.sizeInBytes()).thenReturn(10); + + // set up the segment that will be copied + when(segmentToCopy.log()).thenReturn(fileRecords); + when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset); + when(segmentToCopy.readNextOffset()).thenReturn(100L); + + // set up the segment that will not be copied because of hitting quota + when(segmentToThrottle.log()).thenReturn(fileRecords); + when(segmentToThrottle.baseOffset()).thenReturn(100L); + when(segmentToThrottle.readNextOffset()).thenReturn(150L); + + // set up the active segment + when(activeSegment.log()).thenReturn(fileRecords); + when(activeSegment.baseOffset()).thenReturn(150L); + + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset); + when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy, segmentToThrottle, activeSegment))); + + File mockProducerSnapshotIndex = TestUtils.tempFile(); + ProducerStateManager mockStateManager = mock(ProducerStateManager.class); + when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex)); + + when(mockLog.producerStateManager()).thenReturn(mockStateManager); + when(mockLog.lastStableOffset()).thenReturn(250L); + + File tempDir = TestUtils.tempDirectory(); + OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get(); + TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get(); + File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldestSegmentStartOffset, ""); + txnFile.createNewFile(); + TransactionIndex txnIndex = new TransactionIndex(oldestSegmentStartOffset, txnFile); + when(segmentToCopy.timeIndex()).thenReturn(timeIdx); + when(segmentToCopy.offsetIndex()).thenReturn(idx); + when(segmentToCopy.txnIndex()).thenReturn(txnIndex); + + CompletableFuture dummyFuture = new CompletableFuture<>(); + dummyFuture.complete(null); + when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture); + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture); + when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty()); + + // After the first call, isQuotaExceeded should return true + when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true); + doNothing().when(rlmCopyQuotaManager).record(anyInt()); + + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(2); + + // Verify that the copy operation times out, since the second segment cannot be copied due to quota being exceeded + assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog))); + + // Verify the highest offset in remote storage is updated corresponding to the only segment that was copied + ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); + verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture()); + List capturedValues = capture.getAllValues(); + // Verify the highest offset in remote storage was -1L before the copy + assertEquals(-1L, capturedValues.get(0).longValue()); + // Verify it was updated to 99L after the copy + assertEquals(99L, capturedValues.get(1).longValue()); + } + private Partition mockPartition(TopicIdPartition topicIdPartition) { TopicPartition tp = topicIdPartition.topicPartition(); Partition partition = mock(Partition.class);