-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: remove unnecessary changelog for topics #3987
feat: remove unnecessary changelog for topics #3987
Conversation
BREAKING CHANGE: KSQL will now, by default, not create duplicate changelog for table sources. fixes: confluentinc#3621 Now that Kafka Steams has a `KTable.transformValues` we no longer need to create a table by first creating a stream, then doing a select/groupby/aggregate on it. Instead, we can just use `StreamBuilder.table`. This change makes the switch, removing the `StreamToTable` types and calls and replaces them with either `TableSource` or `WindowedTableSource`, copying the existing pattern for `StreamSource` and `WindowedStreamSource`. It also reinstates a change in `KsqlConfig` that ensures topology optimisations are on by default. This was the case for 5.4.x, but was inadvertently turned off. With the optimisation config turned on, and the new builder step used, KSQL no longer creates a changelog topic to back the tables state store. This is not needed as the source topic is itself the changelog. The change includes new tests in `table.json` to confirm the change log topic is not created by default and is created if the user turns off optimisations. This change also removes the line in the `TestExecutor` that explicitly sets topology optimisations to `all`. The test should _not_ of being doing tis. This may been why the bug turning off optimisations was not detected.
The state store backing a table no longer requires explicit instantiation, so no value serde is being created, and its the creating of the value serde that results in a line in the expected schemas. To put this another way, the state store now uses the same serde as the source topic.
The expected topology for tables has changed and needs updating.
missed this previously
Conflicting files ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KSPlanBuilder.java ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSourceBuilder.java ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java
if (getDataSourceType() == DataSourceType.KSTREAM) { | ||
return schemaKStream; | ||
} | ||
final Stacker reduceContextStacker = contextStacker.push(REDUCE_OP_NAME); | ||
return schemaKStream.toTable( | ||
dataSource.getKsqlTopic().getKeyFormat(), | ||
dataSource.getKsqlTopic().getValueFormat(), | ||
reduceContextStacker | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No longer needed as the factory now creates both streams and tables.
KafkaStreams buildKafkaStreams(StreamsBuilder builder, Map<String, Object> conf); | ||
BuildResult buildKafkaStreams(StreamsBuilder builder, Map<String, Object> conf); | ||
|
||
class BuildResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this class to avoid the caller having to also build the topology, (which was happening in most places)
final Properties props = new Properties(); | ||
props.putAll(conf); | ||
return new KafkaStreams(builder.build(), props, clientSupplier); | ||
final Topology topology = builder.build(props); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Importantly, we now pass stream props to the topology building, so that it can read topology.optimize
prop
/** | ||
* Factor class used to create stream and table sources | ||
*/ | ||
public final class SchemaKSourceFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulled logic out of ScheamKStream
into this factory and made it also support tables. This made more sense that adding factory methods to SchemaKTable
as a lot of the code is shared.
@@ -98,66 +86,6 @@ | |||
private final ExecutionStep<KStreamHolder<K>> sourceStep; | |||
private final ExecutionStepProperties sourceProperties; | |||
|
|||
private static <K> SchemaKStream<K> forSource( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All this stuff has been moved to the new SchemaKSourceFactory
assertThat(returned, is(table)); | ||
} | ||
|
||
@Test | ||
public void shouldBuildTableWithCorrectContext() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer possible to test this here. Instead test added to SourceBuilderTest
called shouldBuildTableWithCorrectStoreName
.
@@ -80,7 +80,6 @@ | |||
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") | |||
.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0) | |||
.put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "some.ksql.service.id") | |||
.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, "all") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was nasty: test case was explicitly turning on optimizations. Meaning the bug introduced when the optimization was turned of in KsqlConfig
was not found.
} | ||
|
||
public static void main(final String[] args) throws Exception { | ||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now runs as a test so that is has the correct class path. If run from main
it won't find the test UDFs and fails.
|
||
public final class StreamSourceBuilder { | ||
private StreamSourceBuilder() { | ||
public final class SourceBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
named as it now handles tables too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Big change, but unless I'm missing something it seems pretty straightforward.
ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSource.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKSourceFactory.java
Outdated
Show resolved
Hide resolved
ksql-execution/src/main/java/io/confluent/ksql/execution/plan/WindowedTableSource.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, couple things inline.
Conflicting files ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSourceBuilder.java ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamToTableBuilder.java ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java
Description
BREAKING CHANGE: KSQL will now, by default, not create duplicate changelog for table sources.
For example, given SQL such as:
KSQL would previously have created a changelog topic called
<service-id>query_<querid>_0-KsqlTopic-reduce-changelog
that backed the statestore where the table was materialized.With this change this is no longer the case. Kafka Streams has a 'topology.optimization' setting. When this is set to
all
KS won't back the state store with a topic as the state store can be rebuilt form the source topic!fixes: #3621
Now that Kafka Streams has a
KTable.transformValues
we no longer need to create a table by first creating a stream, then doing a select/groupby/aggregate on it. Instead, we can just useStreamBuilder.table
.This change makes the switch, removing the
StreamToTable
types and calls and replaces them with eitherTableSource
orWindowedTableSource
, copying the existing pattern forStreamSource
andWindowedStreamSource
.It also reinstates a change in
KsqlConfig
that ensures topology optimisations are on by default. This was the case for 5.4.x and 0.6.0, but was inadvertently turned off in master.With the optimisation config turned on, and the new builder step used, KSQL no longer creates a changelog topic to back the tables state store. This is not needed as the source topic is itself the changelog. The change includes new tests in
table.json
to confirm the change log topic is not created by default and is created if the user turns off optimisations.This change also removes the line in the
TestExecutor
that explicitly sets topology optimisations toall
. The test should not of being doing tis. This may been why the bug turning off optimisations was not detected.Reviewing notes:
I've split the PR into several commits to make it easier to ignore the bulk changes. Notable commits are:
KsqlTopic(_left|_right)_reduece
'expected schema' as the state store now uses the same schema as the source topic.Testing done
usual
Reviewer checklist