Commit

Merge branch 'apache:trunk' into trunk
rreddy-22 authored Sep 11, 2023
2 parents e6b5172 + b72d929 commit 3abcfd5
Showing 131 changed files with 6,305 additions and 1,862 deletions.
2 changes: 1 addition & 1 deletion bin/kafka-get-offsets.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.tools.GetOffsetShell "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.GetOffsetShell "$@"
2 changes: 1 addition & 1 deletion bin/windows/kafka-get-offsets.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

"%~dp0kafka-run-class.bat" kafka.tools.GetOffsetShell %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.GetOffsetShell %*
4 changes: 4 additions & 0 deletions build.gradle
@@ -2915,6 +2915,10 @@ project(':connect:file') {

testRuntimeOnly libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
}

javadoc {
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
@@ -290,6 +290,10 @@
<allow pkg="javax.rmi.ssl"/>
<allow pkg="kafka.utils" />
<allow pkg="scala.collection" />

<subpackage name="reassign">
<allow pkg="kafka.admin" />
</subpackage>
</subpackage>

<subpackage name="trogdor">
clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java

@@ -18,6 +18,7 @@
package org.apache.kafka.clients;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.utils.Time;

@@ -114,4 +115,21 @@ public static ClientResponse sendAndReceive(KafkaClient client, ClientRequest request, Time time)

}
}

/**
* Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in
* reconnect backoff window following the disconnect).
*/
public static boolean isUnavailable(KafkaClient client, Node node, Time time) {
return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
}

/**
* Check for an authentication error on a given node and raise the exception if there is one.
*/
public static void maybeThrowAuthFailure(KafkaClient client, Node node) {
AuthenticationException exception = client.authenticationException(node);
if (exception != null)
throw exception;
}
}
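The two helpers added above make a common caller pattern explicit. A minimal usage sketch, assuming the enclosing class shown here is NetworkClientUtils (the caller class and method are hypothetical, not part of this commit):

    import org.apache.kafka.clients.KafkaClient;
    import org.apache.kafka.clients.NetworkClientUtils;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.utils.Time;

    final class BackoffAwareSender {
        // Skip a node that is inside its reconnect backoff window, surfacing any
        // stored authentication failure instead of silently retrying forever.
        static boolean readyToSend(KafkaClient client, Node node, Time time) {
            if (NetworkClientUtils.isUnavailable(client, node, time)) {
                NetworkClientUtils.maybeThrowAuthFailure(client, node); // throws if auth failed
                return false; // still backing off; try again once the delay elapses
            }
            return true;
        }
    }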
clients/src/main/java/org/apache/kafka/clients/admin/EndpointType.java

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

/**
* Identifies the endpoint type, as specified by KIP-919.
*/
public enum EndpointType {
UNKNOWN((byte) 0),
BROKER((byte) 1),
CONTROLLER((byte) 2);

private final byte id;

EndpointType(byte id) {
this.id = id;
}

public byte id() {
return id;
}

public static EndpointType fromId(byte id) {
if (id == BROKER.id) {
return BROKER;
} else if (id == CONTROLLER.id) {
return CONTROLLER;
} else {
return UNKNOWN;
}
}
}
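A short sketch of the new enum's round-trip contract: fromId inverts id for the two known endpoint types, and any unmapped byte degrades to UNKNOWN rather than throwing (demo class is illustrative):

    import org.apache.kafka.clients.admin.EndpointType;

    public class EndpointTypeDemo {
        public static void main(String[] args) {
            System.out.println(EndpointType.fromId(EndpointType.BROKER.id()));     // BROKER
            System.out.println(EndpointType.fromId(EndpointType.CONTROLLER.id())); // CONTROLLER
            System.out.println(EndpointType.fromId((byte) 42));                    // UNKNOWN
        }
    }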
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java

@@ -61,7 +61,7 @@ public ListOffsetsHandler(
this.offsetTimestampsByPartition = offsetTimestampsByPartition;
this.options = options;
this.log = logContext.logger(ListOffsetsHandler.class);
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
this.lookupStrategy = new PartitionLeaderStrategy(logContext, false);
}

@Override
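Passing false here opts the list-offsets lookup out of tolerating unknown topics, so a query against a topic that does not exist should now fail the returned future promptly instead of retrying until the API timeout. A hedged sketch of the user-visible effect (topic name is a placeholder):

    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    final class ListOffsetsUnknownTopicDemo {
        static void demo(Admin admin) throws InterruptedException {
            try {
                admin.listOffsets(Collections.singletonMap(
                        new TopicPartition("no-such-topic", 0), OffsetSpec.latest()))
                    .all().get();
            } catch (ExecutionException e) {
                // Expected cause: UnknownTopicOrPartitionException, surfaced fast
                System.out.println("Failed fast: " + e.getCause());
            }
        }
    }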
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java

@@ -42,9 +42,15 @@ public class PartitionLeaderStrategy implements AdminApiLookupStrategy<TopicPartition> {
};

private final Logger log;
private final boolean tolerateUnknownTopics;

public PartitionLeaderStrategy(LogContext logContext) {
this(logContext, true);
}

public PartitionLeaderStrategy(LogContext logContext, boolean tolerateUnknownTopics) {
this.log = logContext.logger(PartitionLeaderStrategy.class);
this.tolerateUnknownTopics = tolerateUnknownTopics;
}

@Override
@@ -64,6 +70,7 @@ public MetadataRequest.Builder buildRequest(Set<TopicPartition> partitions) {
return new MetadataRequest.Builder(request);
}

@SuppressWarnings("fallthrough")
private void handleTopicError(
String topic,
Errors topicError,
@@ -72,6 +79,12 @@
) {
switch (topicError) {
case UNKNOWN_TOPIC_OR_PARTITION:
if (!tolerateUnknownTopics) {
log.error("Received unknown topic error for topic {}", topic, topicError.exception());
failAllPartitionsForTopic(topic, requestPartitions, failed, tp -> topicError.exception(
"Failed to fetch metadata for partition " + tp + " because metadata for topic `" + topic + "` could not be found"));
break;
}
case LEADER_NOT_AVAILABLE:
case BROKER_NOT_AVAILABLE:
log.debug("Metadata request for topic {} returned topic-level error {}. Will retry",
@@ -124,6 +137,7 @@ private void handlePartitionError(
case LEADER_NOT_AVAILABLE:
case BROKER_NOT_AVAILABLE:
case KAFKA_STORAGE_ERROR:
case UNKNOWN_TOPIC_OR_PARTITION:
log.debug("Metadata request for partition {} returned partition-level error {}. Will retry",
topicPartition, partitionError);
break;
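Two things happen in this file: UNKNOWN_TOPIC_OR_PARTITION becomes fatal when tolerateUnknownTopics is false, and when it is tolerated the case deliberately falls through to the retriable handling below it, which is why the method gains @SuppressWarnings("fallthrough"). A stripped-down sketch of that control flow (the classify helper is hypothetical, not Kafka code):

    import org.apache.kafka.common.protocol.Errors;

    final class TopicErrorClassifier {
        @SuppressWarnings("fallthrough")
        static String classify(Errors error, boolean tolerateUnknownTopics) {
            switch (error) {
                case UNKNOWN_TOPIC_OR_PARTITION:
                    if (!tolerateUnknownTopics)
                        return "fatal";   // fail all partitions of the topic immediately
                    // deliberate fall-through: treat like the transient errors below
                case LEADER_NOT_AVAILABLE:
                case BROKER_NOT_AVAILABLE:
                    return "retry";
                default:
                    return "fatal";
            }
        }
    }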
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@@ -639,9 +639,9 @@ private void maybeOverrideClientId(Map<String, Object> configs) {
}
}

protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
Map<String, Object> newConfigs = new HashMap<>(configs);
if (keyDeserializer != null)
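Widening appendDeserializerToConfig from protected to public lets code outside ConsumerConfig subclasses compute the effective configuration when deserializer instances are passed in directly. A small sketch, assuming the class shown above is ConsumerConfig:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class EffectiveConfigDemo {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>();
            Map<String, Object> effective = ConsumerConfig.appendDeserializerToConfig(
                configs, new StringDeserializer(), new StringDeserializer());
            // The explicit instances are reflected back into the config map:
            System.out.println(effective.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
            System.out.println(effective.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
        }
    }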
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -2219,7 +2219,8 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
* for example if there is no position yet, or if the end offset is not known yet.
*
* <p>
* This method uses locally cached metadata and never makes a remote call.
* This method uses locally cached metadata. If the log end offset is not known yet, it triggers a request to fetch
* the log end offset, but returns immediately.
*
* @param topicPartition The partition to get the lag for.
*
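Under the corrected contract, an empty OptionalLong no longer means the lag can never be known locally: the call also schedules a fetch of the missing end offset. A caller sketch that polls until the lag materializes (the helper method and poll timeout are illustrative, not from this commit):

    import java.time.Duration;
    import java.util.OptionalLong;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    final class LagProbe {
        static long awaitLag(Consumer<?, ?> consumer, TopicPartition tp) {
            OptionalLong lag = consumer.currentLag(tp);
            while (!lag.isPresent()) {                 // unknown yet; a fetch was triggered
                consumer.poll(Duration.ofMillis(100)); // drive the network client forward
                lag = consumer.currentLag(tp);
            }
            return lag.getAsLong();
        }
    }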
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

@@ -274,11 +274,11 @@ public Fetch<K, V> collectFetch() {

try {
while (recordsRemaining > 0) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
CompletedFetch<K, V> records = completedFetches.peek();
if (records == null) break;

if (!records.initialized) {
if (!records.isInitialized()) {
try {
nextInLineFetch = initializeCompletedFetch(records);
} catch (Exception e) {
@@ -336,18 +336,18 @@ private Fetch<K, V> fetchRecords(final int maxRecords) {
throw new IllegalStateException("Missing position for fetchable partition " + nextInLineFetch.partition);
}

if (nextInLineFetch.nextFetchOffset == position.offset) {
if (nextInLineFetch.nextFetchOffset() == position.offset) {
List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(maxRecords);

log.trace("Returning {} fetched records at offset {} for assigned partition {}",
partRecords.size(), position, nextInLineFetch.partition);

boolean positionAdvanced = false;

if (nextInLineFetch.nextFetchOffset > position.offset) {
if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset,
nextInLineFetch.lastEpoch,
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
position.currentLeader);
log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
position, nextPosition, nextInLineFetch.partition, partRecords.size());
@@ -369,7 +369,7 @@ private Fetch<K, V> fetchRecords(final int maxRecords) {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
nextInLineFetch.partition, nextInLineFetch.nextFetchOffset, position);
nextInLineFetch.partition, nextInLineFetch.nextFetchOffset(), position);
}
}

@@ -381,7 +381,7 @@ private Fetch<K, V> fetchRecords(final int maxRecords) {

private List<TopicPartition> fetchablePartitions() {
Set<TopicPartition> exclude = new HashSet<>();
if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
exclude.add(nextInLineFetch.partition);
}
for (CompletedFetch<K, V> completedFetch : completedFetches) {
@@ -528,7 +528,7 @@ private CompletedFetch<K, V> initializeCompletedFetch(final CompletedFetch<K, V> completedFetch) {

private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset;
final long fetchOffset = completedFetch.nextFetchOffset();

// we are interested in this fetch only if the beginning offset matches the
// current consumed position
@@ -586,14 +586,14 @@ private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) {
});
}

completedFetch.initialized = true;
completedFetch.setInitialized();
return completedFetch;
}

private void handleInitializeCompletedFetchErrors(final CompletedFetch<K, V> completedFetch,
final Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset;
final long fetchOffset = completedFetch.nextFetchOffset();

if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
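The mechanical change across this file is that AbstractFetch stops reading CompletedFetch state through bare fields (initialized, isConsumed, nextFetchOffset, lastEpoch) and goes through accessors instead. A minimal sketch of the encapsulation pattern being applied (hypothetical class, not the real CompletedFetch):

    // Mutable fetch-progress state hidden behind accessors instead of bare fields,
    // so invariants (e.g. initialized is set exactly once) stay inside the class.
    final class FetchProgress {
        private boolean initialized = false;
        private boolean consumed = false;
        private long nextFetchOffset = 0L;

        boolean isInitialized() { return initialized; }
        void setInitialized()   { this.initialized = true; }
        boolean isConsumed()    { return consumed; }
        void markConsumed()     { this.consumed = true; }
        long nextFetchOffset()  { return nextFetchOffset; }
    }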
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

@@ -53,7 +53,8 @@ public class CommitRequestManager implements RequestManager {
// TODO: current in ConsumerConfig but inaccessible in the internal package.
private static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
// TODO: We will need to refactor the subscriptionState
private final SubscriptionState subscriptionState;
private final SubscriptionState subscriptions;
private final LogContext logContext;
private final Logger log;
private final Optional<AutoCommitState> autoCommitState;
private final CoordinatorRequestManager coordinatorRequestManager;
@@ -66,11 +67,12 @@ public class CommitRequestManager implements RequestManager {
public CommitRequestManager(
final Time time,
final LogContext logContext,
final SubscriptionState subscriptionState,
final SubscriptionState subscriptions,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final GroupState groupState) {
Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
this.logContext = logContext;
this.log = logContext.logger(getClass());
this.pendingRequests = new PendingRequests();
if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
@@ -82,7 +84,7 @@ public CommitRequestManager(
}
this.coordinatorRequestManager = coordinatorRequestManager;
this.groupState = groupState;
this.subscriptionState = subscriptionState;
this.subscriptions = subscriptions;
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.throwOnFetchStableOffsetUnsupported = config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
@@ -99,7 +101,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}

maybeAutoCommit(this.subscriptionState.allConsumed());
maybeAutoCommit(this.subscriptions.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
@@ -167,9 +169,9 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets)
})
.exceptionally(t -> {
if (t instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, t);
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, t.getMessage());
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, t.getMessage());
log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, t);
}
return null;
});
@@ -241,7 +243,7 @@ public OffsetFetchRequestState(final Set<TopicPartition> partitions,
final GroupState.Generation generation,
final long retryBackoffMs,
final long retryBackoffMaxMs) {
super(retryBackoffMs, retryBackoffMaxMs);
super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, retryBackoffMaxMs);
this.requestedPartitions = partitions;
this.requestedGeneration = generation;
this.future = new CompletableFuture<>();
@@ -366,6 +368,16 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
}
});
}

@Override
public String toString() {
return "OffsetFetchRequestState{" +
"requestedPartitions=" + requestedPartitions +
", requestedGeneration=" + requestedGeneration +
", future=" + future +
", " + toStringBase() +
'}';
}
}

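The constructor change above threads the LogContext and an owner name into the request-state base class, and the new toString folds in the shared toStringBase(). A simplified sketch of that base-class pattern (not Kafka's actual RequestState):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    abstract class RequestStateSketch {
        protected final Logger log;
        protected final String owner;
        protected final long retryBackoffMs;
        protected final long retryBackoffMaxMs;

        RequestStateSketch(LogContext logContext, String owner, long retryBackoffMs, long retryBackoffMaxMs) {
            this.log = logContext.logger(getClass());
            this.owner = owner;
            this.retryBackoffMs = retryBackoffMs;
            this.retryBackoffMaxMs = retryBackoffMaxMs;
        }

        // Shared state rendered once, for subclasses to embed in their own toString().
        protected String toStringBase() {
            return "owner='" + owner + "', retryBackoffMs=" + retryBackoffMs +
                ", retryBackoffMaxMs=" + retryBackoffMaxMs;
        }
    }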