Skip to content

Commit

Permalink
KAFKA-14965 apache#2 Improvements for ListOffsetEvent based on Offset…
Browse files Browse the repository at this point in the history
…AndTimestamp and ListOffsetResult & Tests
  • Loading branch information
lianetm committed Jun 12, 2023
1 parent 4bd7bb7 commit e5aed9d
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
import org.apache.kafka.common.ClusterResource;
Expand All @@ -44,15 +45,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

/**
* Build requests for retrieving partition offsets from partition leaders (see
* {@link #fetchOffsets(Set, long, boolean)}). Requests are kept in-memory
* ready to be sent on the next call to {@link #poll(long)}.
* Manager responsible for building requests to retrieve partition offsets (see
* {@link #fetchOffsets(Map, boolean)}). Requests are kept in-memory ready to be sent on the next
* call to {@link #poll(long)}.
* <p>
* Partition leadership information required to build the requests is retrieved from the
* {@link ConsumerMetadata}, so this implements {@link ClusterResourceListener} to get notified
Expand Down Expand Up @@ -109,38 +109,42 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return pollResult;
}


/**
* Retrieve offsets for the given partitions and timestamp.
*
* @param partitions Partitions to get offsets for
* @param timestamp Target time to look offsets for
* @param requireTimestamps True if this should fail with an UnsupportedVersionException if
* the broker does not support fetching precise timestamps for offsets
* @return Future containing the map of offsets retrieved for each partition. The future will
* complete when the responses for the requests are received and processed following a call
* to {@link #poll(long)}
* @param timestampsToSearch Partitions and target timestamps to get offsets for
* @return Future containing the map of {@link TopicPartition} and {@link OffsetAndTimestamp}
* found (offset of the first message whose timestamp is greater than or equals to the target
* timestamp).The future will complete when the requests responses are received and
* processed, following a call to {@link #poll(long)}
*/
public CompletableFuture<Map<TopicPartition, Long>> fetchOffsets(final Set<TopicPartition> partitions,
final long timestamp,
final boolean requireTimestamps) {
metadata.addTransientTopics(offsetFetcherUtils.topicsForPartitions(partitions));
public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets(
final Map<TopicPartition, Long> timestampsToSearch,
final boolean requireTimestamps) {
if (timestampsToSearch.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
metadata.addTransientTopics(offsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));

Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
ListOffsetsRequestState listOffsetsRequestState = new ListOffsetsRequestState(
timestampsToSearch,
requireTimestamps,
offsetFetcherUtils,
isolationLevel);
listOffsetsRequestState.globalResult.whenComplete((result, error) -> {
metadata.clearTransientTopics();
log.debug("Fetch offsets completed for partitions {} and timestamp {}. Result {}, " +
"error", partitions, timestamp, result, error);
log.debug("Fetch offsets completed for partitions and timestamps {}. Result {}, " +
"error", timestampsToSearch, result, error);
}).exceptionally(error -> {
log.error(error.getMessage());
return null;
});

fetchOffsetsByTimes(timestampsToSearch, requireTimestamps, listOffsetsRequestState);

return listOffsetsRequestState.globalResult;
return listOffsetsRequestState.globalResult.thenApply(result ->
offsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets));
}

/**
Expand All @@ -150,9 +154,14 @@ public CompletableFuture<Map<TopicPartition, Long>> fetchOffsets(final Set<Topic
private void fetchOffsetsByTimes(final Map<TopicPartition, Long> timestampsToSearch,
final boolean requireTimestamps,
final ListOffsetsRequestState listOffsetsRequestState) {
if (timestampsToSearch.isEmpty()) {
// Early return if empty map to avoid wrongfully raising StaleMetadataException on
// empty grouping
return;
}
try {
List<NetworkClientDelegate.UnsentRequest> unsentRequests =
sendListOffsetsRequests(timestampsToSearch, requireTimestamps, listOffsetsRequestState);
List<NetworkClientDelegate.UnsentRequest> unsentRequests = sendListOffsetsRequests(
timestampsToSearch, requireTimestamps, listOffsetsRequestState);
requestsToSend.addAll(unsentRequests);
} catch (StaleMetadataException e) {
requestsToRetry.add(listOffsetsRequestState);
Expand Down Expand Up @@ -208,7 +217,7 @@ private List<NetworkClientDelegate.UnsentRequest> sendListOffsetsRequests(
ListOffsetResult listOffsetResult =
new ListOffsetResult(listOffsetsRequestState.fetchedOffsets,
listOffsetsRequestState.remainingToSearch.keySet());
listOffsetsRequestState.globalResult.complete(listOffsetResult.offsetAndMetadataMap());
listOffsetsRequestState.globalResult.complete(listOffsetResult);
} else {
requestsToRetry.add(listOffsetsRequestState);
}
Expand Down Expand Up @@ -270,7 +279,7 @@ private static class ListOffsetsRequestState {
private final Map<TopicPartition, Long> timestampsToSearch;
private final Map<TopicPartition, ListOffsetData> fetchedOffsets;
private final Map<TopicPartition, Long> remainingToSearch;
private final CompletableFuture<Map<TopicPartition, Long>> globalResult;
private final CompletableFuture<ListOffsetResult> globalResult;
final boolean requireTimestamps;
final OffsetFetcherUtils offsetFetcherUtils;
final IsolationLevel isolationLevel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -406,11 +407,15 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
if (partitions.isEmpty()) {
return Collections.emptyMap();
}
Map<TopicPartition, Long> timestampToSearch =
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> timestamp));
final ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent(
partitions.stream().collect(Collectors.toSet()),
timestamp,
timestampToSearch,
false);
return eventHandler.addAndGet(listOffsetsEvent, timeout);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
eventHandler.addAndGet(listOffsetsEvent, timeout);
return offsetAndTimestampMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(),
e -> e.getValue().offset()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private boolean process(final UnsubscribeApplicationEvent event) {
}

private boolean process(final ListOffsetsApplicationEvent event) {
requestManagers.listOffsetsRequestManager.fetchOffsets(event.partitions, event.timestamp,
requestManagers.listOffsetsRequestManager.fetchOffsets(event.timestampsToSearch,
event.requireTimestamps)
.whenComplete((result, error) -> {
if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,41 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Event for retrieving partition offsets from the partition leader
* by performing a {@link org.apache.kafka.common.requests.ListOffsetsRequest}
* Event for retrieving partition offsets by performing a
* {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}.
* This event is created with a map of {@link TopicPartition} and target timestamps to search
* offsets for. It is completed with a map of {@link TopicPartition} and the
* {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than
* or equals to the target timestamp)
*/
public class ListOffsetsApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, Long>> {
private final CompletableFuture<Map<TopicPartition, Long>> future;
public class ListOffsetsApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {
private final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> future;

final Set<TopicPartition> partitions;
final long timestamp;
final Map<TopicPartition, Long> timestampsToSearch;
final boolean requireTimestamps;

public ListOffsetsApplicationEvent(final Set<TopicPartition> partitions,
long timestamp,
boolean requireTimestamps) {
public ListOffsetsApplicationEvent(Map<TopicPartition, Long> timestampToSearch, boolean requireTimestamps) {
super(Type.LIST_OFFSETS);
this.partitions = partitions;
this.timestamp = timestamp;
this.timestampsToSearch = timestampToSearch;
this.requireTimestamps = requireTimestamps;
this.future = new CompletableFuture<>();
}

public CompletableFuture<Map<TopicPartition, Long>> future() {
public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> future() {
return future;
}

@Override
public String toString() {
return "ListOffsetsApplicationEvent {" +
"partitions=" + partitions + ", " +
"target timestamp=" + timestamp + ", " +
"timestampsToSearch=" + timestampsToSearch + ", " +
"requireTimestamps=" + requireTimestamps + '}';
}
}
Loading

0 comments on commit e5aed9d

Please sign in to comment.