Skip to content

Commit

Permalink
KAFKA-18016: Modified handling of piggyback acknowledgements in Share…
Browse files Browse the repository at this point in the history
…ConsumeRequestManager. (#17824)

What
There was a bug in handling piggyback acknowledgements in ShareConsumeRequestManager, where the fetchAcknowledgementsMap could be updated when the request was in flight and when the ShareFetch response is received, we were removing any acknowledgements(without actually sending them) which came when the request was in flight.

Fix
Now we are maintaining 2 separate maps(one which has the acknowledgements to send and one which keeps track of the acknowledgements in flight).

 Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>,  Manikumar Reddy <[email protected]>
  • Loading branch information
ShivsundarR authored Nov 18, 2024
1 parent cd1bf19 commit eafa78d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
private Uuid memberId;
private boolean fetchMoreRecords = false;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsToSend;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsInFlight;
private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private boolean closing = false;
private final CompletableFuture<Void> closeFuture;
private boolean isAcknowledgementCommitCallbackRegistered = false;
private final Map<IdAndPartition, String> forgottenTopicNames = new HashMap<>();
private final Map<IdAndPartition, String> topicNamesMap = new HashMap<>();

ShareConsumeRequestManager(final Time time,
final LogContext logContext,
Expand Down Expand Up @@ -122,7 +123,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
this.sessionHandlers = new HashMap<>();
this.nodesWithPendingRequests = new HashSet<>();
this.acknowledgeRequestStates = new HashMap<>();
this.fetchAcknowledgementsMap = new HashMap<>();
this.fetchAcknowledgementsToSend = new HashMap<>();
this.fetchAcknowledgementsInFlight = new HashMap<>();
this.closeFuture = new CompletableFuture<>();
}

Expand Down Expand Up @@ -170,12 +172,14 @@ public PollResult poll(long currentTimeMs) {
k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));

TopicIdPartition tip = new TopicIdPartition(topicId, partition);
Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip);
Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
if (acknowledgementsToSend != null) {
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
}
handler.addPartitionToFetch(tip, acknowledgementsToSend);
fetchedPartitions.add(tip);
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());

log.debug("Added fetch request for partition {} to node {}", tip, node.id());
}
Expand All @@ -194,15 +198,18 @@ public PollResult poll(long currentTimeMs) {
} else {
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
if (!fetchedPartitions.contains(tip)) {
Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip);
Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);

if (acknowledgementsToSend != null) {
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
}

sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
partitionsToForgetMap.get(node).add(tip);

forgottenTopicNames.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
fetchedPartitions.add(tip);
log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id());
}
Expand Down Expand Up @@ -253,7 +260,7 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
}

// The acknowledgements sent via ShareFetch are stored in this map.
acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge));
acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsToSend.merge(tip, acks, Acknowledgements::merge));
}

/**
Expand Down Expand Up @@ -566,8 +573,10 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());

if (fetchAcknowledgementsMap.get(tip) != null) {
acknowledgements.merge(fetchAcknowledgementsMap.remove(tip));
Acknowledgements acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);

if (acksFromShareFetch != null) {
acknowledgements.merge(acksFromShareFetch);
}

if (acknowledgements != null && !acknowledgements.isEmpty()) {
Expand Down Expand Up @@ -639,7 +648,7 @@ private void handleShareFetchSuccess(Node fetchTarget,
responseData.put(new TopicIdPartition(topicResponse.topicId(),
partition.partitionIndex(),
metadata.topicNames().getOrDefault(topicResponse.topicId(),
forgottenTopicNames.remove(new IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))), partition))
topicNamesMap.remove(new IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))), partition))
);

final Set<TopicPartition> partitions = responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
Expand All @@ -653,7 +662,7 @@ private void handleShareFetchSuccess(Node fetchTarget,

log.debug("ShareFetch for partition {} returned fetch data {}", tip, partitionData);

Acknowledgements acks = fetchAcknowledgementsMap.remove(tip);
Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip);
if (acks != null) {
if (partitionData.acknowledgeErrorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acks.size());
Expand Down Expand Up @@ -716,7 +725,7 @@ private void handleShareFetchFailure(Node fetchTarget,
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));

Acknowledgements acks = fetchAcknowledgementsMap.remove(tip);
Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip);
if (acks != null) {
metricsManager.recordFailedAcknowledgements(acks.size());
acks.setAcknowledgeErrorCode(Errors.forException(error));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,52 @@ public void testRetryAcknowledgements() throws InterruptedException {
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
}

@Test
public void testPiggybackAcknowledgementsInFlight() {
buildRequestManager();

assignFromSubscribed(Collections.singleton(tp0));

// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());

client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);

// Reading records from the share fetch buffer.
fetchRecords();

// Piggyback acknowledgements
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));

assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());

assertEquals(2.0,
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());

Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements2));

client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

fetchRecords();

assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
assertEquals(3.0,
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
}

@Test
public void testCommitAsyncWithSubscriptionChange() {
buildRequestManager();
Expand Down

0 comments on commit eafa78d

Please sign in to comment.