diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index e5bf9597ca6f8..5b0d91ff439f7 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -249,6 +249,10 @@ RLMQuotaManager createRLMFetchQuotaManager() { "Tracking fetch byte-rate for Remote Log Manager", time); } + public boolean isRemoteLogFetchQuotaExceeded() { + return rlmFetchQuotaManager.isQuotaExceeded(); + } + static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(), rlmConfig.remoteLogManagerCopyNumQuotaSamples(), @@ -1660,7 +1664,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throw * @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full) */ public Future asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer callback) { - return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats)); + return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager)); } void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition, diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/core/src/main/java/kafka/log/remote/RemoteLogReader.java index 5d24b2bbbdfbe..9395cbd60edfe 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java @@ -16,6 +16,7 @@ */ package kafka.log.remote; +import kafka.log.remote.quota.RLMQuotaManager; import kafka.server.BrokerTopicStats; import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.utils.LogContext; @@ -34,17 +35,20 @@ public class RemoteLogReader implements Callable { private final RemoteLogManager rlm; private final BrokerTopicStats brokerTopicStats; private final Consumer callback; + private final RLMQuotaManager quotaManager; public RemoteLogReader(RemoteStorageFetchInfo fetchInfo, RemoteLogManager rlm, Consumer callback, - BrokerTopicStats brokerTopicStats) { + BrokerTopicStats brokerTopicStats, + RLMQuotaManager quotaManager) { this.fetchInfo = fetchInfo; this.rlm = rlm; this.brokerTopicStats = brokerTopicStats; this.callback = callback; this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark(); this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark(); + this.quotaManager = quotaManager; logger = new LogContext() { @Override public String logPrefix() { @@ -73,6 +77,7 @@ public Void call() { } logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition); + quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0)); callback.accept(result); return null; diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index aa56269a2f40d..7b800c05e55e4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1754,12 +1754,25 @@ class ReplicaManager(val config: KafkaConfig, createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset, new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage")) } else { - // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information. - // For the first topic-partition that needs remote data, we will use this information to read the data in another thread. - val fetchDataInfo = - new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(), - Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(), - fetchInfo, params.isolation, params.hardMaxBytesLimit()))) + val fetchDataInfo = if (remoteLogManager.get.isRemoteLogFetchQuotaExceeded) { + // We do not want to send an exception in a LogReadResult response (like we do in other cases when we send + // UnknownOffsetMetadata), because it is classified as an error in reading the data, and a response is + // immediately sent back to the client. Instead, we want to serve data for the other topic partitions of the + // fetch request via delayed fetch if required (when sending immediate response, we skip delayed fetch). + new FetchDataInfo( + LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, + MemoryRecords.EMPTY, + false, + Optional.empty(), + Optional.empty() + ) + } else { + // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information. + // For the first topic-partition that needs remote data, we will use this information to read the data in another thread. + new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(), + Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(), + fetchInfo, params.isolation, params.hardMaxBytesLimit()))) + } LogReadResult(fetchDataInfo, divergingEpoch = None, diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java index bff58364b9d1f..8b1e1bd32aee0 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java @@ -16,6 +16,7 @@ */ package kafka.log.remote; +import kafka.log.remote.quota.RLMQuotaManager; import kafka.server.BrokerTopicStats; import kafka.utils.TestUtils; import org.apache.kafka.common.TopicPartition; @@ -45,6 +46,7 @@ public class RemoteLogReaderTest { public static final String TOPIC = "test"; RemoteLogManager mockRLM = mock(RemoteLogManager.class); BrokerTopicStats brokerTopicStats = null; + RLMQuotaManager mockQuotaManager = mock(RLMQuotaManager.class); LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100); Records records = mock(Records.class); @@ -62,7 +64,7 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE Consumer callback = mock(Consumer.class); RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false); - RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats); + RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager); remoteLogReader.call(); // verify the callback did get invoked with the expected remoteLogReadResult @@ -73,6 +75,11 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent()); assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get()); + // verify the record method on quota manager was called with the expected value + ArgumentCaptor recordedArg = ArgumentCaptor.forClass(Double.class); + verify(mockQuotaManager, times(1)).record(recordedArg.capture()); + assertEquals(100, recordedArg.getValue()); + // Verify metrics for remote reads are updated correctly assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count()); assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count()); @@ -89,7 +96,7 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce Consumer callback = mock(Consumer.class); RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false); - RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats); + RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager); remoteLogReader.call(); // verify the callback did get invoked with the expected remoteLogReadResult @@ -99,6 +106,11 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce assertTrue(actualRemoteLogReadResult.error.isPresent()); assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent()); + // verify the record method on quota manager was called with the expected value + ArgumentCaptor recordedArg = ArgumentCaptor.forClass(Double.class); + verify(mockQuotaManager, times(1)).record(recordedArg.capture()); + assertEquals(0, recordedArg.getValue()); + // Verify metrics for remote reads are updated correctly assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count()); assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count()); diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 6b655ea7837eb..a049ae24c1d58 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6669,6 +6669,79 @@ class ReplicaManagerTest { )) } } + + @Test + def testRemoteReadQuotaExceeded(): Unit = { + when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true) + + val tp0 = new TopicPartition(topic, 0) + val tpId0 = new TopicIdPartition(topicId, tp0) + val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0) + + assertEquals(1, fetch.size) + assertEquals(tpId0, fetch.head._1) + val fetchInfo = fetch.head._2.info + assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, fetchInfo.fetchOffsetMetadata) + assertFalse(fetchInfo.records.records().iterator().hasNext) + assertFalse(fetchInfo.firstEntryIncomplete) + assertFalse(fetchInfo.abortedTransactions.isPresent) + assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent) + } + + @Test + def testRemoteReadQuotaNotExceeded(): Unit = { + when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false) + + val tp0 = new TopicPartition(topic, 0) + val tpId0 = new TopicIdPartition(topicId, tp0) + val fetch: Seq[(TopicIdPartition, LogReadResult)] = readFromLogWithOffsetOutOfRange(tp0) + + assertEquals(1, fetch.size) + assertEquals(tpId0, fetch.head._1) + val fetchInfo = fetch.head._2.info + assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset) + assertEquals(UnifiedLog.UnknownOffset, fetchInfo.fetchOffsetMetadata.segmentBaseOffset) + assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) + assertEquals(MemoryRecords.EMPTY, fetchInfo.records) + assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent) + } + + private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true) + try { + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava + val topicIds = Map(tp.topic -> topicId).asJava + val leaderEpoch = 0 + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq( + new LeaderAndIsrPartitionState() + .setTopicName(tp.topic) + .setPartitionIndex(tp.partition) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(partition0Replicas) + .setPartitionEpoch(0) + .setReplicas(partition0Replicas) + .setIsNew(true) + ).asJava, + topicIds, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + + val params = new FetchParams(ApiKeys.FETCH.latestVersion, -1, 1, 1000, 0, 100, FetchIsolation.HIGH_WATERMARK, None.asJava) + replicaManager.readFromLog( + params, + Seq(new TopicIdPartition(topicId, 0, topic) -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of(leaderEpoch))), + UnboundedQuota, + readFromPurgatory = false) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + // Some threads are closed, but the state didn't reflect in the JVM immediately, so add some wait time for it private def assertNoNonDaemonThreadsWithWaiting(threadNamePrefix: String, waitTimeMs: Long = 500L): Unit = { var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread]