diff --git a/spring-kafka-docs/src/main/asciidoc/streams.adoc b/spring-kafka-docs/src/main/asciidoc/streams.adoc index c3390e5f81..2838a9959e 100644 --- a/spring-kafka-docs/src/main/asciidoc/streams.adoc +++ b/spring-kafka-docs/src/main/asciidoc/streams.adoc @@ -229,8 +229,8 @@ To avoid boilerplate code for most cases, especially when you develop microservi All you need is to declare a `KafkaStreamsConfiguration` bean named `defaultKafkaStreamsConfig`. A `StreamsBuilderFactoryBean` bean, named `defaultKafkaStreamsBuilder`, is automatically declared in the application context. You can declare and use any additional `StreamsBuilderFactoryBean` beans as well. -Starting with version 2.3, you can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanCustomizer`. -There must only be one such bean, or one must be marked `@Primary`. +You can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanConfigurer`. +If there are multiple such beans, they will be applied according to their `Ordered.order` property. By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called. Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither. @@ -339,7 +339,7 @@ public static class KafkaStreamsConfig { } @Bean - public StreamsBuilderFactoryBeanCustomizer customizer() { + public StreamsBuilderFactoryBeanConfigurer configurer() { return fb -> fb.setStateListener((newState, oldState) -> { System.out.println("State transition from " + oldState + " to " + newState); }); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java index 7883cd839e..559cbd3db7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,9 @@ package org.springframework.kafka.annotation; +import java.util.HashSet; +import java.util.Set; + import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.UnsatisfiedDependencyException; import org.springframework.beans.factory.annotation.Qualifier; @@ -23,7 +26,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; -import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; /** * {@code @Configuration} class that registers a {@link StreamsBuilderFactoryBean} @@ -53,17 +56,33 @@ public class KafkaStreamsDefaultConfiguration { */ public static final String DEFAULT_STREAMS_BUILDER_BEAN_NAME = "defaultKafkaStreamsBuilder"; + /** + * Bean for the default {@link StreamsBuilderFactoryBean}. + * @param streamsConfigProvider the streams config. + * @param customizerProvider the customizer. + * @param configurerProvider the configurer. + * + * @return the factory bean. + */ + @SuppressWarnings("deprecation") @Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME) public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder( @Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME) ObjectProvider streamsConfigProvider, - ObjectProvider customizerProvider) { + ObjectProvider customizerProvider, + ObjectProvider configurerProvider) { KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable(); if (streamsConfig != null) { StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig); - StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider.getIfUnique(); - if (customizer != null) { + Set configuredBy = new HashSet<>(); + configurerProvider.orderedStream().forEach(configurer -> { + configurer.configure(fb); + configuredBy.add(configurer); + }); + org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider + .getIfUnique(); + if (customizer != null && !configuredBy.contains(customizer)) { customizer.configure(fb); } return fb; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanConfigurer.java new file mode 100644 index 0000000000..e8837da667 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanConfigurer.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.springframework.core.Ordered; + +/** + * A configurer for {@link StreamsBuilderFactoryBean}. Applied, in order, to the single + * {@link StreamsBuilderFactoryBean} configured by the framework. Invoked after the bean + * is created and before it is started. Default order is 0. + * + * @author Gary Russell + * @since 2.6.7 + * + */ +@FunctionalInterface +@SuppressWarnings("deprecation") +public interface StreamsBuilderFactoryBeanConfigurer extends StreamsBuilderFactoryBeanCustomizer, Ordered { + + /** + * Overridden to avoid deprecation warnings. + */ + @Override + void configure(StreamsBuilderFactoryBean factoryBean); + + @Override + default int getOrder() { + return 0; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanCustomizer.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanCustomizer.java index 571e364a0c..1ad24065d5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanCustomizer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,11 +22,14 @@ * implementation of this interface is found in the application context (or one is marked * as {@link org.springframework.context.annotation.Primary}, it will be invoked after the * factory bean has been created and before it is started. + * @deprecated in favor of {@code StreamsBuilderFactoryBeanConfigurer} due to a name + * clash with a similar class in Spring Boot. * * @author Gary Russell * @since 2.3 * */ +@Deprecated @FunctionalInterface public interface StreamsBuilderFactoryBeanCustomizer { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer1Tests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer1Tests.java new file mode 100644 index 0000000000..a6d0f72e95 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer1Tests.java @@ -0,0 +1,158 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.streams; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.WallclockTimestampExtractor; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.config.KafkaStreamsConfiguration; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 2.7 + * + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, + topics = Configurer1Tests.STREAMING_TOPIC1, + brokerProperties = { + "auto.create.topics.enable=${topics.autoCreate:false}", + "delete.topic.enable=${topic.delete:true}" }, + brokerPropertiesLocation = "classpath:/${broker.filename:broker}.properties") +public class Configurer1Tests { + + public static final String STREAMING_TOPIC1 = "Configurer1Tests1"; + + @Test + void appliedInOrder(@Autowired List callOrder) { + assertThat(callOrder).containsExactly(1, 2, 3); + } + + @Configuration + @EnableKafkaStreams + public static class Config { + + @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}") + private String brokerAddresses; + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration kStreamsConfigs() { + Map props = new HashMap<>(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "configurer1"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + WallclockTimestampExtractor.class.getName()); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"); + return new KafkaStreamsConfiguration(props); + } + + @Bean + public KStream kStream(StreamsBuilder kStreamBuilder) { + KStream stream = kStreamBuilder.stream(STREAMING_TOPIC1); + stream.foreach((K, v) -> { }); + return stream; + } + + @Bean + List callOrder() { + return new ArrayList<>(); + } + + @Bean + StreamsBuilderFactoryBeanConfigurer three(List callOrder) { + return new StreamsBuilderFactoryBeanConfigurer() { + + @Override + public void configure(StreamsBuilderFactoryBean factoryBean) { + callOrder.add(3); + } + + @Override + public int getOrder() { + return Integer.MAX_VALUE; + } + + }; + } + + @Bean + StreamsBuilderFactoryBeanConfigurer one(List callOrder) { + return new StreamsBuilderFactoryBeanConfigurer() { + + @Override + public void configure(StreamsBuilderFactoryBean factoryBean) { + callOrder.add(1); + } + + @Override + public int getOrder() { + return Integer.MIN_VALUE; + } + + }; + } + + @Bean + StreamsBuilderFactoryBeanConfigurer two(List callOrder) { + return new StreamsBuilderFactoryBeanConfigurer() { + + @Override + public void configure(StreamsBuilderFactoryBean factoryBean) { + callOrder.add(2); + } + + }; + } + + @SuppressWarnings("deprecation") + @Bean + public StreamsBuilderFactoryBeanCustomizer wontBeFoundNotUnique(List callOrder) { + return fb -> fb.setStateListener((newState, oldState) -> { + callOrder.add(4); + }); + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer2Tests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer2Tests.java new file mode 100644 index 0000000000..b1e7ffe71a --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/Configurer2Tests.java @@ -0,0 +1,115 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.streams; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.WallclockTimestampExtractor; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.config.KafkaStreamsConfiguration; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 2.7 + * + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, + topics = Configurer2Tests.STREAMING_TOPIC1, + brokerProperties = { + "auto.create.topics.enable=${topics.autoCreate:false}", + "delete.topic.enable=${topic.delete:true}" }, + brokerPropertiesLocation = "classpath:/${broker.filename:broker}.properties") +public class Configurer2Tests { + + public static final String STREAMING_TOPIC1 = "Configurer2Tests1"; + + @Test + void appliedInOrder(@Autowired List callOrder) { + assertThat(callOrder).containsExactly(1); + } + + @Configuration + @EnableKafkaStreams + public static class Config { + + @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}") + private String brokerAddresses; + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration kStreamsConfigs() { + Map props = new HashMap<>(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "configurer2"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + WallclockTimestampExtractor.class.getName()); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"); + return new KafkaStreamsConfiguration(props); + } + + @Bean + public KStream kStream(StreamsBuilder kStreamBuilder) { + KStream stream = kStreamBuilder.stream(STREAMING_TOPIC1); + stream.foreach((K, v) -> { }); + return stream; + } + + @Bean + List callOrder() { + return new ArrayList<>(); + } + + @Bean + StreamsBuilderFactoryBeanConfigurer onlyAppliedOnce(List callOrder) { + return new StreamsBuilderFactoryBeanConfigurer() { + + @Override + public void configure(StreamsBuilderFactoryBean factoryBean) { + callOrder.add(1); + } + + }; + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index ce0146494c..c6d33124c8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -208,6 +208,7 @@ public AtomicBoolean stateChangeCalled() { return new AtomicBoolean(); } + @SuppressWarnings("deprecation") @Bean public StreamsBuilderFactoryBeanCustomizer customizer() { return fb -> fb.setStateListener((newState, oldState) -> {