Skip to content

Commit

Permalink
spring-projectsGH-1736: Depr. StreamsBuilderFactoryBeanCustomizer
Browse files Browse the repository at this point in the history
Resolves spring-projects#1736

**cherry-pick to 2.6.x**
  • Loading branch information
garyrussell committed Mar 16, 2021
1 parent 73d853b commit 87a5215
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 10 deletions.
6 changes: 3 additions & 3 deletions spring-kafka-docs/src/main/asciidoc/streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ To avoid boilerplate code for most cases, especially when you develop microservi
All you need is to declare a `KafkaStreamsConfiguration` bean named `defaultKafkaStreamsConfig`.
A `StreamsBuilderFactoryBean` bean, named `defaultKafkaStreamsBuilder`, is automatically declared in the application context.
You can declare and use any additional `StreamsBuilderFactoryBean` beans as well.
Starting with version 2.3, you can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanCustomizer`.
There must only be one such bean, or one must be marked `@Primary`.
You can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanConfigurer`.
If there are multiple such beans, they will be applied according to their `Ordered.order` property.

By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
Expand Down Expand Up @@ -339,7 +339,7 @@ public static class KafkaStreamsConfig {
}
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,17 @@

package org.springframework.kafka.annotation;

import java.util.HashSet;
import java.util.Set;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.UnsatisfiedDependencyException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

/**
* {@code @Configuration} class that registers a {@link StreamsBuilderFactoryBean}
Expand Down Expand Up @@ -53,17 +56,33 @@ public class KafkaStreamsDefaultConfiguration {
*/
public static final String DEFAULT_STREAMS_BUILDER_BEAN_NAME = "defaultKafkaStreamsBuilder";

/**
* Bean for the default {@link StreamsBuilderFactoryBean}.
* @param streamsConfigProvider the streams config.
* @param customizerProvider the customizer.
* @param configurerProvider the configurer.
*
* @return the factory bean.
*/
@SuppressWarnings("deprecation")
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME)
ObjectProvider<KafkaStreamsConfiguration> streamsConfigProvider,
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
ObjectProvider<org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer> customizerProvider,
ObjectProvider<StreamsBuilderFactoryBeanConfigurer> configurerProvider) {

KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable();
if (streamsConfig != null) {
StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig);
StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider.getIfUnique();
if (customizer != null) {
Set<StreamsBuilderFactoryBeanConfigurer> configuredBy = new HashSet<>();
configurerProvider.orderedStream().forEach(configurer -> {
configurer.configure(fb);
configuredBy.add(configurer);
});
org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider
.getIfUnique();
if (customizer != null && !configuredBy.contains(customizer)) {
customizer.configure(fb);
}
return fb;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.config;

import org.springframework.core.Ordered;

/**
* A configurer for {@link StreamsBuilderFactoryBean}. Applied, in order, to the single
* {@link StreamsBuilderFactoryBean} configured by the framework. Invoked after the bean
* is created and before it is started. Default order is 0.
*
* @author Gary Russell
* @since 2.6.7
*
*/
@FunctionalInterface
@SuppressWarnings("deprecation")
public interface StreamsBuilderFactoryBeanConfigurer extends StreamsBuilderFactoryBeanCustomizer, Ordered {

/**
* Overridden to avoid deprecation warnings.
*/
@Override
void configure(StreamsBuilderFactoryBean factoryBean);

@Override
default int getOrder() {
return 0;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,11 +22,14 @@
* implementation of this interface is found in the application context (or one is marked
* as {@link org.springframework.context.annotation.Primary}, it will be invoked after the
* factory bean has been created and before it is started.
* @deprecated in favor of {@code StreamsBuilderFactoryBeanConfigurer} due to a name
* clash with a similar class in Spring Boot.
*
* @author Gary Russell
* @since 2.3
*
*/
@Deprecated
@FunctionalInterface
public interface StreamsBuilderFactoryBeanCustomizer {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.streams;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @since 2.7
*
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = Configurer1Tests.STREAMING_TOPIC1,
brokerProperties = {
"auto.create.topics.enable=${topics.autoCreate:false}",
"delete.topic.enable=${topic.delete:true}" },
brokerPropertiesLocation = "classpath:/${broker.filename:broker}.properties")
public class Configurer1Tests {

public static final String STREAMING_TOPIC1 = "Configurer1Tests1";

@Test
void appliedInOrder(@Autowired List<Integer> callOrder) {
assertThat(callOrder).containsExactly(1, 2, 3);
}

@Configuration
@EnableKafkaStreams
public static class Config {

@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "configurer1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
return new KafkaStreamsConfiguration(props);
}

@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.foreach((K, v) -> { });
return stream;
}

@Bean
List<Integer> callOrder() {
return new ArrayList<>();
}

@Bean
StreamsBuilderFactoryBeanConfigurer three(List<Integer> callOrder) {
return new StreamsBuilderFactoryBeanConfigurer() {

@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {
callOrder.add(3);
}

@Override
public int getOrder() {
return Integer.MAX_VALUE;
}

};
}

@Bean
StreamsBuilderFactoryBeanConfigurer one(List<Integer> callOrder) {
return new StreamsBuilderFactoryBeanConfigurer() {

@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {
callOrder.add(1);
}

@Override
public int getOrder() {
return Integer.MIN_VALUE;
}

};
}

@Bean
StreamsBuilderFactoryBeanConfigurer two(List<Integer> callOrder) {
return new StreamsBuilderFactoryBeanConfigurer() {

@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {
callOrder.add(2);
}

};
}

@SuppressWarnings("deprecation")
@Bean
public StreamsBuilderFactoryBeanCustomizer wontBeFoundNotUnique(List<Integer> callOrder) {
return fb -> fb.setStateListener((newState, oldState) -> {
callOrder.add(4);
});
}

}

}
Loading

0 comments on commit 87a5215

Please sign in to comment.