diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 2e8ad2cf7173b..07a6fb48a2a20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -92,6 +92,11 @@ public interface Consumer<K, V> extends Closeable {
      */
     void commitSync();
 
+    /**
+     * @see KafkaConsumer#commitSync(Duration)
+     */
+    void commitSync(Duration timeout);
+
     /**
      * @see KafkaConsumer#commitSync(Map)
      */
@@ -219,7 +224,7 @@ public interface Consumer<K, V> extends Closeable {
     /**
      * @see KafkaConsumer#endOffsets(Collection, Duration)
      */
-    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeoutMs);
+    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
     /**
      * @see KafkaConsumer#close()
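As a usage sketch for the two interface changes above (not part of the patch): the new commitSync(Duration) overload bounds the commit wait, and endOffsets keeps its old behavior under the renamed timeout parameter. The bootstrap address, group id, and topic below are illustrative assumptions.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("group.id", "example-group");           // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // assumed topic
            consumer.poll(Duration.ofSeconds(1));

            // New overload: wait at most 10 seconds for the commit instead of indefinitely.
            consumer.commitSync(Duration.ofSeconds(10));

            // Same API as before; the parameter was renamed from timeoutMs to timeout.
            System.out.println(consumer.endOffsets(consumer.assignment(), Duration.ofSeconds(5)));
        }
    }
}
```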
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index feaadd164ce6a..5bd6b935b3972 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1103,7 +1103,8 @@ public void assign(Collection<TopicPartition> partitions) {
      *             partitions to consume from
      *
      *
-     * @deprecated Since 2.0. Use {@link #poll(Duration)} to poll for records.
+     * @deprecated Since 2.0. Use {@link #poll(Duration)}, which does not block beyond the timeout awaiting partition
+     *             assignment. See KIP-266 for more information.
      */
     @Deprecated
     @Override
@@ -1119,6 +1120,11 @@ public ConsumerRecords<K, V> poll(final long timeout) {
      * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
      * offset for the subscribed list of partitions
      *
+     * <p>
+     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
+     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
+     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
+     *
      *
      * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
      *
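A migration sketch for the deprecation above; pollOldAndNew is a hypothetical helper, and the consumer is assumed to be subscribed already.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PollMigrationExample {
    // Hypothetical helper contrasting the two overloads on an already-subscribed consumer.
    @SuppressWarnings("deprecation")
    static ConsumerRecords<String, String> pollOldAndNew(Consumer<String, String> consumer) {
        // Deprecated: poll(long) may block indefinitely while awaiting partition assignment.
        ConsumerRecords<String, String> old = consumer.poll(100L);
        System.out.println("deprecated poll returned " + old.count() + " records");

        // Replacement: poll(Duration) returns an empty record set once the timeout expires,
        // though ConsumerRebalanceListener callbacks may still run past the deadline.
        return consumer.poll(Duration.ofMillis(100));
    }
}
```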
@@ -1283,9 +1289,46 @@ private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsedTime) {
      */
     @Override
     public void commitSync() {
+        commitSync(Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+     * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
+     * partitions.
+     * <p>
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the passed timeout expires.
+     * <p>
+     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
+     *
+     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
+     *             or if there is an active group with the same groupId which is using group management.
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+     *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
+     *             of the offset commit
+     */
+    @Override
+    public void commitSync(Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            coordinator.commitOffsetsSync(subscriptions.allConsumed(), Long.MAX_VALUE);
+            if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), timeout.toMillis())) {
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
+                        "committing the current consumed offsets");
+            }
         } finally {
             release();
         }
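A sketch of how a caller might use the new bounded commit and react to the TimeoutException it now throws; the five-second budget and the log-and-retry reaction are assumptions, not guidance from the patch.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.TimeoutException;

public class BoundedCommitExample {
    // Commits the offsets from the last poll, giving up after five seconds.
    static void commitWithDeadline(Consumer<String, String> consumer) {
        try {
            consumer.commitSync(Duration.ofSeconds(5));
        } catch (TimeoutException e) {
            // The commit may or may not have reached the broker; a common reaction is to
            // log and let the next iteration of the poll loop retry.
            System.err.println("Offset commit timed out: " + e.getMessage());
        }
    }
}
```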
@@ -1355,14 +1398,15 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
-     *         of the offset commit
+     *             of the offset commit
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
             if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) {
-                throw new TimeoutException("Committing offsets synchronously took too long.");
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
+                        "committing offsets " + offsets);
             }
         } finally {
             release();
@@ -1584,7 +1628,9 @@ public long position(TopicPartition partition, final Duration timeout) {
                 offset = this.subscriptions.position(partition);
                 finishMs = time.milliseconds();
             }
-            if (offset == null) throw new TimeoutException("request timed out, position is unable to be acquired.");
+            if (offset == null)
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
+                        "for partition " + partition + " could be determined");
             return offset;
         } finally {
             release();
@@ -1640,7 +1686,8 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
             Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
                     Collections.singleton(partition), timeout.toMillis());
             if (offsets == null) {
-                throw new TimeoutException("Unable to find committed offsets for partition within set duration.");
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
+                        "committed offset for partition " + partition + " could be determined");
             }
             return offsets.get(partition);
         } finally {
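For the two lookup methods whose timeout messages were improved above, a usage sketch; the topic and partition are illustrative assumptions, and the partition must currently be assigned to the consumer for position() to succeed.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class OffsetLookupExample {
    // Bounded lookups of the current position and the last committed offset for one partition.
    static void describe(Consumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("example-topic", 0); // assumed partition
        try {
            long position = consumer.position(tp, Duration.ofSeconds(5));
            OffsetAndMetadata committed = consumer.committed(tp, Duration.ofSeconds(5));
            System.out.println("position=" + position + ", committed=" + committed);
        } catch (TimeoutException e) {
            // With this patch the message now names the partition and the expired timeout.
            System.err.println("Lookup timed out: " + e.getMessage());
        }
    }
}
```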
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 3502156d4d844..cf1b07fabe21b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -251,6 +251,11 @@ public synchronized void commitSync() {
         commitSync(this.subscriptions.allConsumed());
     }
 
+    @Override
+    public synchronized void commitSync(Duration timeout) {
+        commitSync(this.subscriptions.allConsumed());
+    }
+
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         commitSync(offsets);
@@ -508,7 +513,7 @@ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
     }
 
     @Override
-    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration duration) {
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
         return endOffsets(partitions);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 93d408131174f..9c19af1703779 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -57,6 +57,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -87,29 +88,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private MetadataSnapshot assignmentSnapshot;
     private long nextAutoCommitDeadline;
 
-    // hold onto request&future for commited offset requests to enable async calls.
+    // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
 
     private static class PendingCommittedOffsetRequest {
-        private final Set<TopicPartition> request;
-        private final Generation generation;
+        private final Set<TopicPartition> requestedPartitions;
+        private final Generation requestedGeneration;
         private final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response;
 
-        private PendingCommittedOffsetRequest(final Set<TopicPartition> request,
+        private PendingCommittedOffsetRequest(final Set<TopicPartition> requestedPartitions,
                                               final Generation generationAtRequestTime,
-                                              final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response
-        ) {
-            if (request == null) throw new NullPointerException();
-            if (response == null) throw new NullPointerException();
-
-            this.request = request;
-            this.generation = generationAtRequestTime;
-            this.response = response;
+                                              final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response) {
+            this.requestedPartitions = Objects.requireNonNull(requestedPartitions);
+            this.response = Objects.requireNonNull(response);
+            this.requestedGeneration = generationAtRequestTime;
         }
 
         private boolean sameRequest(final Set<TopicPartition> currentRequest, final Generation currentGeneration) {
-            return (generation == null ? currentGeneration == null : generation.equals(currentGeneration))
-                && request.equals(currentRequest);
+            return (requestedGeneration == null ? currentGeneration == null : requestedGeneration.equals(currentGeneration))
+                && requestedPartitions.equals(currentRequest);
         }
     }
 
@@ -303,6 +300,7 @@ protected void onJoinComplete(int generation,
      */
     public boolean poll(final long timeoutMs) {
         final long startTime = time.milliseconds();
+        long currentTime = startTime;
         long elapsed = 0L;
 
         invokeCompletedOffsetCommitCallbacks();
@@ -312,7 +310,9 @@ public boolean poll(final long timeoutMs) {
                 if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                     return false;
                 }
-                elapsed = time.milliseconds() - startTime;
+
+                currentTime = time.milliseconds();
+                elapsed = currentTime - startTime;
             }
 
             if (rejoinNeededOrPending()) {
@@ -323,15 +323,18 @@ public boolean poll(final long timeoutMs) {
                     if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                         return false;
                     }
-                    elapsed = time.milliseconds() - startTime;
+
+                    currentTime = time.milliseconds();
+                    elapsed = currentTime - startTime;
                 }
 
                 if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                     return false;
                 }
+
+                currentTime = time.milliseconds();
             }
 
-            pollHeartbeat(startTime);
+            pollHeartbeat(currentTime);
         } else {
             // For manually assigned partitions, if there are no ready nodes, await metadata.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
@@ -345,10 +348,12 @@ public boolean poll(final long timeoutMs) {
             if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
                 return false;
             }
+
+            currentTime = time.milliseconds();
         }
 
-        maybeAutoCommitOffsetsAsync(startTime);
+        maybeAutoCommitOffsetsAsync(currentTime);
         return true;
     }
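The coordinator change above threads a refreshed currentTime through poll so that pollHeartbeat and maybeAutoCommitOffsetsAsync observe the clock after, not before, the blocking stages. A self-contained sketch of that budget-tracking pattern; the Stage interface and timing helpers are illustrative stand-ins, not part of the patch.

```java
public class TimeBudgetSketch {
    // Hypothetical stand-in for a blocking stage such as ensureCoordinatorReady or ensureActiveGroup.
    interface Stage {
        boolean run(long remainingMs);
    }

    // Mirrors KafkaConsumer#remainingTimeAtLeastZero: never hand a negative budget to a stage.
    static long remainingTimeAtLeastZero(long timeoutMs, long elapsedMs) {
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    // Runs the stages in order, re-reading the clock after each blocking call so later stages
    // and any final bookkeeping see current time rather than the loop's start time.
    static boolean runAll(long timeoutMs, Stage... stages) {
        final long startMs = System.currentTimeMillis();
        long currentMs = startMs;
        for (Stage stage : stages) {
            if (!stage.run(remainingTimeAtLeastZero(timeoutMs, currentMs - startMs))) {
                return false; // the stage could not finish within the remaining budget
            }
            currentMs = System.currentTimeMillis(); // the refresh this patch adds after each stage
        }
        return true;
    }
}
```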
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0954a250004a0..ba2d93024f948 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -60,7 +60,7 @@ Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0
     Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
     If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties.
-        Hot-swaping the jar-file only might not work.
+        Hot-swapping the jar-file only might not work.
@@ -91,6 +91,10 @@ Notable changes in 2.0.0
         internal.value.converter=org.apache.kafka.connect.json.JsonConverter
         internal.value.converter.schemas.enable=false
+    KIP-266 adds overloads to the consumer to support
+    timeout behavior for blocking APIs. In particular, a new poll(Duration) API has been added which
+    does not block for dynamic partition assignment. The old poll(long) API has been deprecated and
+    will be removed in a future version.
 
     New Protocol Versions
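To make the KIP-266 note above concrete, a minimal sketch of a 2.0-style poll loop; the configuration values and topic are illustrative assumptions. Note that the first poll(Duration) may now return an empty batch before the group has finished rebalancing, so callers should loop rather than assume records arrive on the first call.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UpgradeNotePollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed
        props.put("group.id", "upgrade-example");         // assumed
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // assumed
            for (int i = 0; i < 10; i++) {
                // was: consumer.poll(100) -- deprecated, could block indefinitely for assignment
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }
}
```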