KAFKA-8421: Still return data during rebalance (#7312)
Do not wait until updateAssignmentMetadataIfNeeded returns true; instead call it once with a zero timeout. Also, do not return an empty record set just because a rebalance is in progress.
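
For illustration, a minimal sketch of a consumer loop under the new behavior (the broker address, group id, and topic name are placeholders, not part of this patch): poll() may now hand back records that were already fetched for partitions the consumer still owns even while the group is rebalancing, and a commit attempted mid-rebalance surfaces the retriable RebalanceInProgressException instead of the fatal CommitFailedException.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RebalanceInProgressException;

public class PollDuringRebalanceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "example-group");              // placeholder group
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic
            while (true) {
                // May return records for still-owned partitions even while a rebalance is ongoing.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("%s-%d@%d: %s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
                try {
                    consumer.commitSync();
                } catch (RebalanceInProgressException e) {
                    // Retriable: the group is mid-rebalance; the next poll() completes the
                    // rebalance and the commit can be retried on the following iteration.
                }
            }
        }
    }
}
```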

Trim the pre-fetched records after the long poll, since the assignment may have changed in the meantime.
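
A rough sketch of that trimming step, written as a standalone helper rather than the actual Fetcher code: after the long poll returns, only records for partitions that are still assigned are kept, so anything buffered for a partition revoked during the rebalance is dropped.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Illustrative helper only; the real trimming lives inside the consumer's fetch path.
final class FetchTrimSketch {
    static <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> trimToAssignment(
            Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched,
            Set<TopicPartition> currentAssignment) {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> trimmed = new HashMap<>();
        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : fetched.entrySet()) {
            if (currentAssignment.contains(entry.getKey()))
                trimmed.put(entry.getKey(), entry.getValue()); // partition still owned, keep it
            // otherwise the partition was revoked during the rebalance, so its records are dropped
        }
        return trimmed;
    }
}
```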

Also update SubscriptionState so that assignFromSubscribed retains the existing partition state when it is already present (similar to assignFromUser), so that the transition from INITIALIZING to FETCHING is no longer needed.
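
A simplified sketch of that retained-state logic in assignFromSubscribed (identifiers follow SubscriptionState, but this is a sketch of the change rather than the verbatim patch): reuse the existing TopicPartitionState for a partition that stays assigned across the rebalance so it keeps its fetch position instead of starting over in INITIALIZING.

```java
// Sketch only: mirrors assignFromUser so state already present in the assignment is retained.
public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
    // ... subscription-type and subscribed-topic checks elided ...
    Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
    for (TopicPartition tp : assignments) {
        TopicPartitionState state = this.assignment.stateValue(tp);
        if (state == null)
            state = new TopicPartitionState();    // newly assigned partition starts fresh
        assignedPartitionStates.put(tp, state);   // retained state keeps its position and epoch
    }
    assignmentId++;
    this.assignment.set(assignedPartitionStates);
}
```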

Unit test: this actually took me the most time :)

Reviewers: John Roesler <[email protected]>, Bill Bejeck <[email protected]>, Bruno Cadonna <[email protected]>, Sophie Blee-Goldman <[email protected]>, Jason Gustafson <[email protected]>, Richard Yu <[email protected]>, dengziming <[email protected]>
guozhangwang authored Jan 9, 2020
1 parent e94f5dc commit 505e824
Showing 14 changed files with 331 additions and 109 deletions.
@@ -28,6 +28,10 @@ public class CommitFailedException extends KafkaException {

private static final long serialVersionUID = 1L;

public CommitFailedException(final String message) {
super(message);
}

public CommitFailedException() {
super("Commit cannot be completed since the group has already " +
"rebalanced and assigned the partitions to another member. This means that the time " +

Large diffs are not rendered by default.

@@ -18,6 +18,7 @@

import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;

@@ -37,6 +38,9 @@ public interface OffsetCommitCallback {
* @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
* This can only occur if you are using automatic group management with {@link KafkaConsumer#subscribe(Collection)},
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.RebalanceInProgressException if the commit failed because
* it is in the middle of a rebalance. In such cases
* commit could be retried after the rebalance is completed with the {@link #poll(Duration)} call.
* @throws org.apache.kafka.common.errors.WakeupException if {@link KafkaConsumer#wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -22,12 +22,6 @@ public class RetriableCommitFailedException extends RetriableException {

private static final long serialVersionUID = 1L;

public static RetriableCommitFailedException withUnderlyingMessage(String additionalMessage) {
return new RetriableCommitFailedException("Offset commit failed with a retriable exception. " +
"You should retry committing the latest consumed offsets. " +
"The underlying error was: " + additionalMessage);
}

public RetriableCommitFailedException(Throwable t) {
super("Offset commit failed with a retriable exception. You should retry committing " +
"the latest consumed offsets.", t);
@@ -111,7 +111,7 @@
public abstract class AbstractCoordinator implements Closeable {
public static final String HEARTBEAT_THREAD_PREFIX = "kafka-coordinator-heartbeat-thread";

private enum MemberState {
protected enum MemberState {
UNJOINED, // the client is not part of a group
REBALANCING, // the client has begun rebalancing
STABLE, // the client has joined and is sending heartbeats
@@ -130,11 +130,12 @@ private enum MemberState {
private MemberState state = MemberState.UNJOINED;
private HeartbeatThread heartbeatThread = null;
private RequestFuture<ByteBuffer> joinFuture = null;
private RequestFuture<Void> findCoordinatorFuture = null;
volatile private RuntimeException findCoordinatorException = null;
private Generation generation = Generation.NO_GENERATION;
private long lastRebalanceStartMs = -1L;
private long lastRebalanceEndMs = -1L;

private RequestFuture<Void> findCoordinatorFuture = null;

/**
* Initialize the coordination manager.
@@ -226,6 +227,11 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
return true;

do {
if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {
final RuntimeException fatalException = findCoordinatorException;
findCoordinatorException = null;
throw fatalException;
}
final RequestFuture<Void> future = lookupCoordinator();
client.poll(future, timer);

@@ -258,8 +264,20 @@ protected synchronized RequestFuture<Void> lookupCoordinator() {
if (node == null) {
log.debug("No broker available to send FindCoordinator request");
return RequestFuture.noBrokersAvailable();
} else
} else {
findCoordinatorFuture = sendFindCoordinatorRequest(node);
// remember the exception even after the future is cleared so that
// it can still be thrown by the ensureCoordinatorReady caller
findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {} // do nothing

@Override
public void onFailure(RuntimeException e) {
findCoordinatorException = e;
}
});
}
}
return findCoordinatorFuture;
}
@@ -436,9 +454,9 @@ boolean joinGroupIfNeeded(final Timer timer) {
resetJoinGroupFuture();
final RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException ||
exception instanceof MemberIdRequiredException)
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException ||
exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;
@@ -842,6 +860,10 @@ protected synchronized Generation generationIfStable() {
return generation;
}

protected synchronized boolean rebalanceInProgress() {
return this.state == MemberState.REBALANCING;
}

protected synchronized String memberId() {
return generation.memberId;
}
@@ -36,6 +36,7 @@
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -438,7 +439,7 @@ public boolean poll(Timer timer) {

invokeCompletedOffsetCommitCallbacks();

if (subscriptions.partitionsAutoAssigned()) {
if (subscriptions.hasAutoAssignedPartitions()) {
if (protocol == null) {
throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
" to empty while trying to subscribe for group protocol to auto assign partitions");
@@ -706,7 +707,7 @@ public void onLeavePrepare() {
// we should reset assignment and trigger the callback before leaving group
Set<TopicPartition> droppedPartitions = new HashSet<>(subscriptions.assignedPartitions());

if (subscriptions.partitionsAutoAssigned() && !droppedPartitions.isEmpty()) {
if (subscriptions.hasAutoAssignedPartitions() && !droppedPartitions.isEmpty()) {
final Exception e;
if (generation() != Generation.NO_GENERATION) {
e = invokePartitionsRevoked(droppedPartitions);
@@ -727,7 +728,7 @@
*/
@Override
public boolean rejoinNeededOrPending() {
if (!subscriptions.partitionsAutoAssigned())
if (!subscriptions.hasAutoAssignedPartitions())
return false;

// we need to rejoin if we performed the assignment and metadata has changed;
@@ -759,14 +760,24 @@ public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
final TopicPartition tp = entry.getKey();
final OffsetAndMetadata offsetAndMetadata = entry.getValue();
if (offsetAndMetadata != null) {
final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
leaderAndEpoch);

log.info("Setting offset for partition {} to the committed offset {}", tp, position);
// first update the epoch if necessary
entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
this.subscriptions.seekUnvalidated(tp, position);

// it's possible that the partition is no longer assigned when the response is received,
// so we need to ignore seeking if that's the case
if (this.subscriptions.isAssigned(tp)) {
final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
leaderAndEpoch);

this.subscriptions.seekUnvalidated(tp, position);

log.info("Setting offset for partition {} to the committed offset {}", tp, position);
} else {
log.info("Ignoring the returned {} since its partition {} is no longer assigned",
offsetAndMetadata, tp);
}
}
}
return true;
@@ -986,7 +997,7 @@ private void doAutoCommitOffsetsAsync() {

commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
if (exception instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
exception);
nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
@@ -1066,16 +1077,28 @@ private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Of
}

final Generation generation;
if (subscriptions.partitionsAutoAssigned()) {
if (subscriptions.hasAutoAssignedPartitions()) {
generation = generationIfStable();
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll()
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
if (generation == null) {
log.info("Failing OffsetCommit request since the consumer is not part of an active group");
return RequestFuture.failure(new CommitFailedException());

if (rebalanceInProgress()) {
// if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
// CommitFailedException to indicate this is not a fatal error
return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
"by calling poll() and then retry the operation."));
} else {
return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
"consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
"was kicked out of the group."));
}
}
} else
} else {
generation = Generation.NO_GENERATION;
}

OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
@@ -1148,15 +1171,18 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
future.raise(error);
return;
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
/* Consumer never tries to commit offset in between join-group and sync-group,
/* Consumer should not try to commit offset in between join-group and sync-group,
* and hence on broker-side it is not expected to see a commit offset request
* during CompletingRebalance phase; if it ever happens then broker would return
* this error. In this case we should just treat as a fatal CommitFailed exception.
* However, we do not need to reset generations and just request re-join, such that
* if the caller decides to proceed and poll, it would still try to proceed and re-join normally.
* this error to indicate that we are still in the middle of a rebalance.
* In this case we would throw a RebalanceInProgressException,
* request re-join but do not reset generations. If the callers decide to retry they
* can go ahead and call poll to finish up the rebalance first, and then try commit again.
*/
requestRejoin();
future.raise(new CommitFailedException());
future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer group is executing a rebalance at the moment. You can try completing the rebalance " +
"by calling poll() and then retry commit again"));
return;
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
@@ -26,17 +26,16 @@
import java.util.concurrent.TimeUnit;

public class KafkaConsumerMetrics implements AutoCloseable {
private final Metrics metrics;
private final MetricName lastPollMetricName;
private final Sensor timeBetweenPollSensor;
private final Sensor pollIdleSensor;
private final Metrics metrics;
private long lastPollMs;
private long pollStartMs;
private long timeSinceLastPollMs;

public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics;

String metricGroupName = metricGrpPrefix + "-metrics";
Measurable lastPoll = (mConfig, now) -> {
if (lastPollMs == 0L)
@@ -198,7 +198,7 @@ private boolean changeSubscription(Set<String> topicsToSubscribe) {
* @param topics The topics to add to the group subscription
*/
synchronized boolean groupSubscribe(Collection<String> topics) {
if (!partitionsAutoAssigned())
if (!hasAutoAssignedPartitions())
throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
groupSubscription = new HashSet<>(groupSubscription);
return groupSubscription.addAll(topics);
@@ -224,15 +224,18 @@ public synchronized boolean assignFromUser(Set<TopicPartition> partitions) {

assignmentId++;

// update the subscribed topics
Set<String> manualSubscribedTopics = new HashSet<>();
Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
for (TopicPartition partition : partitions) {
TopicPartitionState state = assignment.stateValue(partition);
if (state == null)
state = new TopicPartitionState();
partitionToState.put(partition, state);

manualSubscribedTopics.add(partition.topic());
}

this.assignment.set(partitionToState);
return changeSubscription(manualSubscribedTopics);
}
@@ -267,7 +270,7 @@ public synchronized boolean checkAssignmentMatchedSubscription(Collection<TopicP
* different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs.
*/
public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
if (!this.partitionsAutoAssigned())
if (!this.hasAutoAssignedPartitions())
throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");

Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
@@ -322,7 +325,7 @@ synchronized boolean matchesSubscribedPattern(String topic) {
}

public synchronized Set<String> subscription() {
if (partitionsAutoAssigned())
if (hasAutoAssignedPartitions())
return this.subscription;
return Collections.emptySet();
}
@@ -416,7 +419,7 @@ synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition>
.collect(Collectors.toList());
}

synchronized boolean partitionsAutoAssigned() {
public synchronized boolean hasAutoAssignedPartitions() {
return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
}

@@ -302,7 +302,6 @@ private long elapsedTimeMs(long currentTimeMs, long startTimeMs) {
return Math.max(0, currentTimeMs - startTimeMs);
}


private void checkTimeoutOfPendingRequests(long nowMs) {
ClientRequest request = requests.peek();
while (request != null && elapsedTimeMs(nowMs, request.createdTimeMs()) > request.requestTimeoutMs()) {
@@ -334,7 +333,6 @@ public void respond(RequestMatcher matcher, AbstractResponse response) {

// Utility method to enable out of order responses
public void respondToRequest(ClientRequest clientRequest, AbstractResponse response) {
AbstractRequest request = clientRequest.requestBuilder().build();
requests.remove(clientRequest);
short version = clientRequest.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(clientRequest.makeHeader(version), clientRequest.callback(), clientRequest.destination(),