Skip to content

Commit

Permalink
Modified commitSync() and close() handling in clients
Browse files Browse the repository at this point in the history
  • Loading branch information
ShivsundarR committed Sep 9, 2024
1 parent 4ac1dd4 commit bfa0e0a
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -84,10 +86,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private Uuid memberId;
private boolean fetchMoreRecords = false;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap;
private final Map<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStates;
private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private boolean closing = false;
private final CompletableFuture<Void> closeFuture;

ShareConsumeRequestManager(final Time time,
Expand Down Expand Up @@ -132,7 +133,7 @@ public PollResult poll(long currentTimeMs) {
return pollResult;
}

if (!fetchMoreRecords || closing) {
if (!fetchMoreRecords) {
return PollResult.EMPTY;
}

Expand Down Expand Up @@ -216,7 +217,7 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
AtomicBoolean isAsyncDone = new AtomicBoolean();
for (Map.Entry<Integer, Pair<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
for (Map.Entry<Integer, Tuple<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
int nodeId = requestStates.getKey();

if (!isNodeFree(nodeId)) {
Expand All @@ -226,10 +227,25 @@ private PollResult processAcknowledgements(long currentTimeMs) {
// For commitAsync
maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
// Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process.
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else if (isAsyncDone.get()) {
maybeBuildRequest(requestStates.getValue().getSyncRequest(), currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
if (isAsyncDone.get()) {
// We try to process the close request only if we have processed the async and the sync requests for the node.
if (requestStates.getValue().getSyncRequestQueue() == null) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else {
AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest();

maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
} else {
for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
break;
}
maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
}
}
}
}
Expand All @@ -240,14 +256,7 @@ private PollResult processAcknowledgements(long currentTimeMs) {
} else if (checkAndRemoveCompletedAcknowledgements()) {
// Return empty result until all the acknowledgement request states are processed
pollResult = PollResult.EMPTY;
} else if (closing) {
if (!closeFuture.isDone()) {
log.trace("Completing acknowledgement on close");
closeFuture.complete(null);
}
pollResult = PollResult.EMPTY;
}

return pollResult;
}

Expand Down Expand Up @@ -298,19 +307,32 @@ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknow

/**
* Prunes the empty acknowledgementRequestStates.
* Returns true if there are still some acknowledgements left to be processed.
* Returns true if there are still any acknowledgements left to be processed.
* If there is a pending/inFlight close request, we always return true.
*/
private boolean checkAndRemoveCompletedAcknowledgements() {
boolean areAnyAcksLeft = false;
Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();
Iterator<Map.Entry<Integer, Tuple<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();

while (iterator.hasNext()) {
Map.Entry<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
if (isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest()) || isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getSyncRequest())) {
Map.Entry<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
boolean areAsyncAcksLeft = true, areSyncAcksLeft = true;
if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest())) {
acknowledgeRequestStatePair.getValue().setAsyncRequest(null);
areAsyncAcksLeft = false;
}
if (!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue())) {
acknowledgeRequestStatePair.getValue().setSyncRequestQueue(null);
areSyncAcksLeft = false;
}

if (areAsyncAcksLeft || areSyncAcksLeft) {
areAnyAcksLeft = true;
} else if (!closing) {
} else if (acknowledgeRequestStatePair.getValue().getCloseRequest() == null) {
iterator.remove();
}
}

if (!acknowledgeRequestStates.isEmpty()) areAnyAcksLeft = true;
return areAnyAcksLeft;
}
Expand All @@ -319,6 +341,16 @@ private boolean isRequestStateInProgress(AcknowledgeRequestState acknowledgeRequ
return acknowledgeRequestState != null && !(acknowledgeRequestState.isEmpty());
}

/**
 * Reports whether at least one request state in the supplied queue is still in progress.
 *
 * <p>A {@code null} queue is treated as having nothing in progress. Each element is judged by
 * {@code isRequestStateInProgress}, and the scan stops at the first element that qualifies.
 *
 * @param acknowledgeRequestStates the queue of request states to inspect; may be {@code null}
 * @return {@code true} if any queued request state is in progress, {@code false} if the queue
 *         is {@code null}, empty, or contains no in-progress request state
 */
private boolean areRequestStatesInProgress(Queue<AcknowledgeRequestState> acknowledgeRequestStates) {
    return acknowledgeRequestStates != null
            && acknowledgeRequestStates.stream().anyMatch(this::isRequestStateInProgress);
}

/**
* Enqueue an AcknowledgeRequestState to be picked up on the next poll
*
Expand All @@ -340,42 +372,38 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

// Ensure there is no commitSync()/close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id());
future.completeExceptionally(
new IllegalStateException("Attempt to call commitSync() when there is an existing sync request for node : " + node.id()));
} else {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
// Add the incoming commitSync() request to the queue.
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}

// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state representing commitSync() and close().
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
if (acknowledgeRequestStates.get(nodeId).getSyncRequestQueue() == null) {
acknowledgeRequestStates.get(nodeId).setSyncRequestQueue(new LinkedList<>());
}

acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
}

});

resultHandler.completeIfEmpty();
Expand All @@ -396,7 +424,7 @@ public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledg
if (node != null) {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();

acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
Expand Down Expand Up @@ -448,8 +476,6 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
final AtomicInteger resultCount = new AtomicInteger();
final ResultHandler resultHandler = new ResultHandler(resultCount, Optional.empty());

closing = true;

sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
Expand All @@ -470,17 +496,17 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
}
}

acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

// Ensure there is no commitSync()/close() request already present as they are blocking calls
// Ensure there is no close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing sync request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequest());
if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
closeFuture.completeExceptionally(
new IllegalStateException("Attempt to call close() when there is an existing sync request for node : " + node.id()));
new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id()));
} else {
// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
// There can only be one close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setCloseRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":3",
deadlineMs,
retryBackoffMs,
Expand Down Expand Up @@ -626,10 +652,6 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();

if (!closeFuture.isDone()) {
closeFuture.complete(null);
}

metricsManager.recordLatency(resp.requestLatencyMs());
} else {
if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) {
Expand Down Expand Up @@ -927,6 +949,9 @@ ShareSessionHandler sessionHandler() {
/**
 * Finishes processing for this request state.
 *
 * <p>Clears the in-flight acknowledgements, asks the result handler to complete if it has
 * nothing left outstanding, and, when {@code onClose()} reports true, completes the shared
 * {@code closeFuture} if it is not already done.
 */
void processingComplete() {
    inFlightAcknowledgements.clear();
    resultHandler.completeIfEmpty();

    // Only a request state for which onClose() holds may complete the close future.
    // NOTE(review): presumably onClose() identifies the close-type request; if several nodes
    // each have a close request, the first one to finish completes the future - confirm intended.
    if (!onClose()) {
        return;
    }
    if (!closeFuture.isDone()) {
        closeFuture.complete(null);
    }
}

/**
Expand Down Expand Up @@ -1011,33 +1036,47 @@ public void completeIfEmpty() {
}
}

static class Pair<V> {
static class Tuple<V> {
private V asyncRequest;
private V syncRequest;
private Queue<V> syncRequestQueue;
private V closeRequest;

public Pair(V asyncRequest, V syncRequest) {
public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
this.asyncRequest = asyncRequest;
this.syncRequest = syncRequest;
this.syncRequestQueue = syncRequestQueue;
this.closeRequest = closeRequest;
}

public void setAsyncRequest(V asyncRequest) {
this.asyncRequest = asyncRequest;
}

public void setSyncRequest(V second) {
this.syncRequest = second;
public void setSyncRequestQueue(Queue<V> syncRequestQueue) {
this.syncRequestQueue = syncRequestQueue;
}

/**
 * Appends a sync request to this node's queue of pending sync requests.
 *
 * <p>The queue may legitimately be {@code null}: the tuple can be constructed with a
 * {@code null} queue and {@code setSyncRequestQueue(null)} can reset it. In that case a new
 * queue is created here, so callers do not have to initialise it first.
 *
 * @param syncRequest the request to enqueue
 */
public void addSyncRequest(V syncRequest) {
    if (this.syncRequestQueue == null) {
        // Lazily create the queue instead of failing with a NullPointerException.
        this.syncRequestQueue = new LinkedList<>();
    }
    this.syncRequestQueue.add(syncRequest);
}

public void setCloseRequest(V closeRequest) {
this.closeRequest = closeRequest;
}

public V getAsyncRequest() {
return asyncRequest;
}

public V getSyncRequest() {
return syncRequest;
public Queue<V> getSyncRequestQueue() {
return syncRequestQueue;
}

public V getCloseRequest() {
return closeRequest;
}
}

Pair<AcknowledgeRequestState> requestStates(int nodeId) {
/**
 * Looks up the acknowledge request states tracked for a node.
 *
 * @param nodeId the id of the node whose request states are wanted
 * @return the tuple stored for {@code nodeId} in {@code acknowledgeRequestStates}, or
 *         {@code null} if the map has no entry for that node
 */
Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
    final Tuple<AcknowledgeRequestState> statesForNode = acknowledgeRequestStates.get(nodeId);
    return statesForNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,11 +710,11 @@ public Map<TopicIdPartition, Optional<KafkaException>> commitSync(final Duration
completedAcknowledgements.forEach((tip, acks) -> {
Errors ackErrorCode = acks.getAcknowledgeErrorCode();
if (ackErrorCode == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
ApiException exception = ackErrorCode.exception();
if (exception == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
result.put(tip, Optional.of(ackErrorCode.exception()));
}
Expand Down
Loading

0 comments on commit bfa0e0a

Please sign in to comment.