Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17502: Modified commitSync() and close() handling in clients #17136

Merged
merged 5 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -84,7 +86,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private Uuid memberId;
private boolean fetchMoreRecords = false;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap;
private final Map<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStates;
private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private boolean closing = false;
Expand Down Expand Up @@ -132,7 +134,7 @@ public PollResult poll(long currentTimeMs) {
return pollResult;
}

if (!fetchMoreRecords || closing) {
if (!fetchMoreRecords) {
return PollResult.EMPTY;
}

Expand Down Expand Up @@ -216,7 +218,7 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
AtomicBoolean isAsyncDone = new AtomicBoolean();
for (Map.Entry<Integer, Pair<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
for (Map.Entry<Integer, Tuple<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
int nodeId = requestStates.getKey();

if (!isNodeFree(nodeId)) {
Expand All @@ -226,10 +228,25 @@ private PollResult processAcknowledgements(long currentTimeMs) {
// For commitAsync
maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
// Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process.
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else if (isAsyncDone.get()) {
maybeBuildRequest(requestStates.getValue().getSyncRequest(), currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
if (isAsyncDone.get()) {
// We try to process the close request only if we have processed the async and the sync requests for the node.
if (requestStates.getValue().getSyncRequestQueue() == null) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else {
AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest();

maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
} else {
for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
break;
}
maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
}
}
}
}
Expand All @@ -242,12 +259,10 @@ private PollResult processAcknowledgements(long currentTimeMs) {
pollResult = PollResult.EMPTY;
} else if (closing) {
if (!closeFuture.isDone()) {
log.trace("Completing acknowledgement on close");
closeFuture.complete(null);
}
pollResult = PollResult.EMPTY;
}

return pollResult;
}

Expand Down Expand Up @@ -298,25 +313,56 @@ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknow

/**
* Prunes the empty acknowledgementRequestStates.
* Returns true if there are still some acknowledgements left to be processed.
* Returns true if there are still any acknowledgements left to be processed.
*/
private boolean checkAndRemoveCompletedAcknowledgements() {
boolean areAnyAcksLeft = false;
Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();
Iterator<Map.Entry<Integer, Tuple<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();

while (iterator.hasNext()) {
Map.Entry<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
if (isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest()) || isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getSyncRequest())) {
Map.Entry<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
boolean areAsyncAcksLeft = true, areSyncAcksLeft = true;
if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest())) {
acknowledgeRequestStatePair.getValue().setAsyncRequest(null);
areAsyncAcksLeft = false;
}
if (!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue())) {
acknowledgeRequestStatePair.getValue().nullifySyncRequestQueue();
areSyncAcksLeft = false;
}
if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getCloseRequest())) {
acknowledgeRequestStatePair.getValue().setCloseRequest(null);
}

if (areAsyncAcksLeft || areSyncAcksLeft) {
areAnyAcksLeft = true;
} else if (!closing) {
} else if (acknowledgeRequestStatePair.getValue().getCloseRequest() == null) {
iterator.remove();
}
}

if (!acknowledgeRequestStates.isEmpty()) areAnyAcksLeft = true;
return areAnyAcksLeft;
}

private boolean isRequestStateInProgress(AcknowledgeRequestState acknowledgeRequestState) {
return acknowledgeRequestState != null && !(acknowledgeRequestState.isEmpty());
if (acknowledgeRequestState == null) {
return false;
} else if (acknowledgeRequestState.onClose()) {
return !acknowledgeRequestState.isProcessed;
} else {
return !(acknowledgeRequestState.isEmpty());
}
}

/**
 * Reports whether at least one request state in the given queue is still in progress.
 * A {@code null} queue is treated as having nothing in progress.
 *
 * @param acknowledgeRequestStates the queue of request states to inspect; may be {@code null}
 * @return {@code true} if {@code isRequestStateInProgress} holds for any element, {@code false} otherwise
 */
private boolean areRequestStatesInProgress(Queue<AcknowledgeRequestState> acknowledgeRequestStates) {
    // Short-circuits on the first in-progress entry, exactly like an early-return loop.
    return acknowledgeRequestStates != null
            && acknowledgeRequestStates.stream().anyMatch(this::isRequestStateInProgress);
}

/**
Expand All @@ -340,42 +386,34 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));

// Ensure there is no commitSync()/close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id());
future.completeExceptionally(
new IllegalStateException("Attempt to call commitSync() when there is an existing sync request for node : " + node.id()));
} else {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

// Add the incoming commitSync() request to the queue.
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);

// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state representing commitSync() and close().
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}

acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
}

});

resultHandler.completeIfEmpty();
Expand All @@ -396,7 +434,7 @@ public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledg
if (node != null) {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();

acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
Expand Down Expand Up @@ -470,17 +508,17 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
}
}

acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

// Ensure there is no commitSync()/close() request already present as they are blocking calls
// Ensure there is no close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing sync request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequest());
if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
closeFuture.completeExceptionally(
new IllegalStateException("Attempt to call close() when there is an existing sync request for node : " + node.id()));
new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id()));
} else {
// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
// There can only be one close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setCloseRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":3",
deadlineMs,
retryBackoffMs,
Expand Down Expand Up @@ -626,9 +664,6 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();

if (!closeFuture.isDone()) {
closeFuture.complete(null);
}
} else {
if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) {
// Received a response-level error code.
Expand Down Expand Up @@ -678,7 +713,9 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
}
}

metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
if (acknowledgeRequestState.isProcessed) {
metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
}
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
Expand Down Expand Up @@ -785,6 +822,13 @@ public class AcknowledgeRequestState extends TimedRequestState {
*/
private final AcknowledgeRequestType requestType;

/**
* Indicates whether the request has been processed.
* Set to true once we process the response and do not retry the request.
* Reset to false every time we build a request.
*/
private boolean isProcessed;

AcknowledgeRequestState(LogContext logContext,
String owner,
long deadlineMs,
Expand All @@ -803,6 +847,7 @@ public class AcknowledgeRequestState extends TimedRequestState {
this.inFlightAcknowledgements = new HashMap<>();
this.incompleteAcknowledgements = new HashMap<>();
this.requestType = acknowledgeRequestType;
this.isProcessed = false;
}

UnsentRequest buildRequest() {
Expand All @@ -823,6 +868,7 @@ UnsentRequest buildRequest() {

log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
nodesWithPendingRequests.add(nodeId);
isProcessed = false;

BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
if (error != null) {
Expand Down Expand Up @@ -925,6 +971,7 @@ ShareSessionHandler sessionHandler() {
/**
* Finalizes this request state after its response has been handled:
* drops all in-flight acknowledgements, gives the result handler a chance
* to complete, and flags the request as processed.
*/
void processingComplete() {
inFlightAcknowledgements.clear();
// NOTE(review): presumably completes the overall result only when nothing is outstanding - confirm in ResultHandler.
resultHandler.completeIfEmpty();
// Marked last, after the in-flight state is cleared and the handler has been notified.
isProcessed = true;
}

/**
Expand Down Expand Up @@ -1009,33 +1056,50 @@ public void completeIfEmpty() {
}
}

static class Pair<V> {
static class Tuple<V> {
private V asyncRequest;
private V syncRequest;
private Queue<V> syncRequestQueue;
private V closeRequest;

public Pair(V asyncRequest, V syncRequest) {
public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
this.asyncRequest = asyncRequest;
this.syncRequest = syncRequest;
this.syncRequestQueue = syncRequestQueue;
this.closeRequest = closeRequest;
}

/**
* Replaces the async request held for this node.
*
* @param asyncRequest the new async request, or {@code null} to clear it
*/
public void setAsyncRequest(V asyncRequest) {
this.asyncRequest = asyncRequest;
}

public void setSyncRequest(V second) {
this.syncRequest = second;
public void nullifySyncRequestQueue() {
this.syncRequestQueue = null;
}

/**
 * Appends a sync request to the end of the sync request queue,
 * creating the queue first if it does not exist yet.
 *
 * @param syncRequest the sync request to enqueue
 */
public void addSyncRequest(V syncRequest) {
    Queue<V> queue = syncRequestQueue;
    if (queue == null) {
        // Lazily allocate: the queue is absent until the first sync request arrives
        // (and again after it has been nullified).
        queue = new LinkedList<>();
        syncRequestQueue = queue;
    }
    queue.add(syncRequest);
}

/**
* Replaces the close request held for this node.
*
* @param closeRequest the new close request, or {@code null} to clear it
*/
public void setCloseRequest(V closeRequest) {
this.closeRequest = closeRequest;
}

/**
* @return the async request currently held, or {@code null} if none is set
*/
public V getAsyncRequest() {
return asyncRequest;
}

public V getSyncRequest() {
return syncRequest;
public Queue<V> getSyncRequestQueue() {
return syncRequestQueue;
}

/**
* @return the close request currently held, or {@code null} if none is set
*/
public V getCloseRequest() {
return closeRequest;
}
}

Pair<AcknowledgeRequestState> requestStates(int nodeId) {
Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
return acknowledgeRequestStates.get(nodeId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,11 +711,11 @@ public Map<TopicIdPartition, Optional<KafkaException>> commitSync(final Duration
completedAcknowledgements.forEach((tip, acks) -> {
Errors ackErrorCode = acks.getAcknowledgeErrorCode();
if (ackErrorCode == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
ApiException exception = ackErrorCode.exception();
if (exception == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
result.put(tip, Optional.of(ackErrorCode.exception()));
}
Expand Down
Loading