partition
* for example if there is no position yet, or if the end offset is not known yet.
*
*
- * This method uses locally cached metadata and never makes a remote call.
+ * This method uses locally cached metadata. If the log end offset is not known yet, it triggers a request to fetch
+ * the log end offset, but returns immediately.
*
* @param topicPartition The partition to get the lag for.
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 5ab804ccb7ae5..8d6390ca94da5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -274,11 +274,11 @@ public Fetch collectFetch() {
try {
while (recordsRemaining > 0) {
- if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
+ if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                CompletedFetch<K, V> records = completedFetches.peek();
if (records == null) break;
- if (!records.initialized) {
+ if (!records.isInitialized()) {
try {
nextInLineFetch = initializeCompletedFetch(records);
} catch (Exception e) {
@@ -336,7 +336,7 @@ private Fetch fetchRecords(final int maxRecords) {
throw new IllegalStateException("Missing position for fetchable partition " + nextInLineFetch.partition);
}
- if (nextInLineFetch.nextFetchOffset == position.offset) {
+ if (nextInLineFetch.nextFetchOffset() == position.offset) {
                    List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(maxRecords);
log.trace("Returning {} fetched records at offset {} for assigned partition {}",
@@ -344,10 +344,10 @@ private Fetch fetchRecords(final int maxRecords) {
boolean positionAdvanced = false;
- if (nextInLineFetch.nextFetchOffset > position.offset) {
+ if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
- nextInLineFetch.nextFetchOffset,
- nextInLineFetch.lastEpoch,
+ nextInLineFetch.nextFetchOffset(),
+ nextInLineFetch.lastEpoch(),
position.currentLeader);
log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
position, nextPosition, nextInLineFetch.partition, partRecords.size());
@@ -369,7 +369,7 @@ private Fetch fetchRecords(final int maxRecords) {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
- nextInLineFetch.partition, nextInLineFetch.nextFetchOffset, position);
+ nextInLineFetch.partition, nextInLineFetch.nextFetchOffset(), position);
}
}
@@ -381,7 +381,7 @@ private Fetch fetchRecords(final int maxRecords) {
    private List<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> exclude = new HashSet<>();
- if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
+ if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
exclude.add(nextInLineFetch.partition);
}
for (CompletedFetch completedFetch : completedFetches) {
@@ -528,7 +528,7 @@ private CompletedFetch<K, V> initializeCompletedFetch(final CompletedFetch
    private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) {
final TopicPartition tp = completedFetch.partition;
- final long fetchOffset = completedFetch.nextFetchOffset;
+ final long fetchOffset = completedFetch.nextFetchOffset();
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
@@ -586,14 +586,14 @@ private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final Complet
});
}
- completedFetch.initialized = true;
+ completedFetch.setInitialized();
return completedFetch;
}
    private void handleInitializeCompletedFetchErrors(final CompletedFetch<K, V> completedFetch,
final Errors error) {
final TopicPartition tp = completedFetch.partition;
- final long fetchOffset = completedFetch.nextFetchOffset;
+ final long fetchOffset = completedFetch.nextFetchOffset();
if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 9d23b9a9c2473..83672fbc080ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -53,7 +53,8 @@ public class CommitRequestManager implements RequestManager {
// TODO: current in ConsumerConfig but inaccessible in the internal package.
private static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
// TODO: We will need to refactor the subscriptionState
- private final SubscriptionState subscriptionState;
+ private final SubscriptionState subscriptions;
+ private final LogContext logContext;
private final Logger log;
    private final Optional<AutoCommitState> autoCommitState;
private final CoordinatorRequestManager coordinatorRequestManager;
@@ -66,11 +67,12 @@ public class CommitRequestManager implements RequestManager {
public CommitRequestManager(
final Time time,
final LogContext logContext,
- final SubscriptionState subscriptionState,
+ final SubscriptionState subscriptions,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final GroupState groupState) {
Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
+ this.logContext = logContext;
this.log = logContext.logger(getClass());
this.pendingRequests = new PendingRequests();
if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
@@ -82,7 +84,7 @@ public CommitRequestManager(
}
this.coordinatorRequestManager = coordinatorRequestManager;
this.groupState = groupState;
- this.subscriptionState = subscriptionState;
+ this.subscriptions = subscriptions;
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.throwOnFetchStableOffsetUnsupported = config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
@@ -99,7 +101,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
- maybeAutoCommit(this.subscriptionState.allConsumed());
+ maybeAutoCommit(this.subscriptions.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
@@ -167,9 +169,9 @@ CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
if (t instanceof RetriableCommitFailedException) {
- log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, t);
+ log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, t.getMessage());
} else {
- log.warn("Asynchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, t.getMessage());
+ log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, t);
}
return null;
});
@@ -241,7 +243,7 @@ public OffsetFetchRequestState(final Set<TopicPartition> partitions,
final GroupState.Generation generation,
final long retryBackoffMs,
final long retryBackoffMaxMs) {
- super(retryBackoffMs, retryBackoffMaxMs);
+ super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, retryBackoffMaxMs);
this.requestedPartitions = partitions;
this.requestedGeneration = generation;
this.future = new CompletableFuture<>();
@@ -366,6 +368,16 @@ private CompletableFuture