KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage (#16071)

Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen <[email protected]>, Satish Duggana <[email protected]>
abhijeetk88 authored Jun 5, 2024
1 parent 62e5cce commit bd9d68f
Showing 5 changed files with 117 additions and 10 deletions.
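At a high level, the change wires the remote-fetch quota into two places: ReplicaManager.readFromLog consults RemoteLogManager.isRemoteLogFetchQuotaExceeded() before handing a partition to the remote-read path, and RemoteLogReader records the number of bytes it actually read once the remote fetch completes. The snippet below is a minimal, self-contained sketch of that check-then-record pattern only; SimpleByteRateQuota is hypothetical and is not Kafka's RLMQuotaManager, which (per the createRLMFetchQuotaManager context below) tracks the fetch byte rate for the Remote Log Manager.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding-window byte-rate quota, used only to illustrate the
// check-then-record pattern this commit applies to remote fetches.
class SimpleByteRateQuota {
    private static class Sample {
        final long timeMs;
        final double bytes;
        Sample(long timeMs, double bytes) { this.timeMs = timeMs; this.bytes = bytes; }
    }

    private final double maxBytesPerSecond;
    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();

    SimpleByteRateQuota(double maxBytesPerSecond, long windowMs) {
        this.maxBytesPerSecond = maxBytesPerSecond;
        this.windowMs = windowMs;
    }

    // Record bytes that were actually read (the reader does this after each read).
    synchronized void record(double bytes) {
        long now = System.currentTimeMillis();
        samples.addLast(new Sample(now, bytes));
        expire(now);
    }

    // Check the quota before scheduling more remote reads (the replica manager does this).
    synchronized boolean isQuotaExceeded() {
        long now = System.currentTimeMillis();
        expire(now);
        double total = samples.stream().mapToDouble(s -> s.bytes).sum();
        return total / (windowMs / 1000.0) > maxBytesPerSecond;
    }

    private void expire(long now) {
        while (!samples.isEmpty() && now - samples.peekFirst().timeMs > windowMs) {
            samples.removeFirst();
        }
    }

    public static void main(String[] args) {
        SimpleByteRateQuota quota = new SimpleByteRateQuota(1024 * 1024, 1_000);
        quota.record(2 * 1024 * 1024);                // a completed 2 MiB remote read
        System.out.println(quota.isQuotaExceeded());  // true: further reads would be deferred
    }
}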
6 changes: 5 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -249,6 +249,10 @@ RLMQuotaManager createRLMFetchQuotaManager() {
"Tracking fetch byte-rate for Remote Log Manager", time);
}

public boolean isRemoteLogFetchQuotaExceeded() {
return rlmFetchQuotaManager.isQuotaExceeded();
}

static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(),
rlmConfig.remoteLogManagerCopyNumQuotaSamples(),
@@ -1660,7 +1664,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throw
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full)
*/
public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteLogReadResult> callback) {
- return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats));
+ return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager));
}

void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
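Taken together, the two RemoteLogManager changes above give callers a public quota check to consult before scheduling work, and ensure every RemoteLogReader submitted via asyncRead carries the fetch quota manager. Below is a hedged sketch of how a caller holding a RemoteLogManager might gate a remote read on that check; the method, the fetch-info argument, and the callback body are placeholders rather than the broker's actual wiring, and imports of the Kafka classes named in the diff are assumed.

// Hypothetical caller; RemoteLogManager, RemoteStorageFetchInfo and
// RemoteLogReadResult are the classes referenced in the diff above.
void maybeScheduleRemoteRead(RemoteLogManager rlm, RemoteStorageFetchInfo fetchInfo) {
    if (rlm.isRemoteLogFetchQuotaExceeded()) {
        // Over the fetch byte-rate quota: skip submitting another remote read;
        // the partition can be retried later (for example via delayed fetch).
        return;
    }
    rlm.asyncRead(fetchInfo, (RemoteLogReadResult result) -> {
        // Handle the result; the reader itself records the bytes it fetched
        // against the quota before invoking this callback.
    });
}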
7 changes: 6 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;

import kafka.log.remote.quota.RLMQuotaManager;
import kafka.server.BrokerTopicStats;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.utils.LogContext;
@@ -34,17 +35,20 @@ public class RemoteLogReader implements Callable<Void> {
private final RemoteLogManager rlm;
private final BrokerTopicStats brokerTopicStats;
private final Consumer<RemoteLogReadResult> callback;
private final RLMQuotaManager quotaManager;

public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
RemoteLogManager rlm,
Consumer<RemoteLogReadResult> callback,
- BrokerTopicStats brokerTopicStats) {
+ BrokerTopicStats brokerTopicStats,
+ RLMQuotaManager quotaManager) {
this.fetchInfo = fetchInfo;
this.rlm = rlm;
this.brokerTopicStats = brokerTopicStats;
this.callback = callback;
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
this.quotaManager = quotaManager;
logger = new LogContext() {
@Override
public String logPrefix() {
@@ -73,6 +77,7 @@ public Void call() {
}

logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
callback.accept(result);

return null;
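One detail worth noting in RemoteLogReader: the quota is recorded after every read, using the size of the fetched records on success and 0 when the read failed (fetchDataInfo is empty on the error path), which is exactly what the updated tests further down assert (100 bytes vs. 0). The stand-alone sketch below shows that Optional map/orElse pattern, with a hypothetical FakeFetchDataInfo standing in for Kafka's FetchDataInfo.

import java.util.Optional;

public class QuotaRecordingExample {
    // Hypothetical stand-in for FetchDataInfo; only the size matters here.
    record FakeFetchDataInfo(int sizeInBytes) { }

    // Mirrors the line added to RemoteLogReader.call(): record the bytes that
    // were actually read, or 0 when the read failed and no data is present.
    static int bytesToRecord(Optional<FakeFetchDataInfo> fetchDataInfo) {
        return fetchDataInfo.map(FakeFetchDataInfo::sizeInBytes).orElse(0);
    }

    public static void main(String[] args) {
        System.out.println(bytesToRecord(Optional.of(new FakeFetchDataInfo(100)))); // success path -> 100
        System.out.println(bytesToRecord(Optional.empty()));                        // error path   -> 0
    }
}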
25 changes: 19 additions & 6 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1758,12 +1758,25 @@ class ReplicaManager(val config: KafkaConfig,
createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
} else {
- // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
- // For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
- val fetchDataInfo =
- new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
- Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
- fetchInfo, params.isolation, params.hardMaxBytesLimit())))
+ val fetchDataInfo = if (remoteLogManager.get.isRemoteLogFetchQuotaExceeded) {
+ // We do not want to send an exception in a LogReadResult response (like we do in other cases when we send
+ // UnknownOffsetMetadata), because it is classified as an error in reading the data, and a response is
+ // immediately sent back to the client. Instead, we want to serve data for the other topic partitions of the
+ // fetch request via delayed fetch if required (when sending immediate response, we skip delayed fetch).
+ new FetchDataInfo(
+ LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
+ MemoryRecords.EMPTY,
+ false,
+ Optional.empty(),
+ Optional.empty()
+ )
+ } else {
+ // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
+ // For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
+ new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
+ Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
+ fetchInfo, params.isolation, params.hardMaxBytesLimit())))
+ }

LogReadResult(fetchDataInfo,
divergingEpoch = None,
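The ReplicaManager change is the throttling decision itself: when the fetch quota is exceeded, readFromLog returns an empty FetchDataInfo with no exception and no delayedRemoteStorageFetch, so the client is not answered immediately with an error and the partition can still be served through delayed fetch; otherwise the dummy FetchDataInfo carrying the RemoteStorageFetchInfo is built as before. For readers less familiar with the Scala, here is a simplified Java rendering of that branch; the surrounding variables and the buildRemoteStorageFetchInfo helper are placeholders, not the broker's actual code.

// Simplified Java rendering of the new branch above.
FetchDataInfo fetchDataInfo;
if (remoteLogManager.isRemoteLogFetchQuotaExceeded()) {
    // Throttled: empty, offset-less result with no remote fetch info attached.
    // Because no error is set, the partition is not failed back to the client;
    // it simply yields no data on this pass and can go through delayed fetch.
    fetchDataInfo = new FetchDataInfo(
        LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
        MemoryRecords.EMPTY,
        false,              // firstEntryIncomplete
        Optional.empty(),   // abortedTransactions
        Optional.empty());  // no delayedRemoteStorageFetch
} else {
    // Under quota: dummy FetchDataInfo carrying the remote storage fetch info,
    // which a RemoteLogReader will use to read the data asynchronously.
    fetchDataInfo = new FetchDataInfo(
        new LogOffsetMetadata(offset),
        MemoryRecords.EMPTY,
        false,
        Optional.empty(),
        Optional.of(buildRemoteStorageFetchInfo())); // placeholder helper
}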
16 changes: 14 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;

import kafka.log.remote.quota.RLMQuotaManager;
import kafka.server.BrokerTopicStats;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
@@ -45,6 +46,7 @@ public class RemoteLogReaderTest {
public static final String TOPIC = "test";
RemoteLogManager mockRLM = mock(RemoteLogManager.class);
BrokerTopicStats brokerTopicStats = null;
RLMQuotaManager mockQuotaManager = mock(RLMQuotaManager.class);
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);

@@ -62,7 +64,7 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE

Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
- RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
+ RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager);
remoteLogReader.call();

// verify the callback did get invoked with the expected remoteLogReadResult
@@ -73,6 +75,11 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE
assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());

// verify the record method on quota manager was called with the expected value
ArgumentCaptor<Double> recordedArg = ArgumentCaptor.forClass(Double.class);
verify(mockQuotaManager, times(1)).record(recordedArg.capture());
assertEquals(100, recordedArg.getValue());

// Verify metrics for remote reads are updated correctly
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count());
assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count());
@@ -89,7 +96,7 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce

Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
- RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
+ RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager);
remoteLogReader.call();

// verify the callback did get invoked with the expected remoteLogReadResult
@@ -99,6 +106,11 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce
assertTrue(actualRemoteLogReadResult.error.isPresent());
assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());

// verify the record method on quota manager was called with the expected value
ArgumentCaptor<Double> recordedArg = ArgumentCaptor.forClass(Double.class);
verify(mockQuotaManager, times(1)).record(recordedArg.capture());
assertEquals(0, recordedArg.getValue());

// Verify metrics for remote reads are updated correctly
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count());
73 changes: 73 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -6703,6 +6703,79 @@ class ReplicaManagerTest {
))
}
}

@Test
def testRemoteReadQuotaExceeded(): Unit = {
when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)

val tp0 = new TopicPartition(topic, 0)
val tpId0 = new TopicIdPartition(topicId, tp0)
val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0)

assertEquals(1, fetch.size)
assertEquals(tpId0, fetch.head._1)
val fetchInfo = fetch.head._2.info
assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, fetchInfo.fetchOffsetMetadata)
assertFalse(fetchInfo.records.records().iterator().hasNext)
assertFalse(fetchInfo.firstEntryIncomplete)
assertFalse(fetchInfo.abortedTransactions.isPresent)
assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
}

@Test
def testRemoteReadQuotaNotExceeded(): Unit = {
when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)

val tp0 = new TopicPartition(topic, 0)
val tpId0 = new TopicIdPartition(topicId, tp0)
val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0)

assertEquals(1, fetch.size)
assertEquals(tpId0, fetch.head._1)
val fetchInfo = fetch.head._2.info
assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
assertEquals(UnifiedLog.UnknownOffset, fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
}

private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp.topic -> topicId).asJava
val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(
new LeaderAndIsrPartitionState()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())

val params = new FetchParams(ApiKeys.FETCH.latestVersion, -1, 1, 1000, 0, 100, FetchIsolation.HIGH_WATERMARK, None.asJava)
replicaManager.readFromLog(
params,
Seq(new TopicIdPartition(topicId, 0, topic) -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of(leaderEpoch))),
UnboundedQuota,
readFromPurgatory = false)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

// Some threads are closed, but the state didn't reflect in the JVM immediately, so add some wait time for it
private def assertNoNonDaemonThreadsWithWaiting(threadNamePrefix: String, waitTimeMs: Long = 500L): Unit = {
var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread]
