Skip to content

Commit

Permalink
Integrate RLMQuotaManager for throttling copies to remote storage
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeetk88 committed Jun 3, 2024
1 parent b0fb2ac commit 5440667
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 0 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
<disallow class="com.yammer.metrics.Metrics" />
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -117,6 +118,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -153,6 +156,8 @@ public class RemoteLogManager implements Closeable {

private final RemoteLogMetadataManager remoteLogMetadataManager;

private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition();
private final RLMQuotaManager rlmCopyQuotaManager;
private final RLMQuotaManager rlmFetchQuotaManager;

Expand Down Expand Up @@ -239,6 +244,13 @@ private void removeMetrics() {
remoteStorageReaderThreadPool.removeMetrics();
}

/**
* Returns the timeout for the RLM Tasks to wait for the quota to be available
*/
Duration quotaTimeout() {
return Duration.ofSeconds(1);
}

RLMQuotaManager createRLMCopyQuotaManager() {
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$,
"Tracking copy byte-rate for Remote Log Manager", time);
Expand Down Expand Up @@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
isCancelled(), isLeader());
return;
}

copyQuotaManagerLock.lock();
try {
while (rlmCopyQuotaManager.isQuotaExceeded()) {
logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
// If the thread gets interrupted while waiting, the InterruptedException is thrown
// back to the caller. It's important to note that the task being executed is already
// cancelled before the executing thread is interrupted. The caller is responsible
// for handling the exception gracefully by checking if the task is already cancelled.
boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
// Signal waiting threads to check the quota again
copyQuotaManagerLockCondition.signalAll();
} finally {
copyQuotaManagerLock.unlock();
}
copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
}
}
Expand Down
187 changes: 187 additions & 0 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.mockito.InOrder;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.opentest4j.AssertionFailedError;
import scala.Option;
import scala.collection.JavaConverters;

Expand All @@ -96,6 +98,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -138,6 +141,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -179,6 +183,7 @@ public class RemoteLogManagerTest {

private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
private RemoteLogManagerConfig remoteLogManagerConfig = null;

private BrokerTopicStats brokerTopicStats = null;
Expand Down Expand Up @@ -231,6 +236,12 @@ public RemoteStorageManager createRemoteStorageManager() {
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
public RLMQuotaManager createRLMCopyQuotaManager() {
return rlmCopyQuotaManager;
}
public Duration quotaTimeout() {
return Duration.ofMillis(100);
}
@Override
long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
return 0L;
Expand Down Expand Up @@ -2716,6 +2727,182 @@ public void testFetchQuotaManagerConfig() {
assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCopyQuota(boolean quotaExceeded) throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;

when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());

// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));

// create 2 log segments, with 0 and 150 as log start offset
LogSegment oldSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

File tempFile = TestUtils.tempFile();
FileRecords fileRecords = mock(FileRecords.class);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);

// Set up the segment that is eligible for copy
when(oldSegment.log()).thenReturn(fileRecords);
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

// set up the active segment
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);

when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

File mockProducerSnapshotIndex = TestUtils.tempFile();
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));

when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockLog.lastStableOffset()).thenReturn(250L);

File tempDir = TestUtils.tempDirectory();
OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
when(oldSegment.timeIndex()).thenReturn(timeIdx);
when(oldSegment.offsetIndex()).thenReturn(idx);
when(oldSegment.txnIndex()).thenReturn(txnIndex);

CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());

when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
doNothing().when(rlmCopyQuotaManager).record(anyInt());

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);

if (quotaExceeded) {
// Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));

// Verify the highest offset in remote storage is updated only once
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
// Verify the highest offset in remote storage was -1L before the copy started
assertEquals(-1L, capture.getValue());
} else {
// Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));

// Verify quota check was performed
verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
// Verify bytes to copy was recorded with the quota manager
verify(rlmCopyQuotaManager, times(1)).record(10);

// Verify the highest offset in remote storage is updated
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
List<Long> capturedValues = capture.getAllValues();
// Verify the highest offset in remote storage was -1L before the copy
assertEquals(-1L, capturedValues.get(0).longValue());
// Verify it was updated to 149L after the copy
assertEquals(149L, capturedValues.get(1).longValue());
}
}

@Test
public void testCopyThrottling() throws Exception {
long oldestSegmentStartOffset = 0L;

when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());

// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));

// create 3 log segments
LogSegment segmentToCopy = mock(LogSegment.class);
LogSegment segmentToThrottle = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

File tempFile = TestUtils.tempFile();
FileRecords fileRecords = mock(FileRecords.class);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);

// set up the segment that will be copied
when(segmentToCopy.log()).thenReturn(fileRecords);
when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset);
when(segmentToCopy.readNextOffset()).thenReturn(100L);

// set up the segment that will not be copied because of hitting quota
when(segmentToThrottle.log()).thenReturn(fileRecords);
when(segmentToThrottle.baseOffset()).thenReturn(100L);
when(segmentToThrottle.readNextOffset()).thenReturn(150L);

// set up the active segment
when(activeSegment.log()).thenReturn(fileRecords);
when(activeSegment.baseOffset()).thenReturn(150L);

when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy, segmentToThrottle, activeSegment)));

File mockProducerSnapshotIndex = TestUtils.tempFile();
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));

when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockLog.lastStableOffset()).thenReturn(250L);

File tempDir = TestUtils.tempDirectory();
OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get();
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldestSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldestSegmentStartOffset, txnFile);
when(segmentToCopy.timeIndex()).thenReturn(timeIdx);
when(segmentToCopy.offsetIndex()).thenReturn(idx);
when(segmentToCopy.txnIndex()).thenReturn(txnIndex);

CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());

// After the first call, isQuotaExceeded should return true
when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
doNothing().when(rlmCopyQuotaManager).record(anyInt());

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);

// Verify that the copy operation times out, since the second segment cannot be copied due to quota being exceeded
assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));

// Verify the highest offset in remote storage is updated corresponding to the only segment that was copied
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
List<Long> capturedValues = capture.getAllValues();
// Verify the highest offset in remote storage was -1L before the copy
assertEquals(-1L, capturedValues.get(0).longValue());
// Verify it was updated to 99L after the copy
assertEquals(99L, capturedValues.get(1).longValue());
}

private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
Expand Down

0 comments on commit 5440667

Please sign in to comment.