Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
rreddy-22 authored Sep 21, 2023
2 parents fb6aa97 + 7d89bdc commit 8fe7d13
Show file tree
Hide file tree
Showing 21 changed files with 803 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
Expand Down Expand Up @@ -129,7 +128,7 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets
* Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
* {@link OffsetCommitRequestState} and enqueue it to send later.
*/
public CompletableFuture<ClientResponse> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
return pendingRequests.addOffsetCommitRequest(offsets);
}

Expand All @@ -145,19 +144,13 @@ public void updateAutoCommitTimer(final long currentTimeMs) {
this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs));
}


// Visible for testing
List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
return pendingRequests.unsentOffsetFetches;
}

// Visible for testing
Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
return pendingRequests.unsentOffsetCommits;
}

// Visible for testing
CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
return this.addOffsetCommitRequest(allConsumedOffsets)
.whenComplete((response, throwable) -> {
Expand All @@ -182,23 +175,19 @@ private class OffsetCommitRequestState {
private final String groupId;
private final GroupState.Generation generation;
private final String groupInstanceId;
private final NetworkClientDelegate.FutureCompletionHandler future;
private final CompletableFuture<Void> future;

public OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> offsets,
final String groupId,
final String groupInstanceId,
final GroupState.Generation generation) {
this.offsets = offsets;
this.future = new NetworkClientDelegate.FutureCompletionHandler();
this.future = new CompletableFuture<>();
this.groupId = groupId;
this.generation = generation;
this.groupInstanceId = groupInstanceId;
}

public CompletableFuture<ClientResponse> future() {
return future.future();
}

public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
Expand Down Expand Up @@ -230,15 +219,20 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator(),
future);
(response, throwable) -> {
if (throwable == null) {
future.complete(null);
} else {
future.completeExceptionally(throwable);
}
});
}
}

private class OffsetFetchRequestState extends RequestState {
public final Set<TopicPartition> requestedPartitions;
public final GroupState.Generation requestedGeneration;
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
public OffsetFetchRequestState(final Set<TopicPartition> partitions,
final GroupState.Generation generation,
final long retryBackoffMs,
Expand All @@ -253,19 +247,16 @@ public boolean sameRequest(final OffsetFetchRequestState request) {
return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
}

public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) {
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
groupState.groupId,
true,
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator());
unsentRequest.future().whenComplete((r, t) -> {
onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
});
return unsentRequest;
coordinatorRequestManager.coordinator(),
(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
}

public void onResponse(
Expand Down Expand Up @@ -359,12 +350,12 @@ private void onSuccess(final long currentTimeMs,
}
}

private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> otherFuture) {
return this.future.whenComplete((r, t) -> {
if (t != null) {
future.completeExceptionally(t);
otherFuture.completeExceptionally(t);
} else {
future.complete(r);
otherFuture.complete(r);
}
});
}
Expand All @@ -384,8 +375,8 @@ public String toString() {
* <p>This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}.
* <li>unsentOffsetCommits holds the offset commit requests that have not been sent out</>
* <li>unsentOffsetFetches holds the offset fetch requests that have not been sent out</li>
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted</>.
*
* <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed</>.
* <p>
* {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests.
*/

Expand All @@ -395,27 +386,28 @@ class PendingRequests {
List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>();
List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<>();

public boolean hasUnsentRequests() {
// Visible for testing
boolean hasUnsentRequests() {
return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty();
}

public CompletableFuture<ClientResponse> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
// TODO: Dedupe committing the same offsets to the same partitions
OffsetCommitRequestState request = new OffsetCommitRequestState(
offsets,
groupState.groupId,
groupState.groupInstanceId.orElse(null),
groupState.generation);
unsentOffsetCommits.add(request);
return request.future();
return request.future;
}

/**
* <p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future
* to the existing one.
* <p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future
* to the existing one.
*
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches}
* upon completion.</>
* <p>If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches}
* upon completion.
*/
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final OffsetFetchRequestState request) {
Optional<OffsetFetchRequestState> dupe =
Expand Down Expand Up @@ -450,12 +442,11 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch

/**
* Clear {@code unsentOffsetCommits} and moves all the sendable request in {@code unsentOffsetFetches} to the
* {@code inflightOffsetFetches} to bookkeep all of the inflight requests.
*
* {@code inflightOffsetFetches} to bookkeep all the inflight requests.
* Note: Sendable requests are determined by their timer as we are expecting backoff on failed attempt. See
* {@link RequestState}.
**/
public List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();

// Add all unsent offset commit requests to the unsentRequests list
Expand All @@ -472,7 +463,7 @@ public List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs)
// Add all sendable offset fetch requests to the unsentRequests list and to the inflightOffsetFetches list
for (OffsetFetchRequestState request : partitionedBySendability.get(true)) {
request.onSendAttempt(currentTimeMs);
unsentRequests.add(request.toUnsentRequest(currentTimeMs));
unsentRequests.add(request.toUnsentRequest());
inflightOffsetFetches.add(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
* Whether there is an existing coordinator.
* Whether there is an inflight request.
* Whether the backoff timer has expired.
* The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
* or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
* The {@link NetworkClientDelegate.PollResult} contains either a wait timer
* or a singleton list of {@link NetworkClientDelegate.UnsentRequest}.
* <p/>
* The {@link FindCoordinatorRequest} will be handled by the {@link #onResponse(long, FindCoordinatorResponse)} callback, which
* subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
Expand Down Expand Up @@ -86,7 +86,7 @@ public CoordinatorRequestManager(
* Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not.
*
* @param currentTimeMs current time in ms.
* @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}.
* @return {@link NetworkClientDelegate.PollResult}. This will not be {@code null}.
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class DefaultBackgroundThread extends KafkaThread {
private final RequestManagers requestManagers;

// Visible for testing
@SuppressWarnings("ParameterNumber")
DefaultBackgroundThread(final Time time,
final ConsumerConfig config,
final LogContext logContext,
Expand All @@ -89,7 +90,8 @@ public class DefaultBackgroundThread extends KafkaThread {
final GroupState groupState,
final CoordinatorRequestManager coordinatorManager,
final CommitRequestManager commitRequestManager,
final OffsetsRequestManager offsetsRequestManager) {
final OffsetsRequestManager offsetsRequestManager,
final TopicMetadataRequestManager topicMetadataRequestManager) {
super(BACKGROUND_THREAD_NAME, true);
this.time = time;
this.running = true;
Expand All @@ -102,9 +104,9 @@ public class DefaultBackgroundThread extends KafkaThread {
this.networkClientDelegate = networkClient;
this.errorEventHandler = errorEventHandler;
this.groupState = groupState;

this.requestManagers = new RequestManagers(
offsetsRequestManager,
topicMetadataRequestManager,
Optional.ofNullable(coordinatorManager),
Optional.ofNullable(commitRequestManager));
}
Expand Down Expand Up @@ -169,6 +171,9 @@ public DefaultBackgroundThread(final Time time,
logContext);
CoordinatorRequestManager coordinatorRequestManager = null;
CommitRequestManager commitRequestManager = null;
TopicMetadataRequestManager topicMetadataRequestManger = new TopicMetadataRequestManager(
logContext,
config);

if (groupState.groupId != null) {
coordinatorRequestManager = new CoordinatorRequestManager(
Expand All @@ -188,15 +193,14 @@ public DefaultBackgroundThread(final Time time,
}

this.requestManagers = new RequestManagers(
offsetsRequestManager,
Optional.ofNullable(coordinatorRequestManager),
Optional.ofNullable(commitRequestManager));

offsetsRequestManager,
topicMetadataRequestManger,
Optional.ofNullable(coordinatorRequestManager),
Optional.ofNullable(commitRequestManager));
this.applicationEventProcessor = new ApplicationEventProcessor(
backgroundEventQueue,
requestManagers,
metadata);

backgroundEventQueue,
requestManagers,
metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
Expand All @@ -217,7 +221,7 @@ public void run() {
}
} catch (final Throwable t) {
log.error("The background thread failed due to unexpected error", t);
throw new RuntimeException(t);
throw new KafkaException(t);
} finally {
close();
log.debug("{} closed", getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle network poll and send operations.
Expand Down Expand Up @@ -211,17 +212,19 @@ public static class UnsentRequest {
private Optional<Node> node; // empty if random node can be chosen
private Timer timer;

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder, final Optional<Node> node) {
this(requestBuilder, node, new FutureCompletionHandler());
}

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node,
final FutureCompletionHandler handler) {
final Optional<Node> node) {
Objects.requireNonNull(requestBuilder);
this.requestBuilder = requestBuilder;
this.node = node;
this.handler = handler;
this.handler = new FutureCompletionHandler();
}

public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node,
final BiConsumer<ClientResponse, Throwable> callback) {
this(requestBuilder, node);
this.handler.future.whenComplete(callback);
}

public void setTimer(final Time time, final long requestTimeoutMs) {
Expand Down Expand Up @@ -263,10 +266,6 @@ public void onFailure(final RuntimeException e) {
future.completeExceptionally(e);
}

public CompletableFuture<ClientResponse> future() {
return future;
}

@Override
public void onComplete(final ClientResponse response) {
if (response.authenticationException() != null) {
Expand All @@ -280,5 +279,4 @@ public void onComplete(final ClientResponse response) {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,

/**
* Determine if there are pending fetch offsets requests to be sent and build a
* {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}
* {@link NetworkClientDelegate.PollResult}
* containing it.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
commitCallback.onComplete(offsets, null);
}
}).exceptionally(e -> {
System.out.println(e);
throw new KafkaException(e);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@ public class RequestManagers {
public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
public final Optional<CommitRequestManager> commitRequestManager;
public final OffsetsRequestManager offsetsRequestManager;
public final TopicMetadataRequestManager topicMetadataRequestManager;
private final List<Optional<? extends RequestManager>> entries;

public RequestManagers(OffsetsRequestManager offsetsRequestManager,
TopicMetadataRequestManager topicMetadataRequestManager,
Optional<CoordinatorRequestManager> coordinatorRequestManager,
Optional<CommitRequestManager> commitRequestManager) {
this.offsetsRequestManager = requireNonNull(offsetsRequestManager,
"OffsetsRequestManager cannot be null");
this.coordinatorRequestManager = coordinatorRequestManager;
this.commitRequestManager = commitRequestManager;
this.topicMetadataRequestManager = topicMetadataRequestManager;

List<Optional<? extends RequestManager>> list = new ArrayList<>();
list.add(coordinatorRequestManager);
list.add(commitRequestManager);
list.add(Optional.of(offsetsRequestManager));
list.add(Optional.of(topicMetadataRequestManager));
entries = Collections.unmodifiableList(list);
}

Expand Down
Loading

0 comments on commit 8fe7d13

Please sign in to comment.