Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create topology before kafka streams start. #3172

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,6 +56,7 @@
* @author Denis Washington
* @author Gary Russell
* @author Julien Wittouck
* @author Sanghyeok An
*
* @since 1.1.4
*/
Expand Down Expand Up @@ -337,6 +338,12 @@ protected StreamsBuilder createInstance() {

@Override
public boolean isAutoStartup() {
try {
this.topology = getObject().build(this.properties);
}
catch (Exception e) {
throw new RuntimeException(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to add this to the afterPropertiesSet() method instead of here. Doing it here adds more responsibilities to the isAutoStartup() method, which should technically only return the current autoStartup status. Also, we need to call the configureTopology(topology) method immediately after the topology creation so that some clients, such as the TopologyTestDriver, may use a fully configured topology.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, please add your name as an author of this class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sobychacko, I'm so sorry, i missed this comments.

It might be better to add this to the afterPropertiesSet() method instead of here. Doing it here adds more responsibilities to the isAutoStartup() method, which should technically only return the current autoStartup status. Also, we need to call the configureTopology(topology) method immediately after the topology creation so that some clients, such as the TopologyTestDriver, may use a fully configured topology.

IMHO, afterPropertiesSet() is not proper workaround.
streamBuilder#stream() seems to be called after StreamsBuilderFactoryBean#afterPropertiesSet() are executed.

When StreamsBuilderFactoryBean#afterPropertiesSet() is called, no stream declarations are made in the StreamBuilder yet. Therefore, executing StreamsBuilder.build() at the time of the afterPropertiesSet() call does not create any topology.

And, Unfortunately, it seems that there is no appropriate hooking point. (spring-projects/spring-framework#32554)

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And i agree that isAutoStartup() has much responsibilities in this case.
If we move configureTopology(topology) to isAutoStartup(), much responsibilities as well.

However, as i said above, we cannot execute streamsBuilder#build() properly in scope of afterPropertiesSet().

image

Please refer to image above.

This is timeline of KafkaStreams with spring-kafka.
As you can see, In StreamsBuilderFactoryBean#afterPropertiesSet(), streamsBuilder is just about to be created and streamsBuilder does not have any stream DSL.

Thus streamsBuilder#build() is executed in StreamsBuilderFactoryBean#afterPropertiesSet(), there is no DSL to build on streamsBuilder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just commented on your issue in Spring Framework.
Let's see if SmartInitializingSingleton.afterSingletonsInstantiated() implementation help us somehow!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan Hi!
Thanks your comment. SmartInitializingSingleton.afterSingletonsInstantiated() works well.
I modified code to use SmartInitializingSingleton.afterSingletonsInstantiated() and test code as well.

Please Take a look. 🙇‍♂️

CC. @sobychacko

return this.autoStartup;
}

Expand All @@ -356,11 +363,9 @@ public void start() {
try {
Assert.state(this.properties != null,
"streams configuration properties must not be null");
Topology topol = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
this.infrastructureCustomizer.configureTopology(topol);
this.topology = topol;
LOGGER.debug(() -> topol.describe().toString());
this.kafkaStreams = new KafkaStreams(topol, this.properties, this.clientSupplier);
this.infrastructureCustomizer.configureTopology(this.topology);
LOGGER.debug(() -> this.topology.describe().toString());
this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier);
this.kafkaStreams.setStateListener(this.stateListener);
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
if (this.streamsUncaughtExceptionHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,6 +56,7 @@
* @author Gary Russell
* @author Denis Washington
* @author Soby Chacko
* @author Sanghyeok An
*/
@SpringJUnitConfig
@DirtiesContext
Expand Down Expand Up @@ -92,20 +93,56 @@ public void testCleanupStreams() throws IOException {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add your name as an author of this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aye, i added my name classes below.

  1. StreamsBuilderFactoryBean
  2. StreamsBuilderFactoryBeanTests
  3. StreamsBuilderFactoryLateConfigTests
    Please take a look 🙇‍♂️


@Test
public void testBuildWithProperties() throws Exception {
public void testBuildWithPropertiesAndAutoStartUp() throws Exception {
boolean autoStartUp = true;
streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) {
@Override
protected StreamsBuilder createInstance() {
return spy(super.createInstance());
}
};
streamsBuilderFactoryBean.setAutoStartup(autoStartUp);
streamsBuilderFactoryBean.afterPropertiesSet();
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
builder.stream(Pattern.compile("foo"));
streamsBuilderFactoryBean.start();


boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup();
if (isAutoStartUp) {
streamsBuilderFactoryBean.start();
}

StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties());
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull();
assertThat(isAutoStartUp).isTrue();
assertThat(streamsBuilderFactoryBean.isRunning()).isTrue();
}

@Test
public void testBuildWithPropertiesAndNoAutoStartUp() throws Exception {
boolean autoStartUp = false;
streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) {
@Override
protected StreamsBuilder createInstance() {
return spy(super.createInstance());
}
};
streamsBuilderFactoryBean.setAutoStartup(autoStartUp);
streamsBuilderFactoryBean.afterPropertiesSet();
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
builder.stream(Pattern.compile("foo"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try not to use names like foo in the new code that we add.


boolean isAutoStartUp = streamsBuilderFactoryBean.isAutoStartup();
if (isAutoStartUp) {
streamsBuilderFactoryBean.start();
}

StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties());
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull();
assertThat(isAutoStartUp).isFalse();
assertThat(streamsBuilderFactoryBean.isRunning()).isFalse();
}

@Configuration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing author tag.

Expand Down Expand Up @@ -43,6 +43,7 @@
* @author Soby Chacko
* @author Artem Bilan
* @author Gary Russell
* @author Sanghyeok An
*/
@SpringJUnitConfig
@DirtiesContext
Expand Down Expand Up @@ -72,15 +73,19 @@ public void testStreamBuilderFactoryCannotBeInstantiatedWhenAutoStart() {

@Test
public void testStreamsBuilderFactoryWithConfigProvidedLater() throws Exception {
boolean isAutoStartUp = true;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
streamsBuilderFactoryBean.setStreamsConfiguration(props);
streamsBuilderFactoryBean.setAutoStartup(isAutoStartUp);
streamsBuilderFactoryBean.getObject().stream(Pattern.compile("foo"));

assertThat(streamsBuilderFactoryBean.isRunning()).isFalse();
boolean shouldAutoStartUp = streamsBuilderFactoryBean.isAutoStartup();
streamsBuilderFactoryBean.start();
assertThat(streamsBuilderFactoryBean.isRunning()).isTrue();
assertThat(shouldAutoStartUp).isEqualTo(isAutoStartUp);
}

@Configuration
Expand Down