KAFKA-8805; Bump producer epoch on recoverable errors (#7389)
This change is the client-side part of KIP-360. It identifies cases where it is safe to abort a transaction, bump the producer epoch, and allow the application to continue without closing the producer. In these cases, when KafkaProducer.abortTransaction() is called, the producer sends an InitProducerId request following the transaction abort, which bumps the producer epoch. The application can then start a new transaction and continue processing.
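
In application code this enables a retry loop like the following hedged sketch (not code from this patch; the topic name, record, and error handling are illustrative placeholders): on an abortable error the producer is no longer closed, it simply aborts and begins a new transaction.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalRetryExample {
    // Assumes `producer` was created with a transactional.id; "output-topic" and the record are placeholders.
    static void sendWithRecovery(KafkaProducer<String, String> producer) {
        producer.initTransactions();
        while (true) {
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("output-topic", "key", "value"));
                producer.commitTransaction();
                return;
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException fatal) {
                // Fatal errors: the producer still has to be closed and recreated.
                producer.close();
                throw fatal;
            } catch (KafkaException abortable) {
                // With this change, abortTransaction() is followed by an InitProducerId request that bumps
                // the producer epoch, so the same producer instance can retry in a new transaction.
                producer.abortTransaction();
            }
        }
    }
}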

For recoverable errors in the idempotent producer, the epoch is bumped locally. In-flight requests for partitions that hit an error are rewritten to reflect the new epoch, while in-flight requests for all other partitions are allowed to complete using the old epoch.
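
The sketch below is a hypothetical, simplified model of that local bump; every class, field, and method name in it is invented for illustration only, while the real logic lives in TransactionManager (see bumpIdempotentEpochAndResetIdIfNeeded in the diff) and ProducerBatch.resetProducerState.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the idempotent producer's local epoch bump; not the real TransactionManager.
class EpochBumpSketch {
    static final class BatchState {
        short epoch;
        int baseSequence;
        int recordCount = 1;
    }

    long producerId = 0L;
    short epoch = 0;
    final Map<String, List<BatchState>> inflightByPartition = new HashMap<>();
    final Map<String, Integer> nextSequence = new HashMap<>();

    // On a recoverable error, bump the epoch locally and rewrite the failed partition's
    // in-flight batches so they retry with the new epoch and sequences starting from 0.
    // Other partitions keep the old epoch until their in-flight batches complete.
    void bumpEpochAndRewrite(String failedPartition) {
        epoch += 1;
        int sequence = 0;
        for (BatchState batch : inflightByPartition.getOrDefault(failedPartition, new ArrayList<>())) {
            batch.epoch = epoch;
            batch.baseSequence = sequence;
            sequence += batch.recordCount;
        }
        nextSequence.put(failedPartition, sequence);
    }
}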

Reviewers: Boyang Chen <[email protected]>, Jason Gustafson <[email protected]>
bob-barrett authored Feb 16, 2020
1 parent d8756e8 commit 937f1f7
Showing 13 changed files with 1,449 additions and 432 deletions.
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
@@ -20,7 +20,7 @@

     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin).java"/>
+              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
     <suppress checks="ClassFanOutComplexity"
@@ -59,7 +59,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager).java"/>

     <suppress checks="JavaNCSS"
               files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -390,10 +390,10 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

             this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            this.transactionManager = configureTransactionState(config, logContext, log);
             int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

             this.apiVersions = new ApiVersions();
+            this.transactionManager = configureTransactionState(config, logContext);
             this.accumulator = new RecordAccumulator(logContext,
                     config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.compressionType,
@@ -505,7 +505,8 @@ private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
         return deliveryTimeoutMs;
     }

-    private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
+    private TransactionManager configureTransactionState(ProducerConfig config,
+                                                          LogContext logContext) {

         TransactionManager transactionManager = null;

@@ -519,7 +520,8 @@ private static TransactionManager configureTransactionState(ProducerConfig confi
             String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
             int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-            transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
+            transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
+                    retryBackoffMs, apiVersions);
             if (transactionManager.isTransactional())
                 log.info("Instantiated a transactional producer.");
             else
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -389,6 +389,8 @@ public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequ
     }

     public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
+        log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
+                this.baseSequence(), this.topicPartition, baseSequence);
         reopened = true;
         recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
     }
@@ -454,6 +456,10 @@ public int baseSequence() {
         return recordsBuilder.baseSequence();
     }

+    public int lastSequence() {
+        return recordsBuilder.baseSequence() + recordsBuilder.numRecords() - 1;
+    }
+
     public boolean hasSequence() {
         return baseSequence() != RecordBatch.NO_SEQUENCE;
     }
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -525,11 +525,24 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
             // we cannot send the batch until we have refreshed the producer id
             return true;

-        if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))
-            // Don't drain any new batches while the state of previous sequence numbers
-            // is unknown. The previous batches would be unknown if they were aborted
-            // on the client after being sent to the broker at least once.
-            return true;
+        if (!first.hasSequence()) {
+            if (transactionManager.hasInflightBatches(tp)) {
+                // Don't drain any new batches while the partition has in-flight batches with a different epoch
+                // and/or producer ID. Otherwise, a batch with a new epoch and sequence number
+                // 0 could be written before earlier batches complete, which would cause out of sequence errors
+                ProducerBatch firstInFlightBatch = transactionManager.nextBatchBySequence(tp);
+
+                if (firstInFlightBatch != null && !transactionManager.matchesProducerIdAndEpoch(firstInFlightBatch)) {
+                    return true;
+                }
+            }
+
+            if (transactionManager.hasUnresolvedSequence(first.topicPartition))
+                // Don't drain any new batches while the state of previous sequence numbers
+                // is unknown. The previous batches would be unknown if they were aborted
+                // on the client after being sent to the broker at least once.
+                return true;
+        }

         int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
         if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -31,7 +31,6 @@
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -295,13 +294,7 @@ public void run() {
     void runOnce() {
         if (transactionManager != null) {
             try {
-                if (transactionManager.isTransactional()
-                        && transactionManager.hasUnresolvedSequences()
-                        && !transactionManager.hasFatalError()) {
-                    transactionManager.transitionToFatalError(
-                        new KafkaException("The client hasn't received acknowledgment for " +
-                                "some previously sent messages and can no longer retry them. It isn't safe to continue."));
-                }
+                transactionManager.maybeResolveSequences();

                 // do not continue sending if the transaction manager is in a failed state
                 if (transactionManager.hasFatalError()) {
@@ -314,14 +307,14 @@

                 // Check whether we need a new producerId. If so, we will enqueue an InitProducerId
                 // request which will be sent below
-                transactionManager.resetIdempotentProducerIdIfNeeded();
+                transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

                 if (maybeSendAndPollTransactionalRequest()) {
                     return;
                 }
             } catch (AuthenticationException e) {
                 // This is already logged as error, but propagated here to perform any clean ups.
-                log.trace("Authentication exception while processing transactional request: {}", e);
+                log.trace("Authentication exception while processing transactional request", e);
                 transactionManager.authenticationFailed(e);
             }
         }
@@ -387,7 +380,7 @@ private long sendProducerData(long now) {
             failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
             if (transactionManager != null && expiredBatch.inRetry()) {
                 // This ensures that no new batches are drained until the current in flight batches are fully resolved.
-                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
+                transactionManager.markSequenceUnresolved(expiredBatch);
             }
         }
         sensors.updateProduceRequestMetrics(batches);
@@ -459,7 +452,7 @@ private boolean maybeSendAndPollTransactionalRequest() {
             long currentTimeMs = time.milliseconds();
             ClientRequest clientRequest = client.newClientRequest(
                 targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
-            log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
+            log.debug("Sending transactional request {} to node {} with correlation ID {}", requestBuilder, targetNode, clientRequest.correlationId());
             client.send(clientRequest, currentTimeMs);
             transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
             client.poll(retryBackoffMs, time.milliseconds());
@@ -521,6 +514,12 @@ private Node awaitNodeReady(FindCoordinatorRequest.CoordinatorType coordinatorTy
             client.leastLoadedNode(time.milliseconds());

         if (node != null && NetworkClientUtils.awaitReady(client, node, time, requestTimeoutMs)) {
+            if (coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION) {
+                // Indicate to the transaction manager that the coordinator is ready, allowing it to check ApiVersions
+                // This allows us to bump transactional epochs even if the coordinator is temporarily unavailable at
+                // the time when the abortable error is handled
+                transactionManager.handleCoordinatorReady();
+            }
             return node;
         }
         return null;
@@ -599,19 +598,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                 batch.topicPartition,
                 this.retries - batch.attempts() - 1,
                 error);
-            if (transactionManager == null) {
-                reenqueueBatch(batch, now);
-            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
-                // If idempotence is enabled only retry the request if the current producer id is the same as
-                // the producer id of the batch.
-                log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
-                        batch.topicPartition, batch.producerId(), batch.baseSequence());
-                reenqueueBatch(batch, now);
-            } else {
-                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
-                        "batch but the producer id changed from " + batch.producerId() + " to " +
-                        transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
-            }
+            reenqueueBatch(batch, now);
         } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
             // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
             // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
@@ -700,8 +687,9 @@ private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse
         return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
                 batch.attempts() < this.retries &&
                 !batch.isDone() &&
-                ((response.error.exception() instanceof RetriableException) ||
-                        (transactionManager != null && transactionManager.canRetry(response, batch)));
+                (transactionManager == null ?
+                        response.error.exception() instanceof RetriableException :
+                        transactionManager.canRetry(response, batch));
     }

     /**
