Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create topology before kafka streams start. #3172

Merged
merged 5 commits into from
Apr 4, 2024

Conversation

chickenchickenlove
Copy link
Contributor

@chickenchickenlove chickenchickenlove commented Mar 30, 2024

Introduction

Hi, Spring kafka Team.
This is chickenchickenlove, huge fan of you guys.
I saw this issue (#3020). i want to resolve this problem by myself!

Background

To reviewer

  • I have tiny PR to solve Issue(Allows to initialize/inject the Topology object without starting the Kafka streams. #3020).
  • It might be a fairly simple fix, but it could look bad. IMHO, it's a better idea to register StreamsBuilder as a Bean in a different scope, however, it will introduce many changes to StreamsBuilderFactoryBean, AbstractBuilderFactoryBean as well. I have made modifications to minimize the changes.
  • When you have some free time, Please take a look 🙇‍♂️.

Changes

  • Point of initializing kafka-streams topology move to StreamsBuilderFactoryBean#isAutoStartup().

@PostConstruct or StreamsBuilderFactoryBean#afterPropertiesSet() Does not work well in this case. Why?

  • Because they are called just before bean is initialized. however, codes to make topology will be injected after StreamsBuilderFactoryBean#afterPropertiesSet() are called.
  1. StreamBuilder is initialized in that time by this code. (StreamsBuilderFactoryBean#afterPropertiesSet() -> StreamsBuilderFactoryBean#createInstance() -> finally, StreamsBuilder are intialized!)
  2. However, codes to make topology, are not injected StreamsBuilder in this time. you can refer to this topology code below.
@Component
public class MyComponent {

    private static final Serde<String> STRING_SERDE = Serdes.String();
    private final StreamsBuilder streamsBuilder;

    public MyComponent(StreamsBuilder streamsBuilder) {
        this.streamsBuilder = streamsBuilder;
        buildPipeline();
    }

    public void buildPipeline() {
        KStream<String, String> messageStream = streamsBuilder
                .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));
        ...
        wordCounts.toStream().to("output-topic");
    }
}
  1. MyComponent#buildPipeline()is executed afterStreamsBuilderFactoryBean#afterPropertiesSet()called. After this step,StreamsBuildercan return validtopology. Before this step, StreamsBuilderwill return invalidtopology` always.

### Why am i used to isAutoStart()?
- AFAIK, There is no hooking point for this. even, ContextRefreshedEvent will be called after SmartLifeCycle#isAutoStart()
- IMHO, this is best simple modification. In common use case, Spring will call SmartLifeCycle#isAutoStart() only once. I did send request to spring team whether if they can implement new hooking point such like it. (link)

Any Other workaround?

- IMHO, StreamsBuilder should be registered as bean other scope (not in StreamsBuilderFactoryBean scope). But it will involves a lot of changes to the StreamsBuilderFactoryBean.
- I was wrong. IMHO, there is no workaround, i guess 😢 .

  • Finally, i used SmartInitializingSingleton.afterSingletonsInstantiated() to resolve this issue.

@sobychacko
Copy link
Contributor

sobychacko commented Apr 1, 2024

@chickenchickenlove There are checkstyle issues and test failures in the build. Please take a look.

@chickenchickenlove
Copy link
Contributor Author

Aye, i will check it. thanks for your comment 🙇‍♂️

@chickenchickenlove
Copy link
Contributor Author

Hi, @sobychacko !
Thanks for your comments 🙇‍♂️
I fixed failed test and checkstyle!

Please take a look when you have some free time! 🙇‍♂️

}
catch (Exception e) {
throw new RuntimeException(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to add this to the afterPropertiesSet() method instead of here. Doing it here adds more responsibilities to the isAutoStartup() method, which should technically only return the current autoStartup status. Also, we need to call the configureTopology(topology) method immediately after the topology creation so that some clients, such as the TopologyTestDriver, may use a fully configured topology.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, please add your name as an author of this class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sobychacko, I'm so sorry, i missed this comments.

It might be better to add this to the afterPropertiesSet() method instead of here. Doing it here adds more responsibilities to the isAutoStartup() method, which should technically only return the current autoStartup status. Also, we need to call the configureTopology(topology) method immediately after the topology creation so that some clients, such as the TopologyTestDriver, may use a fully configured topology.

IMHO, afterPropertiesSet() is not proper workaround.
streamBuilder#stream() seems to be called after StreamsBuilderFactoryBean#afterPropertiesSet() are executed.

When StreamsBuilderFactoryBean#afterPropertiesSet() is called, no stream declarations are made in the StreamBuilder yet. Therefore, executing StreamsBuilder.build() at the time of the afterPropertiesSet() call does not create any topology.

And, Unfortunately, it seems that there is no appropriate hooking point. (spring-projects/spring-framework#32554)

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And i agree that isAutoStartup() has much responsibilities in this case.
If we move configureTopology(topology) to isAutoStartup(), much responsibilities as well.

However, as i said above, we cannot execute streamsBuilder#build() properly in scope of afterPropertiesSet().

image

Please refer to image above.

This is timeline of KafkaStreams with spring-kafka.
As you can see, In StreamsBuilderFactoryBean#afterPropertiesSet(), streamsBuilder is just about to be created and streamsBuilder does not have any stream DSL.

Thus streamsBuilder#build() is executed in StreamsBuilderFactoryBean#afterPropertiesSet(), there is no DSL to build on streamsBuilder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just commented on your issue in Spring Framework.
Let's see if SmartInitializingSingleton.afterSingletonsInstantiated() implementation help us somehow!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan Hi!
Thanks your comment. SmartInitializingSingleton.afterSingletonsInstantiated() works well.
I modified code to use SmartInitializingSingleton.afterSingletonsInstantiated() and test code as well.

Please Take a look. 🙇‍♂️

CC. @sobychacko

@@ -92,20 +92,56 @@ public void testCleanupStreams() throws IOException {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add your name as an author of this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aye, i added my name classes below.

  1. StreamsBuilderFactoryBean
  2. StreamsBuilderFactoryBeanTests
  3. StreamsBuilderFactoryLateConfigTests
    Please take a look 🙇‍♂️

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing author tag.

}
catch (Exception e) {
throw new RuntimeException(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

};
streamsBuilderFactoryBean.afterPropertiesSet();
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
builder.stream(Pattern.compile("foo"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try not to use names like foo in the new code that we add.

@sobychacko
Copy link
Contributor

@chickenchickenlove The new updates look good, but the PR build still has some checkstyle issues. See more details here: https://github.com/spring-projects/spring-kafka/actions/runs/8532654028/job/23411166278?pr=3172.
Once that is addressed, this is ready to be merged.

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented Apr 3, 2024

@sobychacko
Thanks for your time 🙇‍♂️
I have resolved the lint error and changed from foo to test-topic.
Could you please take a look?

@sobychacko sobychacko merged commit f1e48f4 into spring-projects:main Apr 4, 2024
3 checks passed
@sobychacko
Copy link
Contributor

@chickenchickenlove Thanks for the PR. It is now merged upstream.

@injae-kim
Copy link

Nice work @chickenchickenlove !!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants