Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Fix Kafka Send Timeout - Back Port
Browse files Browse the repository at this point in the history
Resolves spring-attic/spring-cloud-stream-binder-kafka#928

Kafka has a longer default send timeout; this means a send could be successful long
after Spring has timed out the send.
  • Loading branch information
garyrussell committed Jul 16, 2020
1 parent 2ae732d commit 5c27eb2
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
Expand Down Expand Up @@ -66,7 +67,6 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -102,7 +102,10 @@
public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMessageHandler
implements Lifecycle {

private static final long DEFAULT_SEND_TIMEOUT = 10000;
/**
* Buffer added to ensure our timeout is longer than Kafka's.
*/
private static final int TIMEOUT_BUFFER = 5000;

private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<>();

Expand All @@ -116,6 +119,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes

private final AtomicBoolean running = new AtomicBoolean();

private final long deliveryTimeoutMsProperty;

private EvaluationContext evaluationContext;

private Expression topicExpression;
Expand All @@ -131,7 +136,7 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes

private boolean sync;

private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
private Expression sendTimeoutExpression;

private KafkaHeaderMapper headerMapper;

Expand All @@ -151,7 +156,7 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes

private ProducerRecordCreator<K, V> producerRecordCreator =
(message, topic, partition, timestamp, key, value, headers) ->
new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
new ProducerRecord<>(topic, partition, timestamp, key, value, headers);

private volatile byte[] singleReplyTopic;

Expand All @@ -176,6 +181,27 @@ public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
logger.warn("The KafkaTemplate is transactional; this gateway will only work if the consumer is "
+ "configured to read uncommitted records");
}
determineSendTimeout();
this.deliveryTimeoutMsProperty =
this.sendTimeoutExpression.getValue(Long.class) // NOSONAR - never null after determineSendTimeout()
- TIMEOUT_BUFFER;
}

private void determineSendTimeout() {
Map<String, Object> props = this.kafkaTemplate.getProducerFactory().getConfigurationProperties();
Object dt = props.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
if (dt == null) {
dt = ProducerConfig.configDef().defaultValues().get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
}
if (dt instanceof Long) {
setSendTimeout(((Long) dt) + TIMEOUT_BUFFER);
}
else if (dt instanceof Integer) {
setSendTimeout(Long.valueOf((Integer) dt) + TIMEOUT_BUFFER);
}
else if (dt instanceof String) {
setSendTimeout(Long.parseLong((String) dt) + TIMEOUT_BUFFER);
}
}

public void setTopicExpression(Expression topicExpression) {
Expand Down Expand Up @@ -244,24 +270,25 @@ public void setSync(boolean sync) {

/**
* Specify a timeout in milliseconds for how long this
* {@link KafkaProducerMessageHandler} should wait wait for send operation
* results. Defaults to 10 seconds. The timeout is applied only in {@link #sync} mode.
* Also applies when sending to the success or failure channels.
* @param sendTimeout the timeout to wait for result fo send operation.
* {@link KafkaProducerMessageHandler} should wait wait for send operation results.
* Defaults to the kafka {@code delivery.timeout.ms} property + 5 seconds. The timeout
* is applied Also applies when sending to the success or failure channels.
* @param sendTimeout the timeout to wait for result for a send operation.
* @since 2.0.1
*/
@Override
public void setSendTimeout(long sendTimeout) {
public final void setSendTimeout(long sendTimeout) {
super.setSendTimeout(sendTimeout);
setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
}

/**
* Specify a SpEL expression to evaluate a timeout in milliseconds for how long this
* {@link KafkaProducerMessageHandler} should wait wait for send operation
* results. Defaults to 10 seconds. The timeout is applied only in {@link #sync} mode.
* {@link KafkaProducerMessageHandler} should wait wait for send operation results.
* Defaults to the kafka {@code delivery.timeout.ms} property + 5 seconds. The timeout
* is applied only in {@link #sync} mode.
* @param sendTimeoutExpression the {@link Expression} for timeout to wait for result
* fo send operation.
* for a send operation.
* @since 2.1.1
*/
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
Expand All @@ -270,7 +297,8 @@ public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
}

/**
* Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
* Set the failure channel. After a send failure, an
* {@link org.springframework.messaging.support.ErrorMessage} will be sent
* to this channel with a payload of a {@link KafkaSendFailureException} with the
* failed message and cause.
* @param sendFailureChannel the failure channel.
Expand All @@ -281,7 +309,8 @@ public void setSendFailureChannel(MessageChannel sendFailureChannel) {
}

/**
* Set the failure channel name. After a send failure, an {@link ErrorMessage} will be
* Set the failure channel name. After a send failure, an
* {@link org.springframework.messaging.support.ErrorMessage} will be
* sent to this channel name with a payload of a {@link KafkaSendFailureException}
* with the failed message and cause.
* @param sendFailureChannelName the failure channel name.
Expand Down Expand Up @@ -392,10 +421,8 @@ public void start() {

@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
if (!this.transactional || this.allowNonTransactional) {
this.kafkaTemplate.flush();
}
if (this.running.compareAndSet(true, false) && (!this.transactional || this.allowNonTransactional)) {
this.kafkaTemplate.flush();
}
}

Expand All @@ -404,11 +431,12 @@ public boolean isRunning() {
return this.running.get();
}

@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked") // NOSONAR - complexity
@Override
protected Object handleRequestMessage(final Message<?> message) {
final ProducerRecord<K, V> producerRecord;
boolean flush = this.flushExpression.getValue(this.evaluationContext, message, Boolean.class);
boolean flush =
Boolean.TRUE.equals(this.flushExpression.getValue(this.evaluationContext, message, Boolean.class));
boolean preBuilt = message.getPayload() instanceof ProducerRecord;
if (preBuilt) {
producerRecord = (ProducerRecord<K, V>) message.getPayload();
Expand All @@ -430,9 +458,7 @@ protected Object handleRequestMessage(final Message<?> message) {
if (this.transactional
&& TransactionSynchronizationManager.getResource(this.kafkaTemplate.getProducerFactory()) == null
&& !this.allowNonTransactional) {
sendFuture = this.kafkaTemplate.executeInTransaction(template -> {
return template.send(producerRecord);
});
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
}
else {
sendFuture = this.kafkaTemplate.send(producerRecord);
Expand All @@ -446,7 +472,7 @@ protected Object handleRequestMessage(final Message<?> message) {
throw new MessageHandlingException(message, e);
}
catch (ExecutionException e) {
throw new MessageHandlingException(message, e.getCause());
throw new MessageHandlingException(message, e.getCause()); // NOSONAR
}
if (flush) {
this.kafkaTemplate.flush();
Expand Down Expand Up @@ -488,12 +514,11 @@ private ProducerRecord<K, V> createProducerRecord(final Message<?> message) {
headers = new RecordHeaders();
this.headerMapper.fromHeaders(messageHeaders, headers);
}
final ProducerRecord<K, V> producerRecord = this.producerRecordCreator.create(message, topic, partitionId,
timestamp, (K) messageKey, payload, headers);
return producerRecord;
return this.producerRecordCreator.create(message, topic, partitionId, timestamp, (K) messageKey, payload,
headers);
}

private byte[] getReplyTopic(final Message<?> message) {
private byte[] getReplyTopic(Message<?> message) { // NOSONAR
if (this.replyTopicsAndPartitions.isEmpty()) {
determineValidReplyTopicsAndPartitions();
}
Expand Down Expand Up @@ -569,8 +594,9 @@ public void processSendResult(final Message<?> message, final ProducerRecord<K,
ListenableFuture<SendResult<K, V>> future, MessageChannel metadataChannel)
throws InterruptedException, ExecutionException {

if (getSendFailureChannel() != null || metadataChannel != null) {
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
final MessageChannel failureChannel = getSendFailureChannel();
if (failureChannel != null || metadataChannel != null) {
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { // NOSONAR

@Override
public void onSuccess(SendResult<K, V> result) {
Expand All @@ -583,8 +609,8 @@ public void onSuccess(SendResult<K, V> result) {

@Override
public void onFailure(Throwable ex) {
if (getSendFailureChannel() != null) {
KafkaProducerMessageHandler.this.messagingTemplate.send(getSendFailureChannel(),
if (failureChannel != null) {
KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel,
KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
new KafkaSendFailureException(message, producerRecord, ex), null));
}
Expand All @@ -593,8 +619,15 @@ public void onFailure(Throwable ex) {
});
}

if (this.sync) {
if (this.sync || this.isGateway) {
Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
if (sendTimeout != null && sendTimeout <= this.deliveryTimeoutMsProperty) {
this.logger.debug("'sendTimeout' increased to "
+ (this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER)
+ "ms; it must be greater than the 'delivery.timeout.ms' Kafka producer "
+ "property to avoid false failures");
sendTimeout = this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER;
}
if (sendTimeout == null || sendTimeout < 0) {
future.get();
}
Expand Down Expand Up @@ -623,7 +656,7 @@ private final class ConvertingReplyFuture extends SettableListenableFuture<Objec
}

private void addCallback(final RequestReplyFuture<?, ?, Object> future) {
future.addCallback(new ListenableFutureCallback<ConsumerRecord<?, Object>>() {
future.addCallback(new ListenableFutureCallback<ConsumerRecord<?, Object>>() { // NOSONAR

@Override
public void onSuccess(ConsumerRecord<?, Object> result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/kafka https://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">

<int-kafka:outbound-gateway
id="allProps"
Expand Down Expand Up @@ -38,9 +38,6 @@

<bean id="ems" class="org.springframework.integration.kafka.config.xml.KafkaOutboundGatewayParserTests$EMS"/>

<bean id="template" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.kafka.requestreply.ReplyingKafkaTemplate"/>
</bean>
<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper"/>

<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper" />
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,24 @@
package org.springframework.integration.kafka.config.xml;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand Down Expand Up @@ -73,8 +82,20 @@ public void testProps() {
.isSameAs(this.context.getBean("customHeaderMapper"));
}

@Component
public static class EMS extends DefaultErrorMessageStrategy {

@SuppressWarnings("rawtypes")
@Bean
public KafkaTemplate template() {
ProducerFactory pf = mock(ProducerFactory.class);
Map<String, Object> props = new HashMap<>();
given(pf.getConfigurationProperties()).willReturn(props);
KafkaTemplate template = mock(KafkaTemplate.class);
given(template.getProducerFactory()).willReturn(pf);
return template;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -147,11 +148,14 @@ static void tearDown() {

@Test
void testOutbound() {
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(
KafkaTestUtils.producerProps(embeddedKafka));
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 40_000);
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory);
KafkaProducerMessageHandler<Integer, String> handler = new KafkaProducerMessageHandler<>(template);
handler.setBeanFactory(mock(BeanFactory.class));
handler.setSendTimeout(50_000);
handler.setSync(true);
handler.afterPropertiesSet();

Message<?> message = MessageBuilder.withPayload("foo")
Expand Down

0 comments on commit 5c27eb2

Please sign in to comment.