From f466e86bb5600b3b969df4b5f8fa4f5512bd1835 Mon Sep 17 00:00:00 2001
From: Abhinav Dixit
Date: Thu, 12 Sep 2024 00:17:33 +0530
Subject: [PATCH] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests (#16969)

Introduced a share fetch purgatory on the broker which delays share fetch requests that cannot be completed instantaneously.

Introduced the following new classes:
DelayedShareFetch - Contains the logic to complete a share fetch request instantaneously, or to force-complete it on timeout.
DelayedShareFetchKey - Contains the key used to watch entries within the share fetch purgatory.
ShareFetchUtils - A utility class containing the functionality required for post-processing once the replica manager fetch is completed.

There are many scenarios which can cause a share fetch request to be delayed, and multiple scenarios in which a delayed share fetch can be attempted to be completed. This PR targets only the case where the record lock partition limit is reached; in that case the share fetch should wait for up to MaxWaitMs for records to be released.

Reviewers: David Arthur, Andrew Schofield, Apoorv Mittal, Jun Rao
---
 checkstyle/suppressions.xml                   |   4 +-
 .../kafka/server/share/DelayedShareFetch.java | 200 ++++
 .../server/share/DelayedShareFetchKey.java    |  61 ++
 .../kafka/server/share/ShareFetchUtils.java   | 114 +++
 .../kafka/server/share/SharePartition.java    |  19 +-
 .../server/share/SharePartitionManager.java   | 278 ++--
 .../scala/kafka/server/BrokerServer.scala     |   1 +
 .../share/DelayedShareFetchKeyTest.java       |  56 ++
 .../server/share/DelayedShareFetchTest.java   | 293 ++++++
 .../server/share/ShareFetchUtilsTest.java     | 302 ++++++
 .../share/SharePartitionManagerTest.java      | 861 +++++++++++-------
 .../server/share/SharePartitionTest.java      |   5 +-
 .../unit/kafka/server/KafkaConfigTest.scala   |   1 +
 .../kafka/server/config/ShareGroupConfig.java |  13 +-
 14 files changed, 1702 insertions(+), 506 deletions(-)
 create mode 100644 core/src/main/java/kafka/server/share/DelayedShareFetch.java
 create mode 100644 core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
 create mode 100644 core/src/main/java/kafka/server/share/ShareFetchUtils.java
 create mode 100644 core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java
 create mode 100644 core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
 create mode 100644 core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 331ecebf655d..c9dc82487177 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -36,7 +36,7 @@ files="MessageGenerator.java"/> - + @@ -45,7 +45,7 @@ files="(KafkaClusterTestKit).java"/> - +
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
new file mode 100644
index 000000000000..ef80047f222e
--- /dev/null
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.DelayedOperation; +import kafka.server.LogReadResult; +import kafka.server.QuotaFactory; +import kafka.server.ReplicaManager; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.storage.internals.log.FetchPartitionData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import scala.Option; +import scala.Tuple2; +import scala.collection.Seq; +import scala.jdk.javaapi.CollectionConverters; +import scala.runtime.BoxedUnit; + +/** + * A delayed share fetch operation has been introduced in case there is a share fetch request which cannot be completed instantaneously. + */ +public class DelayedShareFetch extends DelayedOperation { + private final SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData; + private final ReplicaManager replicaManager; + private final Map partitionCacheMap; + private Map topicPartitionDataFromTryComplete = new LinkedHashMap<>(); + + private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class); + + DelayedShareFetch( + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData, + ReplicaManager replicaManager, + Map partitionCacheMap) { + super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty()); + this.shareFetchPartitionData = shareFetchPartitionData; + this.replicaManager = replicaManager; + this.partitionCacheMap = partitionCacheMap; + } + + @Override + public void onExpiration() { + } + + /** + * Complete the share fetch operation by fetching records for all partitions in the share fetch request irrespective + * of whether they have any acquired records. This is called when the fetch operation is forced to complete either + * because records can be acquired for some partitions or due to MaxWaitMs timeout. + */ + @Override + public void onComplete() { + log.trace("Completing the delayed share fetch request for group {}, member {}, " + + "topic partitions {}", shareFetchPartitionData.groupId(), + shareFetchPartitionData.memberId(), shareFetchPartitionData.partitionMaxBytes().keySet()); + + if (shareFetchPartitionData.future().isDone()) + return; + + Map topicPartitionData; + // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. + if (topicPartitionDataFromTryComplete.isEmpty()) + topicPartitionData = acquirablePartitions(); + // tryComplete invoked forceComplete, so we can use the data from tryComplete. + else + topicPartitionData = topicPartitionDataFromTryComplete; + try { + if (topicPartitionData.isEmpty()) { + // No locks for share partitions could be acquired, so we complete the request with an empty response. 
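+                // The records shall be re-fetched by the client in a subsequent share fetch request.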
+ shareFetchPartitionData.future().complete(Collections.emptyMap()); + return; + } + log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", + topicPartitionData, shareFetchPartitionData.groupId(), shareFetchPartitionData.fetchParams()); + + Seq> responseLogResult = replicaManager.readFromLog( + shareFetchPartitionData.fetchParams(), + CollectionConverters.asScala( + topicPartitionData.entrySet().stream().map(entry -> + new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList()) + ), + QuotaFactory.UnboundedQuota$.MODULE$, + true); + + Map responseData = new HashMap<>(); + responseLogResult.foreach(tpLogResult -> { + TopicIdPartition topicIdPartition = tpLogResult._1(); + LogReadResult logResult = tpLogResult._2(); + FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false); + responseData.put(topicIdPartition, fetchPartitionData); + return BoxedUnit.UNIT; + }); + + log.trace("Data successfully retrieved by replica manager: {}", responseData); + ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData, partitionCacheMap, replicaManager) + .whenComplete((result, throwable) -> { + if (throwable != null) { + log.error("Error processing fetch response for share partitions", throwable); + shareFetchPartitionData.future().completeExceptionally(throwable); + } else { + shareFetchPartitionData.future().complete(result); + } + // Releasing the lock to move ahead with the next request in queue. + releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet()); + }); + + } catch (Exception e) { + // Release the locks acquired for the partitions in the share fetch request in case there is an exception + log.error("Error processing delayed share fetch request", e); + shareFetchPartitionData.future().completeExceptionally(e); + releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet()); + } + } + + /** + * Try to complete the fetch operation if we can acquire records for any partition in the share fetch request. + */ + @Override + public boolean tryComplete() { + log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}", + shareFetchPartitionData.groupId(), shareFetchPartitionData.memberId(), + shareFetchPartitionData.partitionMaxBytes().keySet()); + + topicPartitionDataFromTryComplete = acquirablePartitions(); + + if (!topicPartitionDataFromTryComplete.isEmpty()) + return forceComplete(); + log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " + + "topic partitions {}", shareFetchPartitionData.groupId(), + shareFetchPartitionData.memberId(), shareFetchPartitionData.partitionMaxBytes().keySet()); + return false; + } + + /** + * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records. + */ + // Visible for testing + Map acquirablePartitions() { + // Initialize the topic partitions for which the fetch should be attempted. 
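+        // A LinkedHashMap preserves insertion order, so the partitions are read from the log in the order they are added below.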
+ Map topicPartitionData = new LinkedHashMap<>(); + + shareFetchPartitionData.partitionMaxBytes().keySet().forEach(topicIdPartition -> { + SharePartition sharePartition = partitionCacheMap.get(new SharePartitionManager.SharePartitionKey( + shareFetchPartitionData.groupId(), topicIdPartition)); + + int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes().getOrDefault(topicIdPartition, 0); + // Add the share partition to the list of partitions to be fetched only if we can + // acquire the fetch lock on it. + if (sharePartition.maybeAcquireFetchLock()) { + // If the share partition is already at capacity, we should not attempt to fetch. + if (sharePartition.canAcquireRecords()) { + topicPartitionData.put( + topicIdPartition, + new FetchRequest.PartitionData( + topicIdPartition.topicId(), + sharePartition.nextFetchOffset(), + 0, + partitionMaxBytes, + Optional.empty() + ) + ); + } else { + sharePartition.releaseFetchLock(); + log.trace("Record lock partition limit exceeded for SharePartition {}, " + + "cannot acquire more records", sharePartition); + } + } + }); + return topicPartitionData; + } + + private void releasePartitionLocks(String groupId, Set topicIdPartitions) { + topicIdPartitions.forEach(tp -> partitionCacheMap.get(new + SharePartitionManager.SharePartitionKey(groupId, tp)).releaseFetchLock()); + } +} diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java new file mode 100644 index 000000000000..e4fe1a9af7fd --- /dev/null +++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.DelayedOperationKey; + +import org.apache.kafka.common.TopicIdPartition; + +import java.util.Objects; + +/** + * A key for delayed operations that fetch data for share consumers. 
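+ * The key pairs a share group id with a topic-id partition, so that an event on a share partition (for example,
+ * an acknowledgement or an acquisition lock timeout) can complete the delayed share fetches watching that
+ * partition. Illustrative usage:
+ * <pre>
+ *     delayedShareFetchPurgatory.checkAndComplete(new DelayedShareFetchKey(groupId, topicIdPartition));
+ * </pre>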
+ */ +public class DelayedShareFetchKey implements DelayedOperationKey { + private final String groupId; + private final TopicIdPartition topicIdPartition; + + DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) { + this.groupId = groupId; + this.topicIdPartition = topicIdPartition; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DelayedShareFetchKey that = (DelayedShareFetchKey) o; + return topicIdPartition.equals(that.topicIdPartition) && groupId.equals(that.groupId); + } + + @Override + public int hashCode() { + return Objects.hash(topicIdPartition, groupId); + } + + @Override + public String toString() { + return "DelayedShareFetchKey(groupId=" + groupId + + ", topicIdPartition=" + topicIdPartition + + ")"; + } + + @Override + public String keyLabel() { + return String.format("groupId=%s, topicIdPartition=%s", groupId, topicIdPartition); + } +} diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java new file mode 100644 index 000000000000..4a8358272656 --- /dev/null +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.ReplicaManager; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.storage.internals.log.FetchPartitionData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import scala.Option; + +/** + * Utility class for post-processing of share fetch operations. + */ +public class ShareFetchUtils { + private static final Logger log = LoggerFactory.getLogger(ShareFetchUtils.class); + + // Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data + // from share partitions. 
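+    // The returned future completes only after every fetched partition has either acquired records or recorded an error in its response.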
+ static CompletableFuture> processFetchResponse( + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData, + Map responseData, + Map partitionCacheMap, + ReplicaManager replicaManager + ) { + Map> futures = new HashMap<>(); + responseData.forEach((topicIdPartition, fetchPartitionData) -> { + + SharePartition sharePartition = partitionCacheMap.get(new SharePartitionManager.SharePartitionKey( + shareFetchPartitionData.groupId(), topicIdPartition)); + futures.put(topicIdPartition, sharePartition.acquire(shareFetchPartitionData.memberId(), fetchPartitionData) + .handle((acquiredRecords, throwable) -> { + log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", + topicIdPartition, shareFetchPartitionData, acquiredRecords); + ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(topicIdPartition.partition()); + + if (throwable != null) { + partitionData.setErrorCode(Errors.forException(throwable).code()); + return partitionData; + } + + if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { + // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset. + // So, we would update the start and end offset of the share partition and still return an empty + // response and let the client retry the fetch. This way we do not lose out on the data that + // would be returned for other share partitions in the fetch request. + sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager)); + partitionData.setPartitionIndex(topicIdPartition.partition()) + .setRecords(null) + .setErrorCode(Errors.NONE.code()) + .setAcquiredRecords(Collections.emptyList()) + .setAcknowledgeErrorCode(Errors.NONE.code()); + return partitionData; + } + + // Maybe, in the future, check if no records are acquired, and we want to retry + // replica manager fetch. Depends on the share partition manager implementation, + // if we want parallel requests for the same share partition or not. + partitionData.setPartitionIndex(topicIdPartition.partition()) + .setRecords(fetchPartitionData.records) + .setErrorCode(fetchPartitionData.error.code()) + .setAcquiredRecords(acquiredRecords) + .setAcknowledgeErrorCode(Errors.NONE.code()); + return partitionData; + })); + }); + return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> { + Map processedResult = new HashMap<>(); + futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join())); + return processedResult; + }); + } + + /** + * The method is used to get the offset for the earliest timestamp for the topic-partition. + * + * @return The offset for the earliest timestamp. + */ + static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) { + // Isolation level is only required when reading from the latest offset hence use Option.empty() for now. + Option timestampAndOffset = replicaManager.fetchOffsetForTimestamp( + topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(), + Optional.empty(), true); + return timestampAndOffset.isEmpty() ? 
(long) 0 : timestampAndOffset.get().offset; + } +} diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index a8b0f881a28e..7fc3f8d30b26 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -16,6 +16,8 @@ */ package kafka.server.share; +import kafka.server.DelayedOperationPurgatory; + import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; @@ -261,6 +263,11 @@ public static RecordState forId(byte id) { */ private SharePartitionState partitionState; + /** + * The delayed share fetch purgatory is used to store the share fetch requests that could not be processed immediately. + */ + private final DelayedOperationPurgatory delayedShareFetchPurgatory; + SharePartition( String groupId, TopicIdPartition topicIdPartition, @@ -269,7 +276,8 @@ public static RecordState forId(byte id) { int recordLockDurationMs, Timer timer, Time time, - Persister persister + Persister persister, + DelayedOperationPurgatory delayedShareFetchPurgatory ) { this.groupId = groupId; this.topicIdPartition = topicIdPartition; @@ -284,6 +292,7 @@ public static RecordState forId(byte id) { this.time = time; this.persister = persister; this.partitionState = SharePartitionState.EMPTY; + this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; } /** @@ -995,6 +1004,9 @@ boolean canAcquireRecords() { * @return A boolean which indicates whether the fetch lock is acquired. */ boolean maybeAcquireFetchLock() { + if (partitionState() != SharePartitionState.ACTIVE) { + return false; + } return fetchLock.compareAndSet(false, true); } @@ -1770,6 +1782,11 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las // Even if write share group state RPC call fails, we will still go ahead with the state transition. // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. maybeUpdateCachedStateAndOffsets(); + + // If we have an acquisition lock timeout for a share-partition, then we should check if + // there is a pending share fetch request for the share-partition and complete it. 
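+                // checkAndComplete attempts to complete any delayed share fetch operation currently watching this key.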
+ DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchKey(groupId, topicIdPartition); + delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey); }); } } finally { diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index c14c1ea15c35..d6d43170215f 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -16,7 +16,7 @@ */ package kafka.server.share; -import kafka.server.QuotaFactory; +import kafka.server.DelayedOperationPurgatory; import kafka.server.ReplicaManager; import org.apache.kafka.clients.consumer.AcknowledgeType; @@ -34,9 +34,6 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.FileRecords.TimestampAndOffset; -import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareRequestMetadata; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; @@ -55,7 +52,6 @@ import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.FetchParams; -import org.apache.kafka.storage.internals.log.FetchPartitionData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,22 +60,17 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import scala.Option; -import scala.Tuple2; import scala.jdk.javaapi.CollectionConverters; -import scala.runtime.BoxedUnit; /** * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. @@ -149,6 +140,11 @@ public class SharePartitionManager implements AutoCloseable { */ private final ShareGroupMetrics shareGroupMetrics; + /** + * The delayed share fetch purgatory is used to store the share fetch requests that could not be processed immediately. 
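+     * Delayed share fetches are watched on {@link DelayedShareFetchKey} keys and are completed either when records
+     * become acquirable for a share partition or when the request's MaxWaitMs timeout elapses.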
+ */ + private final DelayedOperationPurgatory delayedShareFetchPurgatory; + public SharePartitionManager( ReplicaManager replicaManager, Time time, @@ -156,6 +152,7 @@ public SharePartitionManager( int recordLockDurationMs, int maxDeliveryCount, int maxInFlightMessages, + int shareFetchPurgatoryPurgeIntervalRequests, Persister persister, Metrics metrics ) { @@ -166,6 +163,7 @@ public SharePartitionManager( recordLockDurationMs, maxDeliveryCount, maxInFlightMessages, + shareFetchPurgatoryPurgeIntervalRequests, persister, metrics ); @@ -179,6 +177,7 @@ private SharePartitionManager( int recordLockDurationMs, int maxDeliveryCount, int maxInFlightMessages, + int shareFetchPurgatoryPurgeIntervalRequests, Persister persister, Metrics metrics ) { @@ -195,6 +194,7 @@ private SharePartitionManager( this.maxInFlightMessages = maxInFlightMessages; this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); + this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true); } // Visible for testing. @@ -209,7 +209,8 @@ private SharePartitionManager( int maxDeliveryCount, int maxInFlightMessages, Persister persister, - Metrics metrics + Metrics metrics, + DelayedOperationPurgatory delayedShareFetchPurgatory ) { this.replicaManager = replicaManager; this.time = time; @@ -223,6 +224,7 @@ private SharePartitionManager( this.maxInFlightMessages = maxInFlightMessages; this.persister = persister; this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); + this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; } /** @@ -284,6 +286,12 @@ public CompletableFuture batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement)); future.complete(Errors.NONE); }); + + // If we have an acknowledgement completed for a topic-partition, then we should check if + // there is a pending share fetch request for the topic-partition and complete it. + DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchKey(groupId, topicIdPartition); + delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey); + futures.put(topicIdPartition, future); } else { futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION)); @@ -348,6 +356,11 @@ public CompletableFuture cachedTopicIdPartitionsInShareSession(String groupId, Uui return cachedTopicIdPartitions; } + // Add the share fetch request to the delayed share fetch purgatory to process the fetch request if it can be + // completed else watch until it can be completed/timeout. + private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, Set keys) { + delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, CollectionConverters.asScala(keys).toSeq()); + } + @Override public void close() throws Exception { + this.delayedShareFetchPurgatory.shutdown(); this.timer.close(); this.persister.stop(); if (!fetchQueue.isEmpty()) { @@ -533,8 +553,6 @@ void maybeProcessFetchQueue() { return; } - // Initialize the topic partitions for which the fetch should be attempted. - Map topicPartitionData = new LinkedHashMap<>(); ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll(); if (shareFetchPartitionData == null) { // No more requests to process, so release the lock. 
Though we should not reach here as the lock @@ -543,6 +561,16 @@ void maybeProcessFetchQueue() { return; } + if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) { + // If there are no partitions to fetch then complete the future with an empty map. + shareFetchPartitionData.future.complete(Collections.emptyMap()); + // Release the lock so that other threads can process the queue. + releaseProcessFetchQueueLock(); + if (!fetchQueue.isEmpty()) + maybeProcessFetchQueue(); + return; + } + try { shareFetchPartitionData.partitionMaxBytes.keySet().forEach(topicIdPartition -> { SharePartitionKey sharePartitionKey = sharePartitionKey( @@ -560,53 +588,20 @@ void maybeProcessFetchQueue() { maybeCompleteInitializationWithException(sharePartitionKey, shareFetchPartitionData.future, throwable); return; } - // Fetch messages for the partition only if it is active. - int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0); - // Add the share partition to the list of partitions to be fetched only if we can - // acquire the fetch lock on it. - if (sharePartition.maybeAcquireFetchLock()) { - // If the share partition is already at capacity, we should not attempt to fetch. - if (sharePartition.canAcquireRecords()) { - topicPartitionData.put( - topicIdPartition, - new FetchRequest.PartitionData( - topicIdPartition.topicId(), - sharePartition.nextFetchOffset(), - 0, - partitionMaxBytes, - Optional.empty() - ) - ); - } else { - sharePartition.releaseFetchLock(); - log.info("Record lock partition limit exceeded for SharePartition with key {}, " + - "cannot acquire more records", sharePartitionKey); - } - } }); }); - if (topicPartitionData.isEmpty()) { - // No locks for share partitions could be acquired, so we complete the request if not - // already. The records shall be re-fetched in next poll by the client. - if (!shareFetchPartitionData.future.isDone()) { - shareFetchPartitionData.future.complete(Collections.emptyMap()); - } - // Though if no partitions can be locked then there must be some other request which - // is in-flight and should release the lock. But it's safe to release the lock as - // the lock on share partition already exists which facilitates correct behaviour - // with multiple requests from queue being processed. - releaseProcessFetchQueueLock(); - if (!fetchQueue.isEmpty()) - maybeProcessFetchQueue(); - return; - } - - log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", - topicPartitionData, shareFetchPartitionData.groupId, shareFetchPartitionData.fetchParams); + Set delayedShareFetchWatchKeys = new HashSet<>(); + shareFetchPartitionData.partitionMaxBytes.keySet().forEach( + topicIdPartition -> delayedShareFetchWatchKeys.add( + new DelayedShareFetchKey(shareFetchPartitionData.groupId, topicIdPartition))); - processReplicaManagerFetch(shareFetchPartitionData, topicPartitionData); + // Add the share fetch to the delayed share fetch purgatory to process the fetch request. + addDelayedShareFetch(new DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap), + delayedShareFetchWatchKeys); + // Release the lock so that other threads can process the queue. + releaseProcessFetchQueueLock(); // If there are more requests in the queue, then process them. if (!fetchQueue.isEmpty()) maybeProcessFetchQueue(); @@ -614,34 +609,37 @@ void maybeProcessFetchQueue() { } catch (Exception e) { // In case exception occurs then release the locks so queue can be further processed. 
log.error("Error processing fetch queue for share partitions", e); - shareFetchPartitionData.future.completeExceptionally(e); - releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet()); + releaseProcessFetchQueueLock(); + // If there are more requests in the queue, then process them. + if (!fetchQueue.isEmpty()) + maybeProcessFetchQueue(); } } private SharePartition fetchSharePartition(SharePartitionKey sharePartitionKey) { return partitionCacheMap.computeIfAbsent(sharePartitionKey, - k -> { - long start = time.hiResClockMs(); - SharePartition partition = new SharePartition( - sharePartitionKey.groupId, - sharePartitionKey.topicIdPartition, - maxInFlightMessages, - maxDeliveryCount, - recordLockDurationMs, - timer, - time, - persister - ); - this.shareGroupMetrics.partitionLoadTime(start); - return partition; - }); + k -> { + long start = time.hiResClockMs(); + SharePartition partition = new SharePartition( + sharePartitionKey.groupId, + sharePartitionKey.topicIdPartition, + maxInFlightMessages, + maxDeliveryCount, + recordLockDurationMs, + timer, + time, + persister, + delayedShareFetchPurgatory + ); + this.shareGroupMetrics.partitionLoadTime(start); + return partition; + }); } private void maybeCompleteInitializationWithException( - SharePartitionKey sharePartitionKey, - CompletableFuture> future, - Throwable throwable) { + SharePartitionKey sharePartitionKey, + CompletableFuture> future, + Throwable throwable) { if (throwable instanceof LeaderNotAvailableException) { log.debug("The share partition with key {} is not initialized yet", sharePartitionKey); // Do not process the fetch request for this partition as the leader is not initialized yet. @@ -668,100 +666,6 @@ private void maybeCompleteInitializationWithException( future.completeExceptionally(throwable); } - private void processReplicaManagerFetch( - ShareFetchPartitionData shareFetchPartitionData, - Map topicPartitionData - ) { - replicaManager.fetchMessages( - shareFetchPartitionData.fetchParams, - CollectionConverters.asScala( - topicPartitionData.entrySet().stream().map(entry -> - new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList()) - ), - QuotaFactory.UnboundedQuota$.MODULE$, - responsePartitionData -> { - log.trace("Data successfully retrieved by replica manager: {}", responsePartitionData); - List> responseData = CollectionConverters.asJava( - responsePartitionData); - processFetchResponse(shareFetchPartitionData, responseData).whenComplete( - (result, throwable) -> { - if (throwable != null) { - log.error("Error processing fetch response for share partitions", throwable); - shareFetchPartitionData.future.completeExceptionally(throwable); - } else { - shareFetchPartitionData.future.complete(result); - } - // Releasing the lock to move ahead with the next request in queue. - releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet()); - }); - return BoxedUnit.UNIT; - }); - } - - // Visible for testing. 
- CompletableFuture> processFetchResponse( - ShareFetchPartitionData shareFetchPartitionData, - List> responseData - ) { - Map> futures = new HashMap<>(); - responseData.forEach(data -> { - TopicIdPartition topicIdPartition = data._1; - FetchPartitionData fetchPartitionData = data._2; - - SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(shareFetchPartitionData.groupId, topicIdPartition)); - futures.put(topicIdPartition, sharePartition.acquire(shareFetchPartitionData.memberId, fetchPartitionData) - .handle((acquiredRecords, throwable) -> { - log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", - topicIdPartition, shareFetchPartitionData, acquiredRecords); - PartitionData partitionData = new PartitionData() - .setPartitionIndex(topicIdPartition.partition()); - - if (throwable != null) { - partitionData.setErrorCode(Errors.forException(throwable).code()); - return partitionData; - } - - if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { - // In case we get OFFSET_OUT_OF_RANGE error, that's because the LSO is later than the fetch offset. - // So, we would update the start and end offset of the share partition and still return an empty - // response and let the client retry the fetch. This way we do not lose out on the data that - // would be returned for other share partitions in the fetch request. - sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition)); - partitionData - .setPartitionIndex(topicIdPartition.partition()) - .setRecords(null) - .setErrorCode(Errors.NONE.code()) - .setAcquiredRecords(Collections.emptyList()) - .setAcknowledgeErrorCode(Errors.NONE.code()); - return partitionData; - } - - // Maybe, in the future, check if no records are acquired, and we want to retry - // replica manager fetch. Depends on the share partition manager implementation, - // if we want parallel requests for the same share partition or not. - partitionData - .setPartitionIndex(topicIdPartition.partition()) - .setRecords(fetchPartitionData.records) - .setErrorCode(fetchPartitionData.error.code()) - .setAcquiredRecords(acquiredRecords) - .setAcknowledgeErrorCode(Errors.NONE.code()); - return partitionData; - })); - }); - - return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> { - Map processedResult = new HashMap<>(); - futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join())); - return processedResult; - }); - } - - // Visible for testing. - void releaseFetchQueueAndPartitionsLock(String groupId, Set topicIdPartitions) { - topicIdPartitions.forEach(tp -> partitionCacheMap.get(sharePartitionKey(groupId, tp)).releaseFetchLock()); - releaseProcessFetchQueueLock(); - } - // Visible for testing. boolean acquireProcessFetchQueueLock() { return processFetchQueueLock.compareAndSet(false, true); @@ -775,20 +679,6 @@ private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition top return new SharePartitionKey(groupId, topicIdPartition); } - /** - * The method is used to get the offset for the earliest timestamp for the topic-partition. - * - * @return The offset for the earliest timestamp. - */ - // Visible for testing. - long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition) { - // Isolation level is only required when reading from the latest offset hence use Option.empty() for now. 
- Option timestampAndOffset = replicaManager.fetchOffsetForTimestamp( - topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(), - Optional.empty(), true); - return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset; - } - /** * The SharePartitionKey is used to uniquely identify a share partition. The key is made up of the * share group id, the topic id and the partition id. The key is used to store the SharePartition @@ -850,6 +740,26 @@ public ShareFetchPartitionData(FetchParams fetchParams, String groupId, String m this.future = future; this.partitionMaxBytes = partitionMaxBytes; } + + public String groupId() { + return groupId; + } + + public String memberId() { + return memberId; + } + + public CompletableFuture> future() { + return future; + } + + public Map partitionMaxBytes() { + return partitionMaxBytes; + } + + public FetchParams fetchParams() { + return fetchParams; + } } static class ShareGroupMetrics { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 4b3f242009fe..f7b913b7f485 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -421,6 +421,7 @@ class BrokerServer( config.shareGroupConfig.shareGroupRecordLockDurationMs, config.shareGroupConfig.shareGroupDeliveryCountLimit, config.shareGroupConfig.shareGroupPartitionMaxRecordLocks, + config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests, persister, new Metrics() ) diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java new file mode 100644 index 000000000000..91b0d0ae082e --- /dev/null +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.share; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class DelayedShareFetchKeyTest { + + @Test + public void testDelayedShareFetchEqualsAndHashcode() { + Uuid topicUuid = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(topicUuid, new TopicPartition("topic", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicUuid, new TopicPartition("topic", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)); + + Map keyMap = new HashMap<>(); + keyMap.put("key0", new DelayedShareFetchKey("grp", tp0)); + keyMap.put("key1", new DelayedShareFetchKey("grp", tp1)); + keyMap.put("key2", new DelayedShareFetchKey("grp", tp2)); + keyMap.put("key3", new DelayedShareFetchKey("grp2", tp0)); + keyMap.put("key4", new DelayedShareFetchKey("grp2", tp1)); + + keyMap.forEach((key1, value1) -> keyMap.forEach((key2, value2) -> { + if (key1.equals(key2)) { + assertEquals(value1, value2); + assertEquals(value1.hashCode(), value2.hashCode()); + } else { + assertNotEquals(value1, value2); + } + })); + } +} diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java new file mode 100644 index 000000000000..cc490c0bd97a --- /dev/null +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.share; + +import kafka.server.ReplicaManager; +import kafka.server.ReplicaQuota; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.storage.internals.log.FetchIsolation; +import org.apache.kafka.storage.internals.log.FetchParams; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + + +public class DelayedShareFetchTest { + private static final int MAX_WAIT_MS = 5000; + + @Test + public void testDelayedShareFetchTryCompleteReturnsFalse() { + String groupId = "grp"; + Uuid topicId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes); + + when(sp0.canAcquireRecords()).thenReturn(false); + when(sp1.canAcquireRecords()).thenReturn(false); + DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + // Since there is no partition that can be acquired, tryComplete should return false. 
+ assertFalse(delayedShareFetch.tryComplete()); + assertFalse(delayedShareFetch.isCompleted()); + } + + @Test + public void testDelayedShareFetchTryCompleteReturnsTrue() { + String groupId = "grp"; + Uuid topicId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes); + + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withPartitionCacheMap(partitionCacheMap) + .build(); + assertFalse(delayedShareFetch.isCompleted()); + + // Since sp1 can be acquired, tryComplete should return true. 
+ assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + } + + @Test + public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() { + String groupId = "grp"; + Uuid topicId = Uuid.randomUuid(); + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes); + + when(sp0.canAcquireRecords()).thenReturn(false); + when(sp1.canAcquireRecords()).thenReturn(false); + DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + assertFalse(delayedShareFetch.isCompleted()); + delayedShareFetch.forceComplete(); + + // Since no partition could be acquired, the future should be empty and replicaManager.readFromLog should not be called. 
+ assertEquals(0, shareFetchPartitionData.future().join().size()); + Mockito.verify(replicaManager, times(0)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + assertTrue(delayedShareFetch.isCompleted()); + } + + @Test + public void testReplicaManagerFetchShouldHappenOnComplete() { + String groupId = "grp"; + Uuid topicId = Uuid.randomUuid(); + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes); + + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + assertFalse(delayedShareFetch.isCompleted()); + delayedShareFetch.forceComplete(); + + // Since we can acquire records from sp0, replicaManager.readFromLog should be called once and only for sp0. 
+ Mockito.verify(replicaManager, times(1)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + Mockito.verify(sp0, times(1)).nextFetchOffset(); + Mockito.verify(sp1, times(0)).nextFetchOffset(); + assertTrue(delayedShareFetch.isCompleted()); + } + + @Test + public void testToCompleteAnAlreadyCompletedFuture() { + String groupId = "grp"; + Uuid topicId = Uuid.randomUuid(); + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + + CompletableFuture> future = new CompletableFuture<>(); + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + future, partitionMaxBytes); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(false); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build()); + assertFalse(delayedShareFetch.isCompleted()); + + // Force completing the share fetch request for the first time should complete the future with an empty map. + delayedShareFetch.forceComplete(); + assertTrue(delayedShareFetch.isCompleted()); + // Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch. + Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + assertEquals(0, shareFetchPartitionData.future().join().size()); + + // Force completing the share fetch request for the second time should hit the future completion check and not + // proceed ahead in the function. + delayedShareFetch.forceComplete(); + assertTrue(delayedShareFetch.isCompleted()); + // Verifying that the second forceComplete does not call acquirablePartitions method in DelayedShareFetch. 
+ Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + } + + static class DelayedShareFetchBuilder { + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = mock(SharePartitionManager.ShareFetchPartitionData.class); + private ReplicaManager replicaManager = mock(ReplicaManager.class); + private Map partitionCacheMap = new HashMap<>(); + + DelayedShareFetchBuilder withShareFetchPartitionData(SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData) { + this.shareFetchPartitionData = shareFetchPartitionData; + return this; + } + + DelayedShareFetchBuilder withReplicaManager(ReplicaManager replicaManager) { + this.replicaManager = replicaManager; + return this; + } + + DelayedShareFetchBuilder withPartitionCacheMap(Map partitionCacheMap) { + this.partitionCacheMap = partitionCacheMap; + return this; + } + + public static DelayedShareFetchBuilder builder() { + return new DelayedShareFetchBuilder(); + } + + public DelayedShareFetch build() { + return new DelayedShareFetch( + shareFetchPartitionData, + replicaManager, + partitionCacheMap); + } + } +} diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java new file mode 100644 index 000000000000..12040928b407 --- /dev/null +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.share; + +import kafka.server.ReplicaManager; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.storage.internals.log.FetchIsolation; +import org.apache.kafka.storage.internals.log.FetchParams; +import org.apache.kafka.storage.internals.log.FetchPartitionData; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Option; + +import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +public class ShareFetchUtilsTest { + + @Test + public void testProcessFetchResponse() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + Uuid topicId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + when(sp0.nextFetchOffset()).thenReturn((long) 3); + when(sp1.nextFetchOffset()).thenReturn((long) 3); + + when(sp0.acquire(any(), any())).thenReturn( + CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)))); + when(sp1.acquire(any(), any())).thenReturn( + CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)))); + + doNothing().when(sp1).updateCacheAndOffsets(any(Long.class)); + doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, + new CompletableFuture<>(), partitionMaxBytes); + + MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); + + MemoryRecords records1 = MemoryRecords.withRecords(100L, Compression.NONE, + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); + + Map responseData = new HashMap<>(); + responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, + records, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + responseData.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 100L, + records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + CompletableFuture> result = + ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData, partitionCacheMap, mock(ReplicaManager.class)); + + assertTrue(result.isDone()); + Map resultData = result.join(); + assertEquals(2, resultData.size()); + assertTrue(resultData.containsKey(tp0)); + assertTrue(resultData.containsKey(tp1)); + assertEquals(0, resultData.get(tp0).partitionIndex()); + assertEquals(1, resultData.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode()); + assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)), + resultData.get(tp0).acquiredRecords()); + assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)), + resultData.get(tp1).acquiredRecords()); + } + + @Test + public void testProcessFetchResponseWithEmptyRecords() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + Uuid topicId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + when(sp0.nextFetchOffset()).thenReturn((long) 3); + when(sp1.nextFetchOffset()).thenReturn((long) 3); + + when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); + when(sp1.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); + + doNothing().when(sp1).updateCacheAndOffsets(any(Long.class)); + doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new 
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, + new CompletableFuture<>(), partitionMaxBytes); + + Map responseData = new HashMap<>(); + responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + responseData.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + CompletableFuture> result = + ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData, partitionCacheMap, mock(ReplicaManager.class)); + + assertTrue(result.isDone()); + Map resultData = result.join(); + assertEquals(2, resultData.size()); + assertTrue(resultData.containsKey(tp0)); + assertTrue(resultData.containsKey(tp1)); + assertEquals(0, resultData.get(tp0).partitionIndex()); + assertEquals(1, resultData.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode()); + assertEquals(Collections.emptyList(), resultData.get(tp0).acquiredRecords()); + assertEquals(Collections.emptyList(), resultData.get(tp1).acquiredRecords()); + } + + @Test + public void testProcessFetchResponseWithLsoMovementForTopicPartition() { + String groupId = "grp"; + Uuid fooId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1)); + + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = Mockito.mock(SharePartition.class); + SharePartition sp1 = Mockito.mock(SharePartition.class); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), + groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes); + + ReplicaManager replicaManager = mock(ReplicaManager.class); + + // Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition. 
+ FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty()); + doReturn(Option.apply(timestampAndOffset)).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); + + when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5); + when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4); + + when(sp0.acquire(anyString(), any(FetchPartitionData.class))).thenReturn( + CompletableFuture.completedFuture(Collections.emptyList()), + CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)))); + when(sp1.acquire(anyString(), any(FetchPartitionData.class))).thenReturn( + CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1))), + CompletableFuture.completedFuture(Collections.emptyList())); + + doNothing().when(sp1).updateCacheAndOffsets(any(Long.class)); + doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); + + MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE, + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); + + Map responseData1 = new HashMap<>(); + responseData1.put(tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + responseData1.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, + records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + CompletableFuture> result1 = + ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData1, partitionCacheMap, replicaManager); + + assertTrue(result1.isDone()); + Map resultData1 = result1.join(); + assertEquals(2, resultData1.size()); + assertTrue(resultData1.containsKey(tp0)); + assertTrue(resultData1.containsKey(tp1)); + assertEquals(0, resultData1.get(tp0).partitionIndex()); + assertEquals(1, resultData1.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), resultData1.get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), resultData1.get(tp1).errorCode()); + + // Since we have OFFSET_OUT_OF_RANGE exception for tp0 and no exception for tp1 from SharePartition class, + // we should have 1 call for updateCacheAndOffsets for tp0 and 0 calls for tp1.
+ Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class)); + Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class)); + + MemoryRecords records2 = MemoryRecords.withRecords(100L, Compression.NONE, + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); + + Map responseData2 = new HashMap<>(); + responseData2.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, + records2, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + responseData2.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + CompletableFuture> result2 = + ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData2, partitionCacheMap, replicaManager); + + assertTrue(result2.isDone()); + Map resultData2 = result2.join(); + assertEquals(2, resultData2.size()); + assertTrue(resultData2.containsKey(tp0)); + assertTrue(resultData2.containsKey(tp1)); + assertEquals(0, resultData2.get(tp0).partitionIndex()); + assertEquals(1, resultData2.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), resultData2.get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), resultData2.get(tp1).errorCode()); + + // Since we don't see any exception for tp1 and tp2 from SharePartition class, + // the updateCacheAndOffsets calls should remain the same as the previous case. + Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class)); + Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class)); + } + +} diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 5bf0412fd3c6..9494f16c2d70 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -16,6 +16,8 @@ */ package kafka.server.share; +import kafka.server.DelayedOperationPurgatory; +import kafka.server.LogReadResult; import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; @@ -24,7 +26,6 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.FencedStateEpochException; @@ -36,13 +37,11 @@ import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.message.ShareAcknowledgeResponseData; import org.apache.kafka.common.message.ShareFetchResponseData; -import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; @@ -66,9 +65,11 @@ import org.apache.kafka.server.util.timer.SystemTimer; 
import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchIsolation; import org.apache.kafka.storage.internals.log.FetchParams; -import org.apache.kafka.storage.internals.log.FetchPartitionData; +import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -86,11 +87,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -98,7 +96,10 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import scala.Option; import scala.Tuple2; +import scala.collection.Seq; +import scala.jdk.javaapi.CollectionConverters; import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -110,11 +111,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -126,7 +128,10 @@ public class SharePartitionManagerTest { private static final int RECORD_LOCK_DURATION_MS = 30000; private static final int MAX_DELIVERY_COUNT = 5; private static final short MAX_IN_FLIGHT_MESSAGES = 200; - private static final int PARTITION_MAX_BYTES = 40000; + static final int PARTITION_MAX_BYTES = 40000; + private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000; + private static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000; + private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000; private static Timer mockTimer; @@ -998,7 +1003,7 @@ public void testSharePartitionKey() { public void testMultipleSequentialShareFetches() { String groupId = "grp"; Uuid memberId1 = Uuid.randomUuid(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); Uuid fooId = Uuid.randomUuid(); Uuid barId = Uuid.randomUuid(); @@ -1022,28 +1027,31 @@ public void testMultipleSequentialShareFetches() { Time time = mock(Time.class); when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L); Metrics metrics = new Metrics(); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder() .withReplicaManager(replicaManager) .withTime(time) .withMetrics(metrics) + .withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) .build(); - doAnswer(invocation -> { - sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); - return null; - }).when(replicaManager).fetchMessages(any(), any(), any(ReplicaQuota.class), any()); + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, partitionMaxBytes); - Mockito.verify(replicaManager, times(1)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); + Mockito.verify(replicaManager, times(1)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, partitionMaxBytes); - Mockito.verify(replicaManager, times(2)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); + Mockito.verify(replicaManager, times(2)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, partitionMaxBytes); - Mockito.verify(replicaManager, times(3)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); + Mockito.verify(replicaManager, times(3)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); Map> expectedMetrics = new HashMap<>(); expectedMetrics.put( @@ -1060,130 +1068,12 @@ public void testMultipleSequentialShareFetches() { }); } - @Test - public void testProcessFetchResponse() { - String groupId = "grp"; - String memberId = Uuid.randomUuid().toString(); - Uuid topicId = Uuid.randomUuid(); - TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); - TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); - Map partitionMaxBytes = new HashMap<>(); - partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); - partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); - - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp0), - k -> new SharePartition(groupId, tp0, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, - RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp1), - k -> new SharePartition(groupId, tp1, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, - RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); - - SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).build(); - - CompletableFuture> future = new CompletableFuture<>(); - SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( - new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, - future, partitionMaxBytes); - - MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new 
SimpleRecord(null, "value".getBytes())); - - MemoryRecords records1 = MemoryRecords.withRecords(100L, Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new SimpleRecord(null, "value".getBytes())); - - List> responseData = new ArrayList<>(); - responseData.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, - records, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false))); - responseData.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 100L, - records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false))); - CompletableFuture> result = - sharePartitionManager.processFetchResponse(shareFetchPartitionData, responseData); - - assertTrue(result.isDone()); - Map resultData = result.join(); - assertEquals(2, resultData.size()); - assertTrue(resultData.containsKey(tp0)); - assertTrue(resultData.containsKey(tp1)); - assertEquals(0, resultData.get(tp0).partitionIndex()); - assertEquals(1, resultData.get(tp1).partitionIndex()); - assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); - assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode()); - assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() - .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)), - resultData.get(tp0).acquiredRecords()); - assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() - .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)), - resultData.get(tp1).acquiredRecords()); - } - - @Test - public void testProcessFetchResponseWithEmptyRecords() { - String groupId = "grp"; - String memberId = Uuid.randomUuid().toString(); - Uuid topicId = Uuid.randomUuid(); - TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); - TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); - Map partitionMaxBytes = new HashMap<>(); - partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); - partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); - - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp0), - k -> new SharePartition(groupId, tp0, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, - RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp1), - k -> new SharePartition(groupId, tp1, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, - RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); - - SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).build(); - - CompletableFuture> future = new CompletableFuture<>(); - SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( - new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, - future, partitionMaxBytes); - - List> responseData = new ArrayList<>(); - responseData.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - 
OptionalInt.empty(), false))); - responseData.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false))); - CompletableFuture> result = - sharePartitionManager.processFetchResponse(shareFetchPartitionData, responseData); - - assertTrue(result.isDone()); - Map resultData = result.join(); - assertEquals(2, resultData.size()); - assertTrue(resultData.containsKey(tp0)); - assertTrue(resultData.containsKey(tp1)); - assertEquals(0, resultData.get(tp0).partitionIndex()); - assertEquals(1, resultData.get(tp1).partitionIndex()); - assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); - assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode()); - assertEquals(Collections.emptyList(), resultData.get(tp0).acquiredRecords()); - assertEquals(Collections.emptyList(), resultData.get(tp1).acquiredRecords()); - } - @Test public void testMultipleConcurrentShareFetches() throws InterruptedException { String groupId = "grp"; Uuid memberId1 = Uuid.randomUuid(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); Uuid fooId = Uuid.randomUuid(); Uuid barId = Uuid.randomUuid(); @@ -1199,23 +1089,16 @@ public void testMultipleConcurrentShareFetches() throws InterruptedException { final Time time = new MockTime(0, System.currentTimeMillis(), 0); ReplicaManager replicaManager = mock(ReplicaManager.class); - - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp0), - k -> new SharePartition(groupId, tp0, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, - RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp1), - k -> new SharePartition(groupId, tp1, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, - RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp2), - k -> new SharePartition(groupId, tp2, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, - RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp3), - k -> new SharePartition(groupId, tp3, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, - RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withTime(time).withReplicaManager(replicaManager).build(); + .withTime(time) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .build(); SharePartition sp0 = mock(SharePartition.class); SharePartition sp1 = mock(SharePartition.class); @@ -1232,37 +1115,32 @@ public void 
testMultipleConcurrentShareFetches() throws InterruptedException { assertEquals(4, sp1.nextFetchOffset()); assertEquals(10, sp2.nextFetchOffset()); assertEquals(20, sp3.nextFetchOffset()); - sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); - return null; + return buildLogReadResult(partitionMaxBytes.keySet()); }).doAnswer(invocation -> { assertEquals(15, sp0.nextFetchOffset()); assertEquals(1, sp1.nextFetchOffset()); assertEquals(25, sp2.nextFetchOffset()); assertEquals(15, sp3.nextFetchOffset()); - sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); - return null; + return buildLogReadResult(partitionMaxBytes.keySet()); }).doAnswer(invocation -> { assertEquals(6, sp0.nextFetchOffset()); assertEquals(18, sp1.nextFetchOffset()); assertEquals(26, sp2.nextFetchOffset()); assertEquals(23, sp3.nextFetchOffset()); - sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); - return null; + return buildLogReadResult(partitionMaxBytes.keySet()); }).doAnswer(invocation -> { assertEquals(30, sp0.nextFetchOffset()); assertEquals(5, sp1.nextFetchOffset()); assertEquals(26, sp2.nextFetchOffset()); assertEquals(16, sp3.nextFetchOffset()); - sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); - return null; + return buildLogReadResult(partitionMaxBytes.keySet()); }).doAnswer(invocation -> { assertEquals(25, sp0.nextFetchOffset()); assertEquals(5, sp1.nextFetchOffset()); assertEquals(26, sp2.nextFetchOffset()); assertEquals(16, sp3.nextFetchOffset()); - sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); - return null; - }).when(replicaManager).fetchMessages(any(), any(), any(ReplicaQuota.class), any()); + return buildLogReadResult(partitionMaxBytes.keySet()); + }).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); int threadCount = 100; ExecutorService executorService = Executors.newFixedThreadPool(threadCount); @@ -1280,19 +1158,19 @@ public void testMultipleConcurrentShareFetches() throws InterruptedException { if (!executorService.awaitTermination(50, TimeUnit.MILLISECONDS)) executorService.shutdown(); } - // We are checking the number of replicaManager fetchMessages() calls - Mockito.verify(replicaManager, atMost(100)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); - Mockito.verify(replicaManager, atLeast(10)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); + // We are checking the number of replicaManager readFromLog() calls + Mockito.verify(replicaManager, atMost(100)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + Mockito.verify(replicaManager, atLeast(10)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); } @Test public void testReplicaManagerFetchShouldNotProceed() { String groupId = "grp"; Uuid memberId = Uuid.randomUuid(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, + DELAYED_SHARE_FETCH_MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); Uuid fooId = Uuid.randomUuid(); TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); Map partitionMaxBytes = new 
HashMap<>(); @@ -1307,13 +1185,20 @@ public void testReplicaManagerFetchShouldNotProceed() { Map partitionCacheMap = new HashMap<>(); partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build(); + .withPartitionCacheMap(partitionCacheMap) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build(); CompletableFuture> future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - Mockito.verify(replicaManager, times(0)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); + Mockito.verify(replicaManager, times(0)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); Map result = future.join(); assertEquals(0, result.size()); } @@ -1322,7 +1207,7 @@ public void testReplicaManagerFetchShouldNotProceed() { public void testReplicaManagerFetchShouldProceed() { String groupId = "grp"; Uuid memberId = Uuid.randomUuid(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); Uuid fooId = Uuid.randomUuid(); TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); @@ -1331,21 +1216,23 @@ public void testReplicaManagerFetchShouldProceed() { ReplicaManager replicaManager = mock(ReplicaManager.class); - SharePartition sp0 = mock(SharePartition.class); - when(sp0.maybeAcquireFetchLock()).thenReturn(true); - when(sp0.canAcquireRecords()).thenReturn(true); - when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build(); + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .build(); + + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); // Since the nextFetchOffset does not point to endOffset + 1, i.e. 
some of the records in the cachedState are AVAILABLE, - // even though the maxInFlightMessages limit is exceeded, replicaManager.fetchMessages should be called - Mockito.verify(replicaManager, times(1)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); + // even though the maxInFlightMessages limit is exceeded, replicaManager.readFromLog should be called + Mockito.verify(replicaManager, times(1)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); } @Test @@ -1750,159 +1637,430 @@ public void testAcknowledgeEmptyPartitionCacheMap() { } @Test - public void testProcessFetchResponseWithLsoMovementForTopicPartition() { + public void testFetchQueueProcessingWhenFrontItemIsEmpty() { String groupId = "grp"; - Uuid fooId = Uuid.randomUuid(); - TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); - TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1)); - + String memberId = Uuid.randomUuid().toString(); + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); Map partitionMaxBytes = new HashMap<>(); partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + + final Time time = new MockTime(); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 = new SharePartitionManager.ShareFetchPartitionData( + fetchParams, groupId, memberId, new CompletableFuture<>(), Collections.emptyMap()); + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 = new SharePartitionManager.ShareFetchPartitionData( + fetchParams, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes); + + ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); + // First request added to fetch queue is empty i.e. no topic partitions to fetch. + fetchQueue.add(shareFetchPartitionData1); + // Second request added to fetch queue has a topic partition to fetch. + fetchQueue.add(shareFetchPartitionData2); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withReplicaManager(replicaManager) + .withTime(time) + .withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withFetchQueue(fetchQueue).build(); + + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + sharePartitionManager.maybeProcessFetchQueue(); + + // Verifying that the second item in the fetchQueue is processed, even though the first item is empty. 
+ verify(replicaManager, times(1)).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + } + + @Test + public void testAcknowledgeCompletesDelayedShareFetchRequest() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0)); + + Map partitionMaxBytes = new HashMap<>(); partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); - ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); - SharePartition sp0 = Mockito.mock(SharePartition.class); - SharePartition sp1 = Mockito.mock(SharePartition.class); + // mocked share partitions sp1 and sp2 can be acquired once there is an acknowledgement for it. + doAnswer(invocation -> { + when(sp1.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp1).acknowledge(ArgumentMatchers.eq(memberId), any()); + doAnswer(invocation -> { + when(sp2.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any()); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + Map partitionCacheMap = new HashMap<>(); partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), + groupId, + Uuid.randomUuid().toString(), + new CompletableFuture<>(), + partitionMaxBytes); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + + // Initially you cannot acquire records for both sp1 and sp2. + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(false); + + Set delayedShareFetchWatchKeys = new HashSet<>(); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + + DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + delayedShareFetchPurgatory.tryCompleteElseWatch( + delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); + + // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. 
+ assertEquals(2, delayedShareFetchPurgatory.watched()); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build(); + .withPartitionCacheMap(partitionCacheMap) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build(); - SharePartitionManager sharePartitionManagersSpy = Mockito.spy(sharePartitionManager); + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + Map> acknowledgeTopics = new HashMap<>(); + acknowledgeTopics.put(tp1, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + + assertEquals(2, delayedShareFetchPurgatory.watched()); + // Acknowledgement request for sp1. + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + + // Since sp1 is acknowledged, the delayedShareFetchPurgatory should have 1 watched key corresponding to sp2. + assertEquals(1, delayedShareFetchPurgatory.watched()); + + Mockito.verify(sp1, times(1)).nextFetchOffset(); + Mockito.verify(sp2, times(0)).nextFetchOffset(); + } + + @Test + public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0)); - // Mocking the offsetForEarliestTimestamp method to return a valid LSO. - Mockito.doReturn(1L).when(sharePartitionManagersSpy).offsetForEarliestTimestamp(any(TopicIdPartition.class)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); - when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5); - when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); - when(sp0.acquire(any(), any())).thenReturn( - CompletableFuture.completedFuture(Collections.emptyList()), - CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() - .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)))); - when(sp1.acquire(any(), any())).thenReturn( - CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() - .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1))), - CompletableFuture.completedFuture(Collections.emptyList())); + // mocked share partitions sp1, sp2 and sp3 can be acquired once there is an acknowledgement for it. 
+ doAnswer(invocation -> { + when(sp1.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp1).acknowledge(ArgumentMatchers.eq(memberId), any()); + doAnswer(invocation -> { + when(sp2.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any()); + doAnswer(invocation -> { + when(sp3.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp3).acknowledge(ArgumentMatchers.eq(memberId), any()); - doNothing().when(sp1).updateCacheAndOffsets(any(Long.class)); - doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp3), sp3); - CompletableFuture> future = new CompletableFuture<>(); SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( - new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), - groupId, Uuid.randomUuid().toString(), future, partitionMaxBytes); - - MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new SimpleRecord(null, "value".getBytes())); - - List> responseData1 = new ArrayList<>(); - responseData1.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false))); - responseData1.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, - records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false))); - CompletableFuture> result1 = - sharePartitionManagersSpy.processFetchResponse(shareFetchPartitionData, responseData1); - - assertTrue(result1.isDone()); - Map resultData1 = result1.join(); - assertEquals(2, resultData1.size()); - assertTrue(resultData1.containsKey(tp0)); - assertTrue(resultData1.containsKey(tp1)); - assertEquals(0, resultData1.get(tp0).partitionIndex()); - assertEquals(1, resultData1.get(tp1).partitionIndex()); - assertEquals(Errors.NONE.code(), resultData1.get(tp0).errorCode()); - assertEquals(Errors.NONE.code(), resultData1.get(tp1).errorCode()); - - // Since we have OFFSET_OUT_OF_RANGE exception for tp1 and no exception for tp2 from SharePartition class, - // we should have 1 call for updateCacheAndOffsets for tp0 and 0 calls for tp1. 
- Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class)); - Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class)); - - MemoryRecords records2 = MemoryRecords.withRecords(100L, Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new SimpleRecord(null, "value".getBytes())); - - List> responseData2 = new ArrayList<>(); - responseData2.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, - records2, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false))); - responseData2.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false))); - CompletableFuture> result2 = - sharePartitionManagersSpy.processFetchResponse(shareFetchPartitionData, responseData2); - - assertTrue(result2.isDone()); - Map resultData2 = result2.join(); - assertEquals(2, resultData2.size()); - assertTrue(resultData2.containsKey(tp0)); - assertTrue(resultData2.containsKey(tp1)); - assertEquals(0, resultData2.get(tp0).partitionIndex()); - assertEquals(1, resultData2.get(tp1).partitionIndex()); - assertEquals(Errors.NONE.code(), resultData2.get(tp0).errorCode()); - assertEquals(Errors.NONE.code(), resultData2.get(tp1).errorCode()); - - // Since we don't see any exception for tp1 and tp2 from SharePartition class, - // the updateCacheAndOffsets calls should remain the same as the previous case. - Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class)); - Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class)); + groupId, + Uuid.randomUuid().toString(), + new CompletableFuture<>(), + partitionMaxBytes); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + + // Initially you cannot acquire records for both all 3 share partitions. + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(false); + when(sp3.maybeAcquireFetchLock()).thenReturn(true); + when(sp3.canAcquireRecords()).thenReturn(false); + + Set delayedShareFetchWatchKeys = new HashSet<>(); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + + DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + delayedShareFetchPurgatory.tryCompleteElseWatch( + delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); + + // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. 
+ assertEquals(2, delayedShareFetchPurgatory.watched()); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build(); + + Map> acknowledgeTopics = new HashMap<>(); + acknowledgeTopics.put(tp3, Arrays.asList( + new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(24, 56, Collections.singletonList((byte) 1)) + )); + + // Acknowledgement request for sp3. + sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics); + + // Since neither sp1 and sp2 have been acknowledged, the delayedShareFetchPurgatory should have 2 watched keys. + assertEquals(2, delayedShareFetchPurgatory.watched()); + + Mockito.verify(sp1, times(0)).nextFetchOffset(); + Mockito.verify(sp2, times(0)).nextFetchOffset(); } @Test - public void testFetchQueueProcessingWhenFrontItemIsEmpty() { + public void testReleaseSessionCompletesDelayedShareFetchRequest() { String groupId = "grp"; String memberId = Uuid.randomUuid().toString(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); - TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0)); + Map partitionMaxBytes = new HashMap<>(); - partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); - final Time time = new MockTime(); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + ShareSessionCache cache = mock(ShareSessionCache.class); + ShareSession shareSession = mock(ShareSession.class); + when(cache.remove(new ShareSessionKey(groupId, Uuid.fromString(memberId)))).thenReturn(shareSession); + + // mocked share partitions sp1 and sp2 can be acquired once there is a release acquired records on session close request for it. 
+ doAnswer(invocation -> { + when(sp1.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp1).releaseAcquiredRecords(ArgumentMatchers.eq(memberId)); + doAnswer(invocation -> { + when(sp2.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId)); + + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), + groupId, + Uuid.randomUuid().toString(), + new CompletableFuture<>(), + partitionMaxBytes); ReplicaManager replicaManager = mock(ReplicaManager.class); - SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 = new SharePartitionManager.ShareFetchPartitionData( - fetchParams, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes); - SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 = new SharePartitionManager.ShareFetchPartitionData( - fetchParams, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp0), - key -> new SharePartition(groupId, tp0, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, - RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); + // Initially you cannot acquire records for both sp1 and sp2. + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(false); - ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); - // First request added to fetch queue is empty i.e. no topic partitions to fetch. - fetchQueue.add(shareFetchPartitionData1); - // Second request added to fetch queue has a topic partition to fetch. - fetchQueue.add(shareFetchPartitionData2); + Set delayedShareFetchWatchKeys = new HashSet<>(); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); - SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTime(time) - .withFetchQueue(fetchQueue).build(); - sharePartitionManager.maybeProcessFetchQueue(); + DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); - // Verifying that the second item in the fetchQueue is processed, even though the first item is empty. 
- verify(replicaManager, times(1)).fetchMessages(any(), any(), any(ReplicaQuota.class), any()); + delayedShareFetchPurgatory.tryCompleteElseWatch( + delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); + + // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. + assertEquals(2, delayedShareFetchPurgatory.watched()); + + SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withCache(cache) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build()); + + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + assertEquals(2, delayedShareFetchPurgatory.watched()); + + // The share session for this share group member returns tp1 and tp3, tp1 is common in both the delayed fetch request and the share session. + when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId))).thenReturn(Arrays.asList(tp1, tp3)); + + // Release acquired records on session close request for tp1 and tp3. + sharePartitionManager.releaseSession(groupId, memberId); + + // Since sp1's request to release acquired records on session close is completed, the delayedShareFetchPurgatory + // should have 1 watched key corresponding to sp2. + assertEquals(1, delayedShareFetchPurgatory.watched()); + + Mockito.verify(sp1, times(1)).nextFetchOffset(); + Mockito.verify(sp2, times(0)).nextFetchOffset(); + } + + @Test + public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0)); + + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); + + ShareSessionCache cache = mock(ShareSessionCache.class); + ShareSession shareSession = mock(ShareSession.class); + when(cache.remove(new ShareSessionKey(groupId, Uuid.fromString(memberId)))).thenReturn(shareSession); + + // mocked share partitions sp1, sp2 and sp3 can be acquired once there is a release acquired records on session close for it. 
+ doAnswer(invocation -> { + when(sp1.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp1).releaseAcquiredRecords(ArgumentMatchers.eq(memberId)); + doAnswer(invocation -> { + when(sp2.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId)); + doAnswer(invocation -> { + when(sp3.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp3).releaseAcquiredRecords(ArgumentMatchers.eq(memberId)); + + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp3), sp3); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), + groupId, + Uuid.randomUuid().toString(), + new CompletableFuture<>(), + partitionMaxBytes); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + + // Initially, records cannot be acquired for any of the 3 share partitions. + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(false); + when(sp3.maybeAcquireFetchLock()).thenReturn(true); + when(sp3.canAcquireRecords()).thenReturn(false); + + Set delayedShareFetchWatchKeys = new HashSet<>(); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + + DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + delayedShareFetchPurgatory.tryCompleteElseWatch( + delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); + + // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. + assertEquals(2, delayedShareFetchPurgatory.watched()); + + SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withCache(cache) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build()); + + // The share session for this share group member returns only tp3, so no topic partition is common in + // both the delayed fetch request and the share session. + when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId))).thenReturn(Collections.singletonList(tp3)); + + // Release acquired records on session close for sp3.
+ sharePartitionManager.releaseSession(groupId, memberId); + + // Since neither sp1 nor sp2 is part of the release acquired records request on session close, the + // delayedShareFetchPurgatory should have 2 watched keys. + assertEquals(2, delayedShareFetchPurgatory.watched()); + + Mockito.verify(sp1, times(0)).nextFetchOffset(); + Mockito.verify(sp2, times(0)).nextFetchOffset(); } @Test public void testPendingInitializationShouldCompleteFetchRequest() throws Exception { String groupId = "grp"; Uuid memberId = Uuid.randomUuid(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); Uuid fooId = Uuid.randomUuid(); TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); @@ -1919,17 +2077,25 @@ public void testPendingInitializationShouldCompleteFetchRequest() throws Excepti // Mock replica manager to verify no calls are made to fetchMessages. ReplicaManager replicaManager = mock(ReplicaManager.class); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build(); + .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build(); CompletableFuture> future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); // Verify that the fetch request is completed. - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.join().isEmpty()); // Verify that replica manager fetch is not called. - Mockito.verify(replicaManager, times(0)).fetchMessages( - any(), any(), any(ReplicaQuota.class), any()); + Mockito.verify(replicaManager, times(0)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); // Complete the pending initialization future.
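The two release-session tests exercise the standard purgatory contract: each DelayedShareFetch is registered with tryCompleteElseWatch under one DelayedShareFetchKey per requested partition, and releasing acquired records for a partition later re-runs tryComplete for the operations watching that key, which is why watched() drops only when a released partition overlaps the delayed request. Below is a minimal sketch of that contract, not code from this patch: ToyDelayedFetch, the plain String watch key and the SystemTimer wiring are illustrative stand-ins for DelayedShareFetch, DelayedShareFetchKey and the broker's timer.

import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import kafka.server.DelayedOperation;
import kafka.server.DelayedOperationPurgatory;

import org.apache.kafka.server.util.timer.SystemTimer;

import scala.Option;
import scala.jdk.javaapi.CollectionConverters;

public class ToyDelayedFetchExample {

    // A toy delayed operation: it can only complete once `releasable` flips to true.
    static class ToyDelayedFetch extends DelayedOperation {
        private final AtomicBoolean releasable;

        ToyDelayedFetch(long maxWaitMs, AtomicBoolean releasable) {
            super(maxWaitMs, Option.empty());
            this.releasable = releasable;
        }

        @Override
        public boolean tryComplete() {
            // Stay watched until records become acquirable, then force completion.
            return releasable.get() && forceComplete();
        }

        @Override
        public void onComplete() {
            // The real operation would read records here and complete the client's future.
        }

        @Override
        public void onExpiration() {
            // Reached maxWaitMs without tryComplete ever succeeding.
        }
    }

    public static void main(String[] args) {
        DelayedOperationPurgatory<ToyDelayedFetch> purgatory = new DelayedOperationPurgatory<>(
                "ToyFetch", new SystemTimer("toy-fetch-timer"), 0, 1000, true, true);

        AtomicBoolean releasable = new AtomicBoolean(false);
        ToyDelayedFetch operation = new ToyDelayedFetch(500L, releasable);

        // Register the operation under a per-partition key; it cannot complete yet, so it is watched.
        Object watchKey = "grp:foo-0";
        purgatory.tryCompleteElseWatch(operation,
                CollectionConverters.asScala(Collections.singletonList(watchKey)).toSeq());
        System.out.println("watched = " + purgatory.watched());   // 1

        // Simulate releasing acquired records for that partition, then re-check only its key.
        releasable.set(true);
        purgatory.checkAndComplete(watchKey);
        System.out.println("watched after release = " + purgatory.watched());

        purgatory.shutdown();
    }
}

Because checkAndComplete is keyed, operations watching unrelated partitions stay registered, which is exactly what the second test asserts when only tp3 is released.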
pendingInitializationFuture.complete(null); } @@ -1938,7 +2104,7 @@ public void testPendingInitializationShouldCompleteFetchRequest() throws Excepti public void testSharePartitionInitializationExceptions() throws Exception { String groupId = "grp"; Uuid memberId = Uuid.randomUuid(); - FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); Uuid fooId = Uuid.randomUuid(); TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); @@ -1948,14 +2114,23 @@ public void testSharePartitionInitializationExceptions() throws Exception { Map partitionCacheMap = new HashMap<>(); partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + ReplicaManager replicaManager = mock(ReplicaManager.class); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).build(); + .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory).build(); // Return LeaderNotAvailableException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new LeaderNotAvailableException("Leader not available"))); CompletableFuture> future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); // Exception for client should not occur for LeaderNotAvailableException, this exception is to communicate // between SharePartitionManager and SharePartition to retry the request as SharePartition is not yet ready. assertFalse(future.isCompletedExceptionally()); @@ -1964,21 +2139,30 @@ public void testSharePartitionInitializationExceptions() throws Exception { // Return IllegalStateException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new IllegalStateException("Illegal state"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.isCompletedExceptionally()); assertFutureThrows(future, IllegalStateException.class); // Return CoordinatorNotAvailableException to simulate initialization failure. 
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new CoordinatorNotAvailableException("Coordinator not available"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.isCompletedExceptionally()); assertFutureThrows(future, CoordinatorNotAvailableException.class); // Return InvalidRequestException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Invalid request"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.isCompletedExceptionally()); assertFutureThrows(future, InvalidRequestException.class); @@ -1987,7 +2171,10 @@ public void testSharePartitionInitializationExceptions() throws Exception { // Assert that partitionCacheMap contains instance before the fetch request. assertEquals(1, partitionCacheMap.size()); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.isCompletedExceptionally()); assertFutureThrows(future, FencedStateEpochException.class); // Verify that the share partition is removed from the cache. @@ -1998,7 +2185,10 @@ public void testSharePartitionInitializationExceptions() throws Exception { // Return NotLeaderOrFollowerException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new NotLeaderOrFollowerException("Not leader or follower"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.isCompletedExceptionally()); assertFutureThrows(future, NotLeaderOrFollowerException.class); // Verify that the share partition is removed from the cache. @@ -2009,7 +2199,10 @@ public void testSharePartitionInitializationExceptions() throws Exception { // Return RuntimeException to simulate initialization failure. 
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new RuntimeException("Runtime exception"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, partitionMaxBytes); - assertTrue(future.isDone()); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.isCompletedExceptionally()); assertFutureThrows(future, RuntimeException.class); } @@ -2075,6 +2268,23 @@ private void assertErroneousAndValidTopicIdPartitions( assertEquals(expectedValidSet, actualValidPartitions); } + private Seq> buildLogReadResult(Set topicIdPartitions) { + List> logReadResults = new ArrayList<>(); + topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( + new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), + Option.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + Option.empty(), + Option.empty(), + Option.empty() + )))); + return CollectionConverters.asScala(logReadResults).toSeq(); + } + private static class SharePartitionManagerBuilder { private ReplicaManager replicaManager = mock(ReplicaManager.class); private Time time = new MockTime(); @@ -2084,6 +2294,7 @@ private static class SharePartitionManagerBuilder { private Timer timer = new MockTimer(); private Metrics metrics = new Metrics(); private ConcurrentLinkedQueue fetchQueue = new ConcurrentLinkedQueue<>(); + private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -2125,12 +2336,28 @@ private SharePartitionManagerBuilder withFetchQueue(ConcurrentLinkedQueue delayedShareFetchPurgatory) { + this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; + return this; + } + public static SharePartitionManagerBuilder builder() { return new SharePartitionManagerBuilder(); } public SharePartitionManager build() { - return new SharePartitionManager(replicaManager, time, cache, partitionCacheMap, fetchQueue, RECORD_LOCK_DURATION_MS, timer, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, persister, metrics); + return new SharePartitionManager(replicaManager, + time, + cache, + partitionCacheMap, + fetchQueue, + RECORD_LOCK_DURATION_MS, + timer, + MAX_DELIVERY_COUNT, + MAX_IN_FLIGHT_MESSAGES, + persister, + metrics, + delayedShareFetchPurgatory); } } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 51405f4a1fc9..38667fefbdc5 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.server.DelayedOperationPurgatory; import kafka.server.share.SharePartition.InFlightState; import kafka.server.share.SharePartition.RecordState; import kafka.server.share.SharePartition.SharePartitionState; @@ -727,6 +728,7 @@ public void testCanAcquireRecordsWithCachedDataAndLimitReached() { @Test public void testMaybeAcquireAndReleaseFetchLock() { SharePartition sharePartition = SharePartitionBuilder.builder().build(); + sharePartition.maybeInitialize(); assertTrue(sharePartition.maybeAcquireFetchLock()); // Lock cannot be acquired again, as already acquired. 
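The initialization-exception test pins down three outcomes for share-partition initialization failures: LeaderNotAvailableException is treated as an internal retry signal (the future completes with an empty result rather than an error), FencedStateEpochException and NotLeaderOrFollowerException additionally evict the SharePartition from the cache, and every other failure completes the fetch future exceptionally. A compact sketch of that branching, using hypothetical names (handleInitializationFailure, fetchFuture) that only mirror the outcomes the test asserts, not the manager's actual internals:

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;

final class InitializationFailureOutcomes {

    static <K, V> void handleInitializationFailure(
            Throwable error,
            K sharePartitionKey,
            Map<K, V> partitionCacheMap,
            CompletableFuture<Map<TopicIdPartition, ?>> fetchFuture) {

        if (error instanceof LeaderNotAvailableException) {
            // Internal retry signal between the manager and the share partition:
            // the client simply sees an empty result and fetches again later.
            fetchFuture.complete(Collections.emptyMap());
            return;
        }

        if (error instanceof FencedStateEpochException || error instanceof NotLeaderOrFollowerException) {
            // The cached share partition is stale; evict it so the next fetch re-creates it.
            partitionCacheMap.remove(sharePartitionKey);
        }

        // All other initialization failures surface to the caller as an exceptional completion.
        fetchFuture.completeExceptionally(error);
    }
}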
assertFalse(sharePartition.maybeAcquireFetchLock()); @@ -4924,6 +4926,7 @@ private static class SharePartitionBuilder { private int maxDeliveryCount = MAX_DELIVERY_COUNT; private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES; private Persister persister = NoOpShareStatePersister.getInstance(); + private final DelayedOperationPurgatory delayedShareFetchPurgatory = Mockito.mock(DelayedOperationPurgatory.class); private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) { this.maxInflightMessages = maxInflightMessages; @@ -4951,7 +4954,7 @@ public static SharePartitionBuilder builder() { public SharePartition build() { return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount, - acquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister); + acquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, delayedShareFetchPurgatory); } } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index ac05bfd2d13c..a38864a851f4 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1142,6 +1142,7 @@ class KafkaConfigTest { case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1") diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java index 5593ac9b95bf..ef236352746f 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java @@ -63,6 +63,10 @@ public class ShareGroupConfig { public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 60000; public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock maximum duration in milliseconds for share groups."; + public static final String SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG = "share.fetch.purgatory.purge.interval.requests"; + public static final int SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT = 1000; + public static final String SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC = "The purge interval (in number of requests) of the share fetch request purgatory"; + public static final ConfigDef CONFIG_DEF = new ConfigDef() .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC) .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC) @@ -70,7 +74,8 @@ public class ShareGroupConfig { .define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC) .define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, 
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC) .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC) - .define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC); + .define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC) + .define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC); private final boolean isShareGroupEnabled; private final int shareGroupPartitionMaxRecordLocks; @@ -79,6 +84,7 @@ public class ShareGroupConfig { private final int shareGroupRecordLockDurationMs; private final int shareGroupMaxRecordLockDurationMs; private final int shareGroupMinRecordLockDurationMs; + private final int shareFetchPurgatoryPurgeIntervalRequests; public ShareGroupConfig(AbstractConfig config) { // Share groups are enabled in two cases: 1) The internal configuration to enable it is @@ -93,6 +99,7 @@ public ShareGroupConfig(AbstractConfig config) { shareGroupRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG); shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG); + shareFetchPurgatoryPurgeIntervalRequests = config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG); validate(); } @@ -125,6 +132,10 @@ public int shareGroupMinRecordLockDurationMs() { return shareGroupMinRecordLockDurationMs; } + public int shareFetchPurgatoryPurgeIntervalRequests() { + return shareFetchPurgatoryPurgeIntervalRequests; + } + private void validate() { Utils.require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs, String.format("%s must be greater than or equals to %s",
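The new share.fetch.purgatory.purge.interval.requests setting controls, roughly, how many completed requests may accumulate before the share fetch purgatory purges its watcher lists; it defaults to 1000. A minimal sketch of reading it, assuming ShareGroupConfig.CONFIG_DEF is parsed directly for illustration (in the broker the value arrives through the server configuration and is handed to the purgatory as its purge interval):

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.server.config.ShareGroupConfig;

public class ShareFetchPurgeIntervalExample {
    public static void main(String[] args) {
        // Every share group setting has a default, so overriding only the purge interval parses cleanly.
        Map<String, Object> parsed = ShareGroupConfig.CONFIG_DEF.parse(
                Collections.singletonMap(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, "500"));

        int purgeInterval = (Integer) parsed.get(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
        System.out.println(purgeInterval);   // 500
    }
}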