KAFKA-9274: Gracefully handle timeout exception (#8060)
1. Delay the transaction initialization (producer.initTxn) from construction to maybeInitialize; if it times out, swallow the exception and retry it in the next iteration.

2. If completeRestoration (consumer.committed) times out, swallow the exception and retry it in the next iteration.

3. For other calls (producer.partitionsFor, producer.commitTxn, consumer.commit), treat the timeout exception as fatal.
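
For context, a minimal sketch of the retry-versus-fatal split described in the three points above. The class, fields, and stub methods are illustrative stand-ins rather than the actual Streams internals; only the exception handling mirrors this change.

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;

// Illustrative sketch only -- not the real task loop.
public class TimeoutHandlingSketch {
    private boolean txnInitialized = false;
    private boolean restorationComplete = false;

    // Stand-ins for producer.initTransactions(), consumer.committed(...) and the commit path.
    private void initTransactionsOnce() { }
    private void fetchCommittedOffsets() { }
    private void commitTransaction() { }

    // Called repeatedly by the processing loop.
    public void runOnce() {
        // Cases 1 and 2: swallow the timeout and retry on the next iteration.
        if (!txnInitialized) {
            try {
                initTransactionsOnce();
                txnInitialized = true;
            } catch (final TimeoutException swallowed) {
                return; // not fatal; the next iteration tries again
            }
        }
        if (!restorationComplete) {
            try {
                fetchCommittedOffsets();
                restorationComplete = true;
            } catch (final TimeoutException swallowed) {
                return; // not fatal; the next iteration tries again
            }
        }
        // Case 3: a timeout on the commit path is treated as fatal.
        try {
            commitTransaction();
        } catch (final TimeoutException fatal) {
            throw new StreamsException("Timed out while committing", fatal);
        }
    }
}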

Reviewers: Matthias J. Sax <[email protected]>
guozhangwang authored Feb 15, 2020
1 parent 16ee326 commit d8756e8
Showing 8 changed files with 104 additions and 46 deletions.

KafkaProducerTest.java
@@ -42,6 +42,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -749,7 +750,7 @@ public void testPartitionsForWithNullTopic() {
public void testInitTransactionTimeout() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 500);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

Time time = new MockTime(1);
@@ -761,7 +762,18 @@ public void testInitTransactionTimeout() {

try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
client.prepareResponse(
request -> request instanceof FindCoordinatorRequest &&
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));

assertThrows(TimeoutException.class, producer::initTransactions);

client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

// retry initialization should work
producer.initTransactions();
}
}
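
For context, the behavior the test above pins down is that initTransactions() can simply be called again after a client-side TimeoutException. A minimal application-level sketch under that assumption; the broker address, transactional id, timeout, and backoff values are illustrative, not recommendations.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitTransactionsRetrySketch {
    public static void main(final String[] args) throws InterruptedException {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");  // illustrative
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                try {
                    producer.initTransactions(); // may be retried after a timeout
                    break;
                } catch (final TimeoutException e) {
                    // max.block.ms elapsed before the transaction coordinator responded;
                    // back off briefly and retry instead of failing
                    Thread.sleep(1000);
                }
            }
        }
    }
}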


RecordCollector.java
@@ -47,6 +47,13 @@ <K, V> void send(final String topic,

void commit(final Map<TopicPartition, OffsetAndMetadata> offsets);

/**
* Initialize the internal {@link Producer}; note this function should be made idempotent
*
* @throws org.apache.kafka.common.errors.TimeoutException if the producer times out while initializing its transactional id
*/
void initialize();

/**
* Flush the internal {@link Producer}.
*/

RecordCollectorImpl.java
@@ -20,6 +20,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
@@ -70,6 +71,7 @@ public class RecordCollectorImpl implements RecordCollector {

// used when eosEnabled is true only
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private Producer<byte[], byte[]> producer;
private volatile KafkaException sendException;

@@ -95,24 +97,30 @@ public RecordCollectorImpl(final TaskId taskId,
this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), streamsMetrics);

producer = producerSupplier.get(taskId);
}

@Override
public void initialize() {
maybeInitTxns();
}

private void maybeInitTxns() {
if (eosEnabled) {
if (eosEnabled && !transactionInitialized) {
// initialize transactions if eos is turned on, which will block if the previous transaction has not
// completed yet; do not start the first transaction until the topology has been initialized later
try {
producer.initTransactions();

transactionInitialized = true;
} catch (final TimeoutException exception) {
final String errorMessage = "Timeout exception caught when initializing transactions for task " + taskId + ". " +
log.warn("Timeout exception caught when initializing transactions for task {}. " +
"\nThe broker is either slow or in bad state (like not having enough replicas) in responding to the request, " +
"or the connection to broker was interrupted sending the request or receiving the response. " +
"\n Consider overwriting `max.block.ms` to a larger value to avoid timeout errors";
"Would retry initializing the task in the next loop." +
"\nConsider overwriting producer config {} to a larger value to avoid timeout errors",
taskId, ProducerConfig.MAX_BLOCK_MS_CONFIG);

// TODO K9113: we do NOT try to handle timeout exception here but throw it as a fatal error
throw new StreamsException(errorMessage, exception);
throw exception;
} catch (final KafkaException exception) {
throw new StreamsException("Error encountered while initializing transactions for task " + taskId, exception);
}
@@ -163,7 +171,7 @@ public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
} catch (final ProducerFencedException error) {
throw new TaskMigratedException(taskId, "Producer get fenced trying to commit a transaction", error);
} catch (final TimeoutException error) {
// TODO K9113: currently handle timeout exception as a fatal error, should discuss whether we want to handle it
// TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
throw new StreamsException("Timed out while committing transaction via producer for task " + taskId, error);
} catch (final KafkaException error) {
throw new StreamsException("Error encountered sending offsets and committing transaction " +
@@ -176,7 +184,7 @@ public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
throw new TaskMigratedException(taskId, "Consumer committing offsets failed, " +
"indicating the corresponding thread is no longer part of the group.", error);
} catch (final TimeoutException error) {
// TODO K9113: currently handle timeout exception as a fatal error
// TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
throw new StreamsException("Timed out while committing offsets via consumer for task " + taskId, error);
} catch (final KafkaException error) {
throw new StreamsException("Error encountered committing offsets via consumer for task " + taskId, error);
@@ -244,9 +252,16 @@ public <K, V> void send(final String topic,
final StreamPartitioner<? super K, ? super V> partitioner) {
final Integer partition;

// TODO K9113: we need to decide how to handle exceptions from partitionsFor
if (partitioner != null) {
final List<PartitionInfo> partitions = producer.partitionsFor(topic);
final List<PartitionInfo> partitions;
try {
partitions = producer.partitionsFor(topic);
} catch (final KafkaException e) {
// here we cannot drop the message on the floor even if it is a transient timeout exception,
// so we treat everything the same as a fatal exception
throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
"' for task " + taskId + " due to " + e.toString());
}
if (partitions.size() > 0) {
partition = partitioner.partition(topic, key, value, partitions.size());
} else {

StreamTask.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
@@ -186,11 +187,14 @@ public boolean isActive() {

/**
* @throws LockException could happen when multi-threads within the single instance, could retry
* @throws TimeoutException if initializing record collector timed out
* @throws StreamsException fatal error, should close the thread
*/
@Override
public void initializeIfNeeded() {
if (state() == State.CREATED) {
recordCollector.initialize();

StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);

transitionTo(State.RESTORING);
@@ -199,6 +203,9 @@ public void initializeIfNeeded() {
}
}

/**
* @throws TimeoutException if fetching committed offsets timed out
*/
@Override
public void completeRestoration() {
if (state() == State.RESTORING) {
@@ -612,6 +619,12 @@ private void initializeMetadata() {
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
initializeTaskTime(offsetsAndMetadata);
} catch (final TimeoutException e) {
log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
"\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
e.toString(), ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

throw e;
} catch (final KafkaException e) {
throw new StreamsException(format("task [%s] Failed to initialize offsets for %s", id, partitions), e);
}
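
The warn-and-rethrow above lets the caller swallow the TimeoutException and retry metadata initialization on a later loop. A minimal consumer-side sketch of the same idea, assuming the Consumer#committed(Set, Duration) overload; the helper name and timeout value are illustrative.

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public final class CommittedOffsetsSketch {
    // Returns the committed offsets, or null if the broker did not respond in time,
    // in which case the caller is expected to retry on its next loop iteration.
    static Map<TopicPartition, OffsetAndMetadata> tryFetchCommitted(final Consumer<?, ?> consumer,
                                                                    final Set<TopicPartition> partitions) {
        try {
            return consumer.committed(partitions, Duration.ofSeconds(5));
        } catch (final TimeoutException e) {
            // swallow: not fatal, simply try again later
            return null;
        }
    }
}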

TaskManager.java
@@ -22,6 +22,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
@@ -214,7 +215,7 @@ boolean checkForCompletedRestoration() {
if (task.state() == CREATED) {
try {
task.initializeIfNeeded();
} catch (final LockException e) {
} catch (final LockException | TimeoutException e) {
// it is possible that if there are multiple threads within the instance that one thread
// trying to grab the task from the other, while the other has not released the lock since
// it did not participate in the rebalance. In this case we can just retry in the next iteration
@@ -232,7 +233,13 @@ boolean checkForCompletedRestoration() {
final Set<TopicPartition> restored = changelogReader.completedChangelogs();
for (final Task task : restoringTasks) {
if (restored.containsAll(task.changelogPartitions())) {
task.completeRestoration();
try {
task.completeRestoration();
} catch (final TimeoutException e) {
log.debug("Cloud complete restoration for {} due to {}; will retry", task.id(), e.toString());

allRunning = false;
}
} else {
// we found a restoring task that isn't done restoring, which is evidence that
// not all tasks are running

RecordCollectorTest.java
@@ -454,27 +454,27 @@ public synchronized Future<RecordMetadata> send(final ProducerRecord record, fin
}

@Test
public void shouldThrowStreamsExceptionOnEOSInitializeTimeout() {
public void shouldRethrowOnEOSInitializeTimeout() {
final KafkaException exception = new TimeoutException("KABOOM!");
final Properties props = StreamsTestUtils.getStreamsConfig("test");
props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

final StreamsException thrown = assertThrows(StreamsException.class, () ->
new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
final RecordCollector recordCollector = new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
)
}
);
assertEquals(exception, thrown.getCause());

final TimeoutException thrown = assertThrows(TimeoutException.class, recordCollector::initialize);
assertEquals(exception, thrown);
}

@Test
Expand All @@ -483,21 +483,21 @@ public void shouldThrowStreamsExceptionOnEOSInitializeError() {
final Properties props = StreamsTestUtils.getStreamsConfig("test");
props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

final StreamsException thrown = assertThrows(StreamsException.class, () ->
new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
final RecordCollector recordCollector = new RecordCollectorImpl(
taskId,
new StreamsConfig(props),
logContext,
streamsMetrics,
null,
id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public void initTransactions() {
throw exception;
}
)
}
);

final StreamsException thrown = assertThrows(StreamsException.class, recordCollector::initialize);
assertEquals(exception, thrown.getCause());
}

@@ -625,6 +625,7 @@ public void sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata
}
}
);
collector.initialize();

assertThrows(TaskMigratedException.class, () -> collector.commit(null));
}
@@ -646,6 +647,7 @@ public void commitTransaction() {
}
}
);
collector.initialize();

assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.emptyMap()));
}
@@ -688,6 +690,7 @@ public void sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata
}
}
);
collector.initialize();

assertThrows(StreamsException.class, () -> collector.commit(null));
}
@@ -709,6 +712,7 @@ public void commitTransaction() {
}
}
);
collector.initialize();

assertThrows(StreamsException.class, () -> collector.commit(Collections.emptyMap()));
}
@@ -780,7 +784,7 @@ public void abortTransaction() {
}
}
);

collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
collector.commit(Collections.emptyMap());

@@ -807,7 +811,7 @@ public void abortTransaction() {
}
}
);

collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

final StreamsException thrown = assertThrows(StreamsException.class, collector::close);
@@ -831,6 +835,7 @@ public void abortTransaction() {
}
}
);
collector.initialize();

// this call is to begin an inflight txn
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

StreamThreadTest.java
@@ -712,10 +712,6 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE
thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);

for (final Task task : thread.activeTasks()) {
assertTrue(((MockProducer) ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()).transactionInitialized());
}

thread.shutdown();
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,

MockRecordCollector.java
@@ -76,6 +76,9 @@ public <K, V> void send(final String topic,
headers));
}

@Override
public void initialize() {}

@Override
public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
committed.add(offsets);