KAFKA-7747; Check for truncation after leader changes [KIP-320] (#6371)
After the client detects a leader change, we need to check the offset of the current leader for truncation. These changes were part of KIP-320: https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation.

Reviewers: Jason Gustafson <[email protected]>
mumrah authored and hachikuji committed Apr 21, 2019
1 parent e56ebbf commit 409fabc
Showing 23 changed files with 1,574 additions and 240 deletions.
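
For orientation before the diffs: the KIP-320 check that this commit wires into the consumer sends the consumer's last-seen leader epoch to the (possibly new) leader, which answers with the end offset of that epoch on its own log. If that end offset is below the consumer's position, the logs have diverged. A minimal sketch of the comparison, with the network round trip abstracted away (the method name is illustrative, not part of this patch):

// Hedged sketch of the KIP-320 divergence test, not code from this commit.
// leaderEndOffsetForEpoch stands in for the answer to an OffsetsForLeaderEpoch
// request: the end offset, on the current leader's log, of the consumer's
// last-seen epoch.
static boolean logDiverged(long consumerPosition, long leaderEndOffsetForEpoch) {
    // If the leader's log for that epoch ends before the consumer's position,
    // offsets in [leaderEndOffsetForEpoch, consumerPosition) were truncated and
    // rewritten; the consumer must not keep fetching from its old position.
    return leaderEndOffsetForEpoch < consumerPosition;
}
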
58 changes: 56 additions & 2 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -18,6 +18,7 @@

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -314,6 +315,8 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
if (metadata.isInternal())
internalTopics.add(metadata.topic());
for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {

// Even if the partition's metadata includes an error, we need to handle the update to catch new epochs
updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> {
int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH);
partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
@@ -358,8 +361,8 @@ private void updatePartitionInfo(String topic,
}
}
} else {
// Old cluster format (no epochs)
lastSeenLeaderEpochs.clear();
// Handle old cluster formats as well as error responses where leader and epoch are missing
lastSeenLeaderEpochs.remove(tp);
partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata));
}
}
@@ -444,4 +447,55 @@ private MetadataRequestAndVersion(MetadataRequest.Builder requestBuilder,
}
}

public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
return partitionInfoIfCurrent(tp)
.map(infoAndEpoch -> {
Node leader = infoAndEpoch.partitionInfo().leader();
return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, Optional.of(infoAndEpoch.epoch()));
})
.orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp)));
}

public static class LeaderAndEpoch {

public static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new LeaderAndEpoch(Node.noNode(), Optional.empty());

public final Node leader;
public final Optional<Integer> epoch;

public LeaderAndEpoch(Node leader, Optional<Integer> epoch) {
this.leader = Objects.requireNonNull(leader);
this.epoch = Objects.requireNonNull(epoch);
}

public static LeaderAndEpoch noLeaderOrEpoch() {
return NO_LEADER_OR_EPOCH;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

LeaderAndEpoch that = (LeaderAndEpoch) o;

if (!leader.equals(that.leader)) return false;
return epoch.equals(that.epoch);
}

@Override
public int hashCode() {
int result = leader.hashCode();
result = 31 * result + epoch.hashCode();
return result;
}

@Override
public String toString() {
return "LeaderAndEpoch{" +
"leader=" + leader +
", epoch=" + epoch.map(Number::toString).orElse("absent") +
'}';
}
}
}
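
A hedged usage sketch of the accessor added above, assuming a Metadata instance named metadata and a TopicPartition tp in scope. Note that leaderAndEpoch never returns null: a missing leader is modeled as Node.noNode(), and the epoch may still be populated from lastSeenLeaderEpochs even when the cached partition info is stale.

Metadata.LeaderAndEpoch current = metadata.leaderAndEpoch(tp);
if (current.leader.isEmpty()) {
    // Node.noNode() is the sentinel for "no current leader"; the epoch may
    // still carry the last seen value as a fallback for validation.
    System.out.println("No leader known for " + tp + ", epoch=" + current.epoch);
} else {
    current.epoch.ifPresent(epoch ->
            System.out.println("Leader " + current.leader.id() + " at epoch " + epoch));
}
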
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
@@ -69,6 +70,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -1508,7 +1510,20 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
*/
@Override
public void seek(TopicPartition partition, long offset) {
seek(partition, new OffsetAndMetadata(offset, null));
if (offset < 0)
throw new IllegalArgumentException("seek offset must not be a negative number");

acquireAndEnsureOpen();
try {
log.info("Seeking to offset {} for partition {}", offset, partition);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
offset,
Optional.empty(), // This will ensure we skip validation
this.metadata.leaderAndEpoch(partition));
this.subscriptions.seek(partition, newPosition);
} finally {
release();
}
}

/**
@@ -1535,8 +1550,13 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
} else {
log.info("Seeking to offset {} for partition {}", offset, partition);
}
Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.leaderAndEpoch(partition);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
currentLeaderAndEpoch);
this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
this.subscriptions.seek(partition, offset);
this.subscriptions.seekAndValidate(partition, newPosition);
} finally {
release();
}
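
The two seek paths above now differ in whether offset validation can run: seek(partition, offset) builds a FetchPosition with an empty epoch, so there is nothing to validate against, while seek with an OffsetAndMetadata that carries a leader epoch goes through seekAndValidate. A hedged usage sketch, assuming a configured KafkaConsumer named consumer (topic, partition, offset, and epoch values are illustrative):

TopicPartition tp = new TopicPartition("orders", 0);

// No epoch: the position is trusted as-is and validation is skipped.
consumer.seek(tp, 1234L);

// With an epoch, e.g. recovered from external offset storage: the position
// is checked against the current leader's log before fetching resumes.
consumer.seek(tp, new OffsetAndMetadata(1234L, Optional.of(5), ""));
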
@@ -1658,9 +1678,9 @@ public long position(TopicPartition partition, final Duration timeout) {

Timer timer = time.timer(timeout);
do {
Long offset = this.subscriptions.position(partition);
if (offset != null)
return offset;
SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
if (position != null)
return position.offset;

updateFetchPositions(timer);
client.poll(timer);
@@ -2196,6 +2216,9 @@ private void close(long timeoutMs, boolean swallowException) {
* @return true iff the operation completed without timing out
*/
private boolean updateFetchPositions(final Timer timer) {
// If any partitions have been truncated due to a leader change, we need to validate the offsets
fetcher.validateOffsetsIfNeeded();

cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();
if (cachedSubscriptionHashAllFetchPositions) return true;

clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

/**
* In the event of an unclean leader election, the log will be truncated,
* previously committed data will be lost, and new data will be written
* over these offsets. When this happens, the consumer will detect the
* truncation and raise this exception (if no automatic reset policy
* has been defined) with the first offset to diverge from what the
* consumer read.
*/
public class LogTruncationException extends OffsetOutOfRangeException {

private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;

public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
}

/**
* Get the offsets for the partitions which were truncated. This is the first offset which is known to diverge
* from what the consumer read.
*/
public Map<TopicPartition, OffsetAndMetadata> divergentOffsets() {
return divergentOffsets;
}
}
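
A hedged sketch of how an application with no automatic reset policy might handle this exception; resuming from the divergent offsets (and accepting the lost records) is one possible policy, not something the patch prescribes. Assumes a configured consumer and an application-defined process method:

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // hypothetical application logic
} catch (LogTruncationException e) {
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : e.divergentOffsets().entrySet()) {
        // Resume from the first offset known to diverge from what we read.
        consumer.seek(entry.getKey(), entry.getValue());
    }
}
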
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -16,11 +16,13 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
@@ -189,13 +191,17 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
for (final ConsumerRecord<K, V> rec : recs) {
if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
long position = subscriptions.position(entry.getKey()).offset;

if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > position) {
throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position));
}

if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
if (assignment().contains(entry.getKey()) && rec.offset() >= position) {
results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
subscriptions.position(entry.getKey(), rec.offset() + 1);
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
rec.offset() + 1, rec.leaderEpoch(), new Metadata.LeaderAndEpoch(Node.noNode(), rec.leaderEpoch()));
subscriptions.position(entry.getKey(), newPosition);
}
}
}
@@ -290,12 +296,12 @@ public synchronized long position(TopicPartition partition) {
ensureNotClosed();
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.position(partition);
if (offset == null) {
SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
if (position == null) {
updateFetchPosition(partition);
offset = this.subscriptions.position(partition);
position = this.subscriptions.position(partition);
}
return offset;
return position.offset;
}

@Override
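
For reference, a hedged sketch of exercising these MockConsumer paths from a test; the topic name, key, and value are illustrative:

MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition tp = new TopicPartition("test", 0);

consumer.assign(Collections.singleton(tp));
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
consumer.addRecord(new ConsumerRecord<>("test", 0, 0L, "key", "value"));

ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
// position() now reads FetchPosition.offset rather than a bare Long.
assert records.count() == 1 && consumer.position(tp) == 1L;
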
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public abstract class AsyncClient<T1, Req extends AbstractRequest, Resp extends AbstractResponse, T2> {

private final Logger log;
private final ConsumerNetworkClient client;

AsyncClient(ConsumerNetworkClient client, LogContext logContext) {
this.client = client;
this.log = logContext.logger(getClass());
}

public RequestFuture<T2> sendAsyncRequest(Node node, T1 requestData) {
AbstractRequest.Builder<Req> requestBuilder = prepareRequest(node, requestData);

return client.send(node, requestBuilder).compose(new RequestFutureAdapter<ClientResponse, T2>() {
@Override
@SuppressWarnings("unchecked")
public void onSuccess(ClientResponse value, RequestFuture<T2> future) {
Resp resp;
try {
resp = (Resp) value.responseBody();
} catch (ClassCastException cce) {
log.error("Could not cast response body", cce);
future.raise(cce);
return;
}
log.trace("Received {} {} from broker {}", resp.getClass().getSimpleName(), resp, node);
try {
future.complete(handleResponse(node, requestData, resp));
} catch (RuntimeException e) {
if (!future.isDone()) {
future.raise(e);
}
}
}

@Override
public void onFailure(RuntimeException e, RequestFuture<T2> future1) {
future1.raise(e);
}
});
}

protected Logger logger() {
return log;
}

protected abstract AbstractRequest.Builder<Req> prepareRequest(Node node, T1 requestData);

protected abstract T2 handleResponse(Node node, T1 requestData, Resp response);
}
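
AsyncClient is a small template-method wrapper over ConsumerNetworkClient: sendAsyncRequest builds the request via prepareRequest, sends it, and adapts the raw ClientResponse into the typed result via handleResponse. A hedged skeleton of a concrete subclass (all Foo* names are hypothetical placeholders, not types from this commit):

// Hypothetical subclass mapping a TopicPartition input to a Long result.
class FooOffsetClient extends AsyncClient<TopicPartition, FooRequest, FooResponse, Long> {

    FooOffsetClient(ConsumerNetworkClient client, LogContext logContext) {
        super(client, logContext);
    }

    @Override
    protected AbstractRequest.Builder<FooRequest> prepareRequest(Node node, TopicPartition tp) {
        return new FooRequest.Builder(tp);   // build the outbound request
    }

    @Override
    protected Long handleResponse(Node node, TopicPartition tp, FooResponse response) {
        return response.offsetFor(tp);       // translate the response to the result type
    }
}

Callers then receive a RequestFuture<Long> from sendAsyncRequest(node, tp) and can add listeners or compose it further.
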
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -60,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -507,10 +508,15 @@ public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {

for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
final TopicPartition tp = entry.getKey();
final long offset = entry.getValue().offset();
log.info("Setting offset for partition {} to the committed offset {}", tp, offset);
final OffsetAndMetadata offsetAndMetadata = entry.getValue();
final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
new ConsumerMetadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.empty()));

log.info("Setting offset for partition {} to the committed offset {}", tp, position);
entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
this.subscriptions.seek(tp, offset);
this.subscriptions.seekAndValidate(tp, position);
}
return true;
}
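
Since refreshCommittedOffsetsIfNeeded now restores the committed offset's leader epoch into the FetchPosition, truncation can only be detected across restarts if the epoch was committed in the first place. A hedged sketch of a manual commit that preserves the epoch, assuming a ConsumerRecord rec obtained from poll():

consumer.commitSync(Collections.singletonMap(
        new TopicPartition(rec.topic(), rec.partition()),
        // rec.leaderEpoch() is the epoch of the record just consumed; storing
        // it alongside the offset lets the next session validate the position
        // against the current leader before resuming fetches.
        new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), "")));
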