KAFKA-17400: Added share fetch purgatory for delaying share fetch requests (#16969)

Introduced a share fetch purgatory on the broker, which delays share fetch requests that cannot be completed instantaneously. Introduced three new classes:

DelayedShareFetch - Contains the logic to complete a share fetch request instantaneously, or to force-complete it on timeout.
DelayedShareFetchKey - The key used to watch entries within the share fetch purgatory.
ShareFetchUtils - Utility class with the post-processing required once the replica manager fetch completes.
There are many scenarios that can cause a share fetch request to be delayed, and multiple scenarios in which a delayed share fetch can be attempted for completion. This PR targets only the case where the record lock partition limit is reached: the share fetch should then wait for up to MaxWaitMs for records to be released.
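
For context only (nothing below is part of the commit itself), here is a minimal sketch of the intended flow, using a simplified stand-in for the broker's delayed-operation purgatory. Every name in the snippet is illustrative, not the actual API:

import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch: a share fetch that cannot acquire records is parked in a
// purgatory and re-checked, per (groupId, topic-partition) key, when records are released.
final class ShareFetchPurgatorySketch {

    // Simplified stand-in for the broker's delayed-operation purgatory.
    interface Purgatory<O> {
        boolean tryCompleteElseWatch(O operation, List<String> watchKeys); // park if not completable now
        void checkAndComplete(String watchKey);                            // retry watchers for a key
    }

    // Hypothetical delayed operation: completable once any watched partition can acquire records.
    static final class DelayedFetch {
        final String groupId;
        final List<String> partitions;

        DelayedFetch(String groupId, List<String> partitions) {
            this.groupId = groupId;
            this.partitions = partitions;
        }

        List<String> watchKeys() {
            return partitions.stream().map(p -> groupId + ":" + p).collect(Collectors.toList());
        }
    }

    static void onShareFetchRequest(Purgatory<DelayedFetch> purgatory, DelayedFetch fetch) {
        // Complete immediately if records can be acquired, otherwise wait up to MaxWaitMs.
        purgatory.tryCompleteElseWatch(fetch, fetch.watchKeys());
    }

    static void onRecordsReleased(Purgatory<DelayedFetch> purgatory, String groupId, String partition) {
        // Record locks were released (acknowledgement or lock timeout): retry parked fetches.
        purgatory.checkAndComplete(groupId + ":" + partition);
    }
}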

Reviewers: David Arthur <[email protected]>, Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, Jun Rao <[email protected]>
adixitconfluent authored Sep 11, 2024
1 parent 3f4c25f commit f466e86
Showing 14 changed files with 1,702 additions and 506 deletions.
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
@@ -36,7 +36,7 @@
files="MessageGenerator.java"/>

<!-- core -->
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition).java"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition|SharePartitionManager).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="MethodLength" files="RemoteLogManager.java"/>
<suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
@@ -45,7 +45,7 @@
files="(KafkaClusterTestKit).java"/>
<suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling" files="SharePartitionManagerTest"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>

<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>
200 changes: 200 additions & 0 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;

import kafka.server.DelayedOperation;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.storage.internals.log.FetchPartitionData;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import scala.runtime.BoxedUnit;

/**
 * A delayed share fetch operation is created for a share fetch request which cannot be completed instantaneously.
*/
public class DelayedShareFetch extends DelayedOperation {
private final SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData;
private final ReplicaManager replicaManager;
private final Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap;
private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete = new LinkedHashMap<>();

private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);

DelayedShareFetch(
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData,
ReplicaManager replicaManager,
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap) {
super(shareFetchPartitionData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchPartitionData = shareFetchPartitionData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
}

@Override
public void onExpiration() {
}

/**
* Complete the share fetch operation by fetching records for all partitions in the share fetch request irrespective
* of whether they have any acquired records. This is called when the fetch operation is forced to complete either
* because records can be acquired for some partitions or due to MaxWaitMs timeout.
*/
@Override
public void onComplete() {
log.trace("Completing the delayed share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchPartitionData.groupId(),
shareFetchPartitionData.memberId(), shareFetchPartitionData.partitionMaxBytes().keySet());

if (shareFetchPartitionData.future().isDone())
return;

Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (topicPartitionDataFromTryComplete.isEmpty())
topicPartitionData = acquirablePartitions();
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
else
topicPartitionData = topicPartitionDataFromTryComplete;
try {
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetchPartitionData.future().complete(Collections.emptyMap());
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetchPartitionData.groupId(), shareFetchPartitionData.fetchParams());

Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetchPartitionData.fetchParams(),
CollectionConverters.asScala(
topicPartitionData.entrySet().stream().map(entry ->
new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
),
QuotaFactory.UnboundedQuota$.MODULE$,
true);

Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
responseLogResult.foreach(tpLogResult -> {
TopicIdPartition topicIdPartition = tpLogResult._1();
LogReadResult logResult = tpLogResult._2();
FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
responseData.put(topicIdPartition, fetchPartitionData);
return BoxedUnit.UNIT;
});

log.trace("Data successfully retrieved by replica manager: {}", responseData);
ShareFetchUtils.processFetchResponse(shareFetchPartitionData, responseData, partitionCacheMap, replicaManager)
.whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("Error processing fetch response for share partitions", throwable);
shareFetchPartitionData.future().completeExceptionally(throwable);
} else {
shareFetchPartitionData.future().complete(result);
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet());
});

} catch (Exception e) {
// Release the locks acquired for the partitions in the share fetch request in case there is an exception
log.error("Error processing delayed share fetch request", e);
shareFetchPartitionData.future().completeExceptionally(e);
releasePartitionLocks(shareFetchPartitionData.groupId(), topicPartitionData.keySet());
}
}

/**
* Try to complete the fetch operation if we can acquire records for any partition in the share fetch request.
*/
@Override
public boolean tryComplete() {
log.trace("Try to complete the delayed share fetch request for group {}, member {}, topic partitions {}",
shareFetchPartitionData.groupId(), shareFetchPartitionData.memberId(),
shareFetchPartitionData.partitionMaxBytes().keySet());

topicPartitionDataFromTryComplete = acquirablePartitions();

if (!topicPartitionDataFromTryComplete.isEmpty())
return forceComplete();
log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchPartitionData.groupId(),
shareFetchPartitionData.memberId(), shareFetchPartitionData.partitionMaxBytes().keySet());
return false;
}

/**
* Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
*/
// Visible for testing
Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
// Initialize the topic partitions for which the fetch should be attempted.
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

shareFetchPartitionData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
SharePartition sharePartition = partitionCacheMap.get(new SharePartitionManager.SharePartitionKey(
shareFetchPartitionData.groupId(), topicIdPartition));

int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
// If the share partition is already at capacity, we should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
topicPartitionData.put(
topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
sharePartition.nextFetchOffset(),
0,
partitionMaxBytes,
Optional.empty()
)
);
} else {
sharePartition.releaseFetchLock();
log.trace("Record lock partition limit exceeded for SharePartition {}, " +
"cannot acquire more records", sharePartition);
}
}
});
return topicPartitionData;
}

private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
topicIdPartitions.forEach(tp -> partitionCacheMap.get(new
SharePartitionManager.SharePartitionKey(groupId, tp)).releaseFetchLock());
}
}
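
As an aside (a toy model, not code from this commit), the guarantee DelayedShareFetch relies on from DelayedOperation is that forceComplete() invokes onComplete() at most once, whether it is triggered by a successful tryComplete() or by expiration after MaxWaitMs:

import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the completion guarantee a DelayedOperation subclass relies on.
abstract class ToyDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // Runs onComplete() exactly once, no matter how many callers race here.
    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    abstract void onComplete();      // builds and sends the response
    abstract boolean tryComplete();  // calls forceComplete() if progress is possible
}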
61 changes: 61 additions & 0 deletions core/src/main/java/kafka/server/share/DelayedShareFetchKey.java
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;

import kafka.server.DelayedOperationKey;

import org.apache.kafka.common.TopicIdPartition;

import java.util.Objects;

/**
* A key for delayed operations that fetch data for share consumers.
*/
public class DelayedShareFetchKey implements DelayedOperationKey {
private final String groupId;
private final TopicIdPartition topicIdPartition;

DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DelayedShareFetchKey that = (DelayedShareFetchKey) o;
return topicIdPartition.equals(that.topicIdPartition) && groupId.equals(that.groupId);
}

@Override
public int hashCode() {
return Objects.hash(topicIdPartition, groupId);
}

@Override
public String toString() {
return "DelayedShareFetchKey(groupId=" + groupId +
", topicIdPartition=" + topicIdPartition +
")";
}

@Override
public String keyLabel() {
return String.format("groupId=%s, topicIdPartition=%s", groupId, topicIdPartition);
}
}
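
For illustration only (this assumes the snippet sits in the same kafka.server.share package, since the constructor is package-private, and the group and topic names are made up): two keys built for the same group and partition compare equal, so they resolve to the same watch bucket in the purgatory.

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

// Illustrative only: group and topic names are hypothetical.
public class DelayedShareFetchKeyExample {
    public static void main(String[] args) {
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("payments", 0));
        DelayedShareFetchKey k1 = new DelayedShareFetchKey("group-1", tp);
        DelayedShareFetchKey k2 = new DelayedShareFetchKey("group-1", tp);
        // Equal keys mean watchers registered under k1 are re-checked when k2 is triggered.
        System.out.println(k1.equals(k2) && k1.hashCode() == k2.hashCode()); // true
    }
}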
114 changes: 114 additions & 0 deletions core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;

import kafka.server.ReplicaManager;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.storage.internals.log.FetchPartitionData;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import scala.Option;

/**
* Utility class for post-processing of share fetch operations.
*/
public class ShareFetchUtils {
private static final Logger log = LoggerFactory.getLogger(ShareFetchUtils.class);

// Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data
// from share partitions.
static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(
SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData,
Map<TopicIdPartition, FetchPartitionData> responseData,
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap,
ReplicaManager replicaManager
) {
Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>();
responseData.forEach((topicIdPartition, fetchPartitionData) -> {

SharePartition sharePartition = partitionCacheMap.get(new SharePartitionManager.SharePartitionKey(
shareFetchPartitionData.groupId(), topicIdPartition));
futures.put(topicIdPartition, sharePartition.acquire(shareFetchPartitionData.memberId(), fetchPartitionData)
.handle((acquiredRecords, throwable) -> {
log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
topicIdPartition, shareFetchPartitionData, acquiredRecords);
ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(topicIdPartition.partition());

if (throwable != null) {
partitionData.setErrorCode(Errors.forException(throwable).code());
return partitionData;
}

if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) {
// In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset.
// So, we would update the start and end offset of the share partition and still return an empty
// response and let the client retry the fetch. This way we do not lose out on the data that
// would be returned for other share partitions in the fetch request.
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager));
partitionData.setPartitionIndex(topicIdPartition.partition())
.setRecords(null)
.setErrorCode(Errors.NONE.code())
.setAcquiredRecords(Collections.emptyList())
.setAcknowledgeErrorCode(Errors.NONE.code());
return partitionData;
}

// Maybe, in the future, check if no records are acquired, and we want to retry
// replica manager fetch. Depends on the share partition manager implementation,
// if we want parallel requests for the same share partition or not.
partitionData.setPartitionIndex(topicIdPartition.partition())
.setRecords(fetchPartitionData.records)
.setErrorCode(fetchPartitionData.error.code())
.setAcquiredRecords(acquiredRecords)
.setAcknowledgeErrorCode(Errors.NONE.code());
return partitionData;
}));
});
return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> {
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResult = new HashMap<>();
futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join()));
return processedResult;
});
}

/**
* The method is used to get the offset for the earliest timestamp for the topic-partition.
*
* @return The offset for the earliest timestamp.
*/
static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
// Isolation level is only required when reading from the latest offset hence use Option.empty() for now.
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
Optional.empty(), true);
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
}
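
The per-partition futures built in processFetchResponse are combined with CompletableFuture.allOf and then drained with join(), which does not block because every future has already completed by that point. A stripped-down illustration of the same pattern, with placeholder keys and values:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class AllOfExample {
    public static void main(String[] args) {
        Map<String, CompletableFuture<Integer>> futures = new HashMap<>();
        futures.put("tp-0", CompletableFuture.completedFuture(10));
        futures.put("tp-1", CompletableFuture.supplyAsync(() -> 20));

        // allOf completes when every per-partition future is done; join() is then non-blocking.
        CompletableFuture<Map<String, Integer>> combined =
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    Map<String, Integer> result = new HashMap<>();
                    futures.forEach((k, f) -> result.put(k, f.join()));
                    return result;
                });
        System.out.println(combined.join()); // {tp-0=10, tp-1=20}
    }
}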