diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index a98076d60b1a..1ace791b1b94 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -118,10 +118,10 @@ + files="(Sender|Fetcher|FetchRequestManager|ShareConsumeRequestManager|OffsetFetcher|KafkaConsumer|KafkaShareConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/> + files="(ConsumerCoordinator|KafkaConsumer|KafkaShareConsumer|RequestResponse|Fetcher|FetchRequestManager|ShareConsumeRequestManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 10269e255072..d54df2533173 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -54,7 +54,7 @@ public class RequestManagers implements Closeable { public final OffsetsRequestManager offsetsRequestManager; public final TopicMetadataRequestManager topicMetadataRequestManager; public final FetchRequestManager fetchRequestManager; - public final ShareFetchRequestManager shareFetchRequestManager; + public final Optional shareConsumeRequestManager; private final List> entries; private final IdempotentCloser closer = new IdempotentCloser(); @@ -72,7 +72,7 @@ public RequestManagers(LogContext logContext, this.commitRequestManager = commitRequestManager; this.topicMetadataRequestManager = topicMetadataRequestManager; this.fetchRequestManager = fetchRequestManager; - this.shareFetchRequestManager = null; + this.shareConsumeRequestManager = null; this.heartbeatRequestManager = heartbeatRequestManager; this.shareHeartbeatRequestManager = Optional.empty(); this.membershipManager = membershipManager; @@ -90,12 +90,12 @@ public RequestManagers(LogContext logContext, } public RequestManagers(LogContext logContext, - ShareFetchRequestManager shareFetchRequestManager, + ShareConsumeRequestManager shareConsumeRequestManager, Optional coordinatorRequestManager, Optional shareHeartbeatRequestManager, Optional shareMembershipManager) { this.log = logContext.logger(RequestManagers.class); - this.shareFetchRequestManager = shareFetchRequestManager; + this.shareConsumeRequestManager = Optional.of(shareConsumeRequestManager); this.coordinatorRequestManager = coordinatorRequestManager; this.commitRequestManager = Optional.empty(); this.heartbeatRequestManager = Optional.empty(); @@ -110,7 +110,7 @@ public RequestManagers(LogContext logContext, list.add(coordinatorRequestManager); list.add(shareHeartbeatRequestManager); list.add(shareMembershipManager); - list.add(Optional.of(shareFetchRequestManager)); + list.add(Optional.of(shareConsumeRequestManager)); entries = Collections.unmodifiableList(list); } @@ -299,7 +299,7 @@ protected RequestManagers create() { shareMembershipManager, backgroundEventHandler, metrics); - ShareFetchRequestManager shareFetchRequestManager = new ShareFetchRequestManager( + ShareConsumeRequestManager shareConsumeRequestManager = new ShareConsumeRequestManager( logContext, groupRebalanceConfig.groupId, metadata, @@ -307,11 +307,11 @@ protected RequestManagers create() { fetchConfig, fetchBuffer, shareFetchMetricsManager); - shareMembershipManager.registerStateListener(shareFetchRequestManager); + shareMembershipManager.registerStateListener(shareConsumeRequestManager); return new RequestManagers( logContext, - shareFetchRequestManager, + shareConsumeRequestManager, Optional.of(coordinator), Optional.of(shareHeartbeatRequestManager), Optional.of(shareMembershipManager) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java similarity index 56% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchRequestManager.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index f18b50af0bed..019b76c6a40c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -41,20 +41,22 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Predicate; import java.util.stream.Collectors; /** - * {@code ShareFetchRequestManager} is responsible for generating {@link ShareFetchRequest} that - * represent the {@link SubscriptionState#fetchablePartitions(Predicate)} based on the share group - * consumer's assignment. It also uses {@link ShareAcknowledgeRequest} to close the share session. + * {@code ShareConsumeRequestManager} is responsible for generating {@link ShareFetchRequest} and + * {@link ShareAcknowledgeRequest} to fetch and acknowledge records being delivered for a consumer + * in a share group. */ -public class ShareFetchRequestManager implements RequestManager, MemberStateListener { +public class ShareConsumeRequestManager implements RequestManager, MemberStateListener { private final Logger log; private final LogContext logContext; @@ -65,18 +67,20 @@ public class ShareFetchRequestManager implements RequestManager, MemberStateList protected final ShareFetchBuffer shareFetchBuffer; private final Map sessionHandlers; private final Set nodesWithPendingRequests; + private final List pendingRequests; private final ShareFetchMetricsManager metricsManager; private final IdempotentCloser idempotentCloser = new IdempotentCloser(); private Uuid memberId; - - ShareFetchRequestManager(final LogContext logContext, - final String groupId, - final ConsumerMetadata metadata, - final SubscriptionState subscriptions, - final FetchConfig fetchConfig, - final ShareFetchBuffer shareFetchBuffer, - final ShareFetchMetricsManager metricsManager) { - this.log = logContext.logger(ShareFetchRequestManager.class); + private boolean fetchMoreRecords = false; + + ShareConsumeRequestManager(final LogContext logContext, + final String groupId, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final ShareFetchBuffer shareFetchBuffer, + final ShareFetchMetricsManager metricsManager) { + this.log = logContext.logger(ShareConsumeRequestManager.class); this.logContext = logContext; this.groupId = groupId; this.metadata = metadata; @@ -86,6 +90,7 @@ public class ShareFetchRequestManager implements RequestManager, MemberStateList this.metricsManager = metricsManager; this.sessionHandlers = new HashMap<>(); this.nodesWithPendingRequests = new HashSet<>(); + this.pendingRequests = new LinkedList<>(); } @Override @@ -94,6 +99,16 @@ public PollResult poll(long currentTimeMs) { return PollResult.EMPTY; } + if (!pendingRequests.isEmpty()) { + List inFlightRequests = new LinkedList<>(pendingRequests); + pendingRequests.clear(); + return new PollResult(inFlightRequests); + } + + if (!fetchMoreRecords) { + return PollResult.EMPTY; + } + Map handlerMap = new HashMap<>(); Map topicIds = metadata.topicIds(); @@ -160,6 +175,12 @@ public PollResult pollOnClose() { return PollResult.EMPTY; } + if (!pendingRequests.isEmpty()) { + List inFlightRequests = new LinkedList<>(pendingRequests); + pendingRequests.clear(); + return new PollResult(inFlightRequests); + } + final Cluster cluster = metadata.fetch(); Map handlerMap = new HashMap<>(); @@ -211,6 +232,75 @@ public PollResult pollOnClose() { return new PollResult(requests); } + public void fetch() { + if (!fetchMoreRecords) { + log.debug("Fetch more data"); + fetchMoreRecords = true; + } + } + + public CompletableFuture commitSync(final long retryExpirationTimeMs) { + final CompletableFuture result = new CompletableFuture<>(); + + // Build a list of ShareAcknowledge requests to be picked up on the next poll + final Cluster cluster = metadata.fetch(); + + Map handlerMap = new HashMap<>(); + + sessionHandlers.forEach((nodeId, sessionHandler) -> { + Node node = cluster.nodeById(nodeId); + if (node != null) { + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = shareFetchBuffer.getAcknowledgementsToSend(tip); + if (acknowledgements != null) { + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + sessionHandler.addPartitionToFetch(tip, acknowledgements); + handlerMap.put(node, sessionHandler); + + log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); + } + } + } + }); + + Map builderMap = new LinkedHashMap<>(); + for (Map.Entry entry : handlerMap.entrySet()) { + Node target = entry.getKey(); + ShareSessionHandler handler = entry.getValue(); + ShareAcknowledgeRequest.Builder builder = handler.newShareAcknowledgeBuilder(groupId, fetchConfig); + if (builder != null) { + builderMap.put(target, builder); + } + } + + final AtomicInteger inFlightRequestCount = new AtomicInteger(); + + List requests = builderMap.entrySet().stream().map(entry -> { + Node target = entry.getKey(); + ShareAcknowledgeRequest.Builder requestBuilder = entry.getValue(); + + nodesWithPendingRequests.add(target.id()); + + BiConsumer responseHandler = (clientResponse, error) -> { + if (error != null) { + handleShareAcknowledgeFailure(target, requestBuilder.data(), inFlightRequestCount, result, error); + } else { + handleShareAcknowledgeSuccess(target, requestBuilder.data(), inFlightRequestCount, result, clientResponse); + } + }; + return new UnsentRequest(requestBuilder, Optional.of(target)).whenComplete(responseHandler); + }).collect(Collectors.toList()); + + if (requests.isEmpty()) { + result.complete(null); + } else { + inFlightRequestCount.set(requests.size()); + pendingRequests.addAll(requests); + } + + return result; + } + private void handleShareFetchSuccess(Node fetchTarget, ShareFetchRequestData requestData, ClientResponse resp) { @@ -230,7 +320,6 @@ private void handleShareFetchSuccess(Node fetchTarget, if (response.error() == Errors.UNKNOWN_TOPIC_ID) { metadata.requestUpdate(false); } - return; } @@ -239,8 +328,8 @@ private void handleShareFetchSuccess(Node fetchTarget, response.data().responses().forEach(topicResponse -> topicResponse.partitions().forEach(partition -> responseData.put(new TopicIdPartition(topicResponse.topicId(), - partition.partitionIndex(), - metadata.topicNames().get(topicResponse.topicId())), partition))); + partition.partitionIndex(), + metadata.topicNames().get(topicResponse.topicId())), partition))); final Set partitions = responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet()); final ShareFetchMetricsAggregator shareFetchMetricsAggregator = new ShareFetchMetricsAggregator(metricsManager, partitions); @@ -265,6 +354,10 @@ private void handleShareFetchSuccess(Node fetchTarget, shareFetchMetricsAggregator, requestVersion); shareFetchBuffer.add(completedFetch); + + if (!partitionData.acquiredRecords().isEmpty()) { + fetchMoreRecords = false; + } } metricsManager.recordLatency(resp.requestLatencyMs()); @@ -283,18 +376,73 @@ private void handleShareFetchFailure(Node fetchTarget, handler.handleError(error); } - requestData.topics().forEach(topic -> { - topic.partitions().forEach(partition -> { - TopicIdPartition tip = new TopicIdPartition(topic.topicId(), - partition.partitionIndex(), - metadata.topicNames().get(topic.topicId())); + requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + metricsManager.recordFailedAcknowledgements(shareFetchBuffer.getPendingAcknowledgementsCount(tip)); + shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forException(error)); + })); + } finally { + log.debug("Removing pending request for node {} - failed", fetchTarget); + nodesWithPendingRequests.remove(fetchTarget.id()); + } + } + + private void handleShareAcknowledgeSuccess(Node fetchTarget, + ShareAcknowledgeRequestData requestData, + AtomicInteger inFlightRequestCount, + CompletableFuture future, + ClientResponse resp) { + try { + final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody(); + + response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + if (partition.errorCode() != 0) { metricsManager.recordFailedAcknowledgements(shareFetchBuffer.getPendingAcknowledgementsCount(tip)); - shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forException(error)); - }); - }); + } + shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forCode(partition.errorCode())); + })); + + metricsManager.recordLatency(resp.requestLatencyMs()); + } finally { + log.debug("Removing pending request for node {} - success", fetchTarget); + nodesWithPendingRequests.remove(fetchTarget.id()); + + if (inFlightRequestCount.decrementAndGet() == 0) { + future.complete(null); + } + } + } + + private void handleShareAcknowledgeFailure(Node fetchTarget, + ShareAcknowledgeRequestData requestData, + AtomicInteger inFlightRequestCount, + CompletableFuture future, + Throwable error) { + try { + final ShareSessionHandler handler = sessionHandler(fetchTarget.id()); + if (handler != null) { + handler.handleError(error); + } + + requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + metricsManager.recordAcknowledgementSent(shareFetchBuffer.getPendingAcknowledgementsCount(tip)); + shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forException(error)); + })); } finally { log.debug("Removing pending request for node {} - failed", fetchTarget); nodesWithPendingRequests.remove(fetchTarget.id()); + + if (inFlightRequestCount.decrementAndGet() == 0) { + future.complete(null); + } } } @@ -304,17 +452,15 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget, try { final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody(); - response.data().responses().forEach(topic -> { - topic.partitions().forEach(partition -> { - TopicIdPartition tip = new TopicIdPartition(topic.topicId(), - partition.partitionIndex(), - metadata.topicNames().get(topic.topicId())); - if (partition.errorCode() != 0) { - metricsManager.recordFailedAcknowledgements(shareFetchBuffer.getPendingAcknowledgementsCount(tip)); - } - shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forCode(partition.errorCode())); - }); - }); + response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + if (partition.errorCode() != 0) { + metricsManager.recordFailedAcknowledgements(shareFetchBuffer.getPendingAcknowledgementsCount(tip)); + } + shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forCode(partition.errorCode())); + })); metricsManager.recordLatency(resp.requestLatencyMs()); } finally { @@ -333,15 +479,13 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget, handler.handleError(error); } - requestData.topics().forEach(topic -> { - topic.partitions().forEach(partition -> { - TopicIdPartition tip = new TopicIdPartition(topic.topicId(), - partition.partitionIndex(), - metadata.topicNames().get(topic.topicId())); - metricsManager.recordAcknowledgementSent(shareFetchBuffer.getPendingAcknowledgementsCount(tip)); - shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forException(error)); - }); - }); + requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + metricsManager.recordAcknowledgementSent(shareFetchBuffer.getPendingAcknowledgementsCount(tip)); + shareFetchBuffer.handleAcknowledgementResponses(tip, Errors.forException(error)); + })); } finally { log.debug("Removing pending request for node {} - failed", fetchTarget); nodesWithPendingRequests.remove(fetchTarget.id()); @@ -373,4 +517,53 @@ public void close() { public void onMemberEpochUpdated(Optional memberEpochOpt, Optional memberIdOpt) { memberIdOpt.ifPresent(s -> memberId = Uuid.fromString(s)); } + + /** + * Represents a request to acknowledge delivery that can be retried or aborted. + * ** UNDER CONSTRUCTION ** + */ + static class AcknowledgeRequestState extends RequestState { + + /** + * The node to send the request to. + */ + private final int node; + + /** + * The map of acknowledgements to send + */ + private final Map acknowledgementsMap; + + /** + * Future with the result of the request. + */ + private final CompletableFuture future; + + /** + * Time until which the request should be retried if it fails with retriable + * errors. If not present, the request is triggered without waiting for a response or + * retrying. + */ + private final Optional expirationTimeMs; + + AcknowledgeRequestState(LogContext logContext, String owner, + long retryBackoffMs, long retryBackoffMaxMs, + Optional expirationTimeMs, + int node, + Map acknowledgementsMap) { + super(logContext, owner, retryBackoffMs, retryBackoffMaxMs); + this.expirationTimeMs = expirationTimeMs; + this.node = node; + this.acknowledgementsMap = acknowledgementsMap; + this.future = new CompletableFuture<>(); + } + + UnsentRequest buildRequest() { + return null; + } + + boolean retryTimeoutExpired(long currentTimeMs) { + return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs; + } + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 6e7c7094c2f1..ea757bf97173 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -37,21 +37,25 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.PollEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; import org.apache.kafka.clients.consumer.internals.events.ShareLeaveOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.SyncShareAcknowledgeEvent; import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; @@ -69,6 +73,7 @@ import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -201,6 +206,7 @@ private enum AcknowledgementMode { private final SubscriptionState subscriptions; private final ConsumerMetadata metadata; private final Metrics metrics; + private final int defaultApiTimeoutMs; private volatile boolean closed = false; private final Optional clientTelemetryReporter; @@ -245,6 +251,7 @@ private enum AcknowledgementMode { this.log = logContext.logger(getClass()); log.debug("Initializing the Kafka share consumer"); + this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); this.time = time; List reporters = CommonClientConfigs.metricsReporters(clientId, config); this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); @@ -356,6 +363,7 @@ private enum AcknowledgementMode { this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer); this.subscriptions = subscriptions; this.metadata = metadata; + this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); this.fetchBuffer = new ShareFetchBuffer(logContext); ShareConsumerMetrics metricsRegistry = new ShareConsumerMetrics(CONSUMER_SHARE_METRIC_GROUP_PREFIX); @@ -583,10 +591,16 @@ private ShareFetch pollForFetches(final Timer timer) { } private ShareFetch collect() { - // Notify the network thread to wake up and start the next round of fetching - applicationEventHandler.wakeupNetworkThread(); + final ShareFetch fetch = fetchCollector.collect(fetchBuffer); + if (fetch.isEmpty()) { + // Make sure the network thread can tell the application is actively polling + applicationEventHandler.add(new ShareFetchEvent()); - return fetchCollector.collect(fetchBuffer); + // Notify the network thread to wake up and start the next round of fetching + applicationEventHandler.wakeupNetworkThread(); + } + + return fetch; } /** @@ -620,7 +634,7 @@ public void acknowledge(final ConsumerRecord record, final AcknowledgeType */ @Override public Map> commitSync() { - throw new UnsupportedOperationException(); + return this.commitSync(Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -628,7 +642,43 @@ public Map> commitSync() { */ @Override public Map> commitSync(final Duration timeout) { - throw new UnsupportedOperationException(); + acquireAndEnsureOpen(); + try { + // Handle any completed acknowledgements for which we already have the responses + handleCompletedAcknowledgements(); + + // Acknowledge the previously fetched records + sendAcknowledgements(); + + Timer requestTimer = time.timer(timeout.toMillis()); + SyncShareAcknowledgeEvent event = new SyncShareAcknowledgeEvent(requestTimer); + applicationEventHandler.add(event); + CompletableFuture commitFuture = event.future(); + wakeupTrigger.setActiveTask(commitFuture); + try { + Map> result = new HashMap<>(); + ConsumerUtils.getResult(commitFuture, requestTimer); + Map completedAcknowledgements = fetchBuffer.getCompletedAcknowledgements(); + completedAcknowledgements.forEach((tip, acks) -> { + Errors ackErrorCode = acks.getAcknowledgeErrorCode(); + if (ackErrorCode == null) { + result.put(tip, null); + } else { + ApiException exception = ackErrorCode.exception(); + if (exception == null) { + result.put(tip, null); + } else { + result.put(tip, Optional.of(ackErrorCode.exception())); + } + } + }); + return result; + } finally { + wakeupTrigger.clearTask(); + } + } finally { + release(); + } } /** @@ -856,6 +906,28 @@ private void maybeSendAcknowledgements() { } } + /** + * If the acknowledgement mode is IMPLICIT, acknowledges the current batch and puts them into the fetch + * buffer for the background thread to pick up. + * If the acknowledgement mode is EXPLICIT, puts any ready acknowledgements into the fetch buffer for the + * background thread to pick up. + */ + private void sendAcknowledgements() { + if (currentFetch != null) { + // If IMPLICIT, acknowledge all records and send + if ((acknowledgementMode == AcknowledgementMode.PENDING) || + (acknowledgementMode == AcknowledgementMode.IMPLICIT)) { + currentFetch.acknowledgeAll(AcknowledgeType.ACCEPT); + fetchBuffer.acknowledgementsReadyToSend(currentFetch.acknowledgementsByPartition()); + } else if (acknowledgementMode == AcknowledgementMode.EXPLICIT) { + // If EXPLICIT, send any acknowledgements which are ready + fetchBuffer.acknowledgementsReadyToSend(currentFetch.acknowledgementsByPartition()); + } + + currentFetch = null; + } + } + /** * Called to send any outstanding acknowledgements during close. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 07b9daeb7ec8..cd14f2823a52 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -32,6 +32,7 @@ public enum Type { LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, LEAVE_ON_CLOSE, + SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, SHARE_LEAVE_ON_CLOSE } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index a27c56da99ba..f4d0564ad75a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.MembershipManager; import org.apache.kafka.clients.consumer.internals.RequestManagers; +import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager; import org.apache.kafka.clients.consumer.internals.ShareMembershipManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; @@ -136,6 +137,14 @@ public void process(ApplicationEvent event) { process((LeaveOnCloseEvent) event); return; + case SHARE_FETCH: + process((ShareFetchEvent) event); + return; + + case SHARE_ACKNOWLEDGE_SYNC: + process((SyncShareAcknowledgeEvent) event); + return; + case SHARE_SUBSCRIPTION_CHANGE: process((ShareSubscriptionChangeApplicationEvent) event); return; @@ -305,6 +314,26 @@ private void process(final LeaveOnCloseEvent event) { future.whenComplete(complete(event.future())); } + /** + * Process event that tells the share consume request manager to fetch more records. + */ + private void process(final ShareFetchEvent event) { + requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch()); + } + + /** + * Process event that indicates the consumer acknowledged delivery of records synchronously. + */ + private void process(final SyncShareAcknowledgeEvent event) { + if (!requestManagers.shareConsumeRequestManager.isPresent()) { + return; + } + + ShareConsumeRequestManager manager = requestManagers.shareConsumeRequestManager.get(); + CompletableFuture future = manager.commitSync(event.deadlineMs()); + future.whenComplete(complete(event.future())); + } + /** * Process event that indicates that the subscription changed for a share group. This will make the * consumer join the share group if it is not part of it yet, or send the updated subscription if diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncShareAcknowledgeEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncShareAcknowledgeEvent.java new file mode 100644 index 000000000000..cb4de5c8f246 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncShareAcknowledgeEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.utils.Timer; + +public class AsyncShareAcknowledgeEvent extends ShareAcknowledgeEvent { + + public AsyncShareAcknowledgeEvent(final Timer timer) { + super(Type.SHARE_ACKNOWLEDGE_ASYNC, timer); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeEvent.java new file mode 100644 index 000000000000..3c9d9acbb3d2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.utils.Timer; + +public abstract class ShareAcknowledgeEvent extends CompletableApplicationEvent { + + public ShareAcknowledgeEvent(final Type type, final Timer timer) { + super(type, timer); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java new file mode 100644 index 000000000000..0f99514297a8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class ShareFetchEvent extends ApplicationEvent { + + public ShareFetchEvent() { + super(Type.SHARE_FETCH); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncShareAcknowledgeEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncShareAcknowledgeEvent.java new file mode 100644 index 000000000000..a181fdf6536d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncShareAcknowledgeEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.utils.Timer; + +public class SyncShareAcknowledgeEvent extends ShareAcknowledgeEvent { + + public SyncShareAcknowledgeEvent(final Timer timer) { + super(Type.SHARE_ACKNOWLEDGE_SYNC, timer); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java similarity index 95% rename from clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchRequestManagerTest.java rename to clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 22a57f55aaf0..23d2c542014d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -102,7 +102,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -public class ShareFetchRequestManagerTest { +public class ShareConsumeRequestManagerTest { private final String topicName = "test"; private final String groupId = "test-group"; @@ -129,7 +129,7 @@ public class ShareFetchRequestManagerTest { private ShareFetchMetricsManager metricsManager; private MockClient client; private Metrics metrics; - private TestableShareFetchRequestManager fetcher; + private TestableShareConsumeRequestManager fetcher; private TestableNetworkClientDelegate networkClientDelegate; private MemoryRecords records; private List acquiredRecords; @@ -649,12 +649,12 @@ private ShareFetchResponse fullFetchResponse(TopicIdPartition tp, } private ShareFetchResponse fullFetchResponse(TopicIdPartition tp, - MemoryRecords records, - List acquiredRecords, - Errors error, - Errors acknowledgeError) { + MemoryRecords records, + List acquiredRecords, + Errors error, + Errors acknowledgeError) { Map partitions = Collections.singletonMap(tp, - partitionData(tp, records, acquiredRecords, error, acknowledgeError)); + partitionData(tp, records, acquiredRecords, error, acknowledgeError)); return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList()); } @@ -664,11 +664,11 @@ private ShareFetchResponseData.PartitionData partitionData(TopicIdPartition tp, Errors error, Errors acknowledgeError) { return new ShareFetchResponseData.PartitionData() - .setPartitionIndex(tp.topicPartition().partition()) - .setErrorCode(error.code()) - .setAcknowledgeErrorCode(acknowledgeError.code()) - .setRecords(records) - .setAcquiredRecords(acquiredRecords); + .setPartitionIndex(tp.topicPartition().partition()) + .setErrorCode(error.code()) + .setAcknowledgeErrorCode(acknowledgeError.code()) + .setRecords(records) + .setAcquiredRecords(acquiredRecords); } /** @@ -738,7 +738,7 @@ private void buildFetcher(MetricConfig metricConfig, subscriptions, fetchConfig, deserializers); - fetcher = spy(new TestableShareFetchRequestManager<>( + fetcher = spy(new TestableShareConsumeRequestManager<>( logContext, groupId, metadata, @@ -747,7 +747,7 @@ private void buildFetcher(MetricConfig metricConfig, new ShareFetchBuffer(logContext), metricsManager, shareFetchCollector - )); + )); } private void buildDependencies(MetricConfig metricConfig, @@ -771,18 +771,18 @@ private void buildDependencies(MetricConfig metricConfig, networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client)); } - private class TestableShareFetchRequestManager extends ShareFetchRequestManager { + private class TestableShareConsumeRequestManager extends ShareConsumeRequestManager { private final ShareFetchCollector shareFetchCollector; - public TestableShareFetchRequestManager(LogContext logContext, - String groupId, - ConsumerMetadata metadata, - SubscriptionState subscriptions, - FetchConfig fetchConfig, - ShareFetchBuffer shareFetchBuffer, - ShareFetchMetricsManager metricsManager, - ShareFetchCollector fetchCollector) { + public TestableShareConsumeRequestManager(LogContext logContext, + String groupId, + ConsumerMetadata metadata, + SubscriptionState subscriptions, + FetchConfig fetchConfig, + ShareFetchBuffer shareFetchBuffer, + ShareFetchMetricsManager metricsManager, + ShareFetchCollector fetchCollector) { super(logContext, groupId, metadata, subscriptions, fetchConfig, shareFetchBuffer, metricsManager); this.shareFetchCollector = fetchCollector; onMemberEpochUpdated(Optional.empty(), Optional.of(Uuid.randomUuid().toString())); @@ -793,6 +793,7 @@ private ShareFetch collectFetch() { } private int sendFetches() { + fetch(); NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds()); networkClientDelegate.addAll(pollResult.unsentRequests); return pollResult.unsentRequests.size(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java index 584794b86190..8a61827699c1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerTestBuilder.java @@ -79,7 +79,7 @@ public class ShareConsumerTestBuilder implements Closeable { final Timer pollTimer; final ShareFetchMetricsManager metricsManager; final NetworkClientDelegate networkClientDelegate; - final Optional shareFetchRequestManager; + final Optional shareConsumeRequestManager; final Optional coordinatorRequestManager; final Optional heartbeatRequestManager; final Optional shareMembershipManager; @@ -145,7 +145,7 @@ public ShareConsumerTestBuilder(GroupInformation groupInfo, boolean enableAutoTi client)); ShareFetchBuffer fetchBuffer = new ShareFetchBuffer(logContext); - ShareFetchRequestManager fetchRequestManager = spy(new ShareFetchRequestManager( + ShareConsumeRequestManager consumeRequestManager = spy(new ShareConsumeRequestManager( logContext, groupRebalanceConfig.groupId, metadata, @@ -193,7 +193,7 @@ public ShareConsumerTestBuilder(GroupInformation groupInfo, boolean enableAutoTi backgroundEventHandler, metrics)); - this.shareFetchRequestManager = Optional.of(fetchRequestManager); + this.shareConsumeRequestManager = Optional.of(consumeRequestManager); this.coordinatorRequestManager = Optional.of(coordinator); this.heartbeatRequestManager = Optional.of(heartbeat); this.heartbeatState = Optional.of(heartbeatState); @@ -201,7 +201,7 @@ public ShareConsumerTestBuilder(GroupInformation groupInfo, boolean enableAutoTi this.shareMembershipManager = Optional.of(membershipManager); this.requestManagers = new RequestManagers(logContext, - fetchRequestManager, + consumeRequestManager, coordinatorRequestManager, heartbeatRequestManager, shareMembershipManager); diff --git a/core/src/test/java/kafka/test/api/PlaintextShareConsumerTest.java b/core/src/test/java/kafka/test/api/PlaintextShareConsumerTest.java index 5ba9ea0876c8..6369967b4483 100644 --- a/core/src/test/java/kafka/test/api/PlaintextShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/PlaintextShareConsumerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; @@ -56,6 +57,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -422,6 +424,26 @@ public void testExplicitAcknowledgeSuccess(String quorum) throws Exception { shareConsumer.close(); } + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"kraft+kip932"}) + public void testExplicitAcknowledgeCommitSuccess(String quorum) throws Exception { + ProducerRecord record = new ProducerRecord<>(tp().topic(), tp().partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), new Properties()); + producer.send(record); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), + new Properties(), CollectionConverters.asScala(Collections.emptyList()).toList()); + shareConsumer.subscribe(Collections.singleton(tp().topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord)); + producer.send(record); + Map> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + shareConsumer.close(); + } + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"kraft+kip932"}) public void testExplicitAcknowledgeReleasePollAccept(String quorum) throws Exception { @@ -513,6 +535,26 @@ public void testImplicitAcknowledgeFailsExplicit(String quorum) throws Exception shareConsumer.close(); } + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"kraft+kip932"}) + public void testImplicitAcknowledgeCommitSync(String quorum) throws Exception { + ProducerRecord record = new ProducerRecord<>(tp().topic(), tp().partition(), null, "key".getBytes(), "value".getBytes()); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), new Properties()); + producer.send(record); + KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), + new Properties(), CollectionConverters.asScala(Collections.emptyList()).toList()); + shareConsumer.subscribe(Collections.singleton(tp().topic())); + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + Map> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + result = shareConsumer.commitSync(); + assertEquals(0, result.size()); + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(0, records.count()); + shareConsumer.close(); + } + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"kraft+kip932"}) public void testFetchRecordLargerThanMaxPartitionFetchBytes(String quorum) throws Exception {