feat: build ks app from an execution plan visitor #3418
final StreamSource<?> streamSource
) {
if (streamSource.getFormats().getKeyFormat().isWindowed()) {
return (StreamWithKeySerdeFactory) buildWindowed(queryBuilder,streamSource);
This is not ideal. Planning to clean this up in a follow-on that adds a separate step type for a windowed source so we can have it strictly typed (similar to aggregates).
+1
ec682fe
to
d688767
Compare
Looks great! Just a few small comments, mainly on naming.
ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
if (sourceType == DataSourceType.KTABLE) {
((SchemaKTable) schemaKStream).getSourceTableStep().build(planBuilder);
As above
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.Windowed;

public interface PlanBuilder {
Would be nice to have a few lines of class header comment explaining what the purpose of this interface is (Not expecting full javadoc!)
import java.util.Objects;
import org.apache.kafka.streams.kstream.KStream;

public final class StreamWithKeySerdeFactory<K> {
This class doesn't seem to be a StreamWithKeySerdeFactory (i.e. it doesn't create instances of StreamWithKeySerde) but more of a holder class for a stream and a KeySerdeFactory. Perhaps it could have a better name? Maybe StreamHolder?
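To illustrate the holder idea, here is a minimal sketch of what such a class might look like. It is not the PR's code: `KeySerdeFactorySketch` is a hypothetical stand-in for the real key serde factory (whose interface is not shown in this thread), and the type parameter `S` stands in for `KStream<K, GenericRow>` so the sketch compiles without a Kafka dependency.

```java
import java.util.Objects;

// Hypothetical stand-in for the PR's key serde factory; the real interface is not shown here.
interface KeySerdeFactorySketch<K> {
  String buildKeySerde(String loggerContext);
}

// A plain holder: it pairs a stream with the factory that can build its key serde.
// S stands in for KStream<K, GenericRow> (assumption, to keep the sketch self-contained).
final class StreamHolderSketch<K, S> {
  private final S stream;
  private final KeySerdeFactorySketch<K> keySerdeFactory;

  private StreamHolderSketch(final S stream, final KeySerdeFactorySketch<K> keySerdeFactory) {
    this.stream = Objects.requireNonNull(stream, "stream");
    this.keySerdeFactory = Objects.requireNonNull(keySerdeFactory, "keySerdeFactory");
  }

  static <K, S> StreamHolderSketch<K, S> of(
      final S stream, final KeySerdeFactorySketch<K> keySerdeFactory) {
    return new StreamHolderSketch<>(stream, keySerdeFactory);
  }

  S getStream() {
    return stream;
  }

  KeySerdeFactorySketch<K> getKeySerdeFactory() {
    return keySerdeFactory;
  }

  // Returns a new holder around a transformed stream, keeping the same key serde factory.
  StreamHolderSketch<K, S> withStream(final S newStream) {
    return new StreamHolderSketch<>(newStream, keySerdeFactory);
  }
}
```

The point of the sketch is that nothing here creates anything, so a "Holder" name describes it more accurately than a "Factory" name.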
import java.util.Objects;
import org.apache.kafka.streams.kstream.KTable;

public final class TableWithKeySerdeFactory<K> {
Naming again. Maybe TableHolder?
testCase.getGeneratedSchemas().get(0).split(System.lineSeparator()));
assertThat("Schemas used by topology differ "
+ "from those used by previous versions"
+ " of KSQL - this likely means there is a non-backwards compatible change.\n"
"... this is likely to mean..."
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.Windowed;

public final class KSPlanBuilder implements PlanBuilder {
Just a few lines of class header comment would be awesome, e.g. "This is an implementation of PlanBuilder which uses Kafka Streams..."
public static <K> KStream<K, GenericRow> build(
final KStream<K, GenericRow> kstream,
final StreamFilter<KStream<K, GenericRow>> step,
public static <K> StreamWithKeySerdeFactory<K> build(
This is kind of a general point. I notice that we use "build" in the codebase a lot. And often it's used where the builder pattern is not actually being used, rather, something is just being created. I'd suggest only using "build" where there's a real builder pattern in place, and using "create" in the case of a simple factory method.
Fair enough. I'll defer this to another PR though to keep the scope of the changes here at a minimum.
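For reference, the naming distinction raised in this thread could be sketched roughly as follows. `WindowSpecSketch` and its fields are hypothetical and do not appear in the PR; the sketch only contrasts a simple factory method (named `create`) with a real builder (whose terminal method is `build`).

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical value class, used only to contrast the two naming conventions.
final class WindowSpecSketch {
  private final Duration before;
  private final Duration after;

  private WindowSpecSketch(final Duration before, final Duration after) {
    this.before = Objects.requireNonNull(before, "before");
    this.after = Objects.requireNonNull(after, "after");
  }

  // Simple factory method: all inputs are known up front, so "create" fits.
  static WindowSpecSketch create(final Duration before, final Duration after) {
    return new WindowSpecSketch(before, after);
  }

  // Entry point for the builder pattern.
  static Builder builder() {
    return new Builder();
  }

  Duration getBefore() {
    return before;
  }

  Duration getAfter() {
    return after;
  }

  // A real builder: state is accumulated step by step and "build" finishes it.
  static final class Builder {
    private Duration before = Duration.ZERO;
    private Duration after = Duration.ZERO;

    Builder before(final Duration value) {
      this.before = value;
      return this;
    }

    Builder after(final Duration value) {
      this.after = value;
      return this;
    }

    WindowSpecSketch build() {
      return new WindowSpecSketch(before, after);
    }
  }
}
```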
final StreamSource<?> streamSource
) {
if (streamSource.getFormats().getKeyFormat().isWindowed()) {
return (StreamWithKeySerdeFactory) buildWindowed(queryBuilder,streamSource);
+1
@@ -79,15 +79,20 @@ private StreamStreamJoinBuilder() {
);
final KsqlValueJoiner joiner = new KsqlValueJoiner(leftSchema, rightSchema);
final JoinWindows joinWindows = JoinWindows.of(join.getBefore()).after(join.getAfter());
final KStream<K, GenericRow> result;
switch (join.getJoinType()) {
Kind of tangential - but I wonder why we don't support RIGHT joins? It would be a simple matter for us to just swap left and right here, rather than forcing the user to do this.
Not sure. What's the value over just doing a left join?
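As a rough illustration of the "swap left and right" idea, here is a sketch over plain in-memory maps rather than Kafka Streams. All names are hypothetical, and this is not how KSQL implements joins. In the windowed stream-stream case, the before/after bounds of the join window would presumably need to be swapped as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch over in-memory maps; the real joins operate on KStream/KTable.
final class RightJoinSketch {
  private RightJoinSketch() {
  }

  // Left join: every left row is emitted; the right value is null when there is no match.
  static <K, L, R, O> List<O> leftJoin(
      final Map<K, L> left,
      final Map<K, R> right,
      final BiFunction<L, R, O> joiner) {
    final List<O> out = new ArrayList<>();
    for (final Map.Entry<K, L> entry : left.entrySet()) {
      out.add(joiner.apply(entry.getValue(), right.get(entry.getKey())));
    }
    return out;
  }

  // Right join: a left join with the sides swapped and the joiner arguments flipped back,
  // so the caller's joiner still sees (left, right) in its original order.
  static <K, L, R, O> List<O> rightJoin(
      final Map<K, L> left,
      final Map<K, R> right,
      final BiFunction<L, R, O> joiner) {
    return leftJoin(right, left, (r, l) -> joiner.apply(l, r));
  }
}
```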
d688767
to
8f41e9d
Compare
This patch implements a visitor that iterates over the execution plan and builds the final kstreams app. In addition to defining and implementing the visitor, this required updating the type built by many of the plan nodes to a wrapper class that includes both a kstream/ktable, and a factory for building key serdes. Now that we have this visitor, we no longer need the code in SchemaKX that makes calls into kafka streams, so that's all cleaned up. Finally, we need to actually call the visitor to build the streams app. For now that's happening in PhysicalPlanBuilder, but that will get moved very soon.
8f41e9d
to
edfa8c9
Compare
Overall, I think it looks good, left 2 minor comments.
) {
final DataSourceType sourceType = (schemaKStream instanceof SchemaKTable)
? DataSourceType.KTABLE
: DataSourceType.KSTREAM;

final DataSource<?> sinkDataSource;
final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder);
suggestion (nit): Since a new KSPlanBuilder() object is created in both buildPlanForStructuredOutputNode() and buildPlanForBareQuery(), it could instead be created once in buildPlan() and passed into both methods. I don't feel too strongly about this though.
import org.apache.kafka.streams.kstream.KeyValueMapper;

public final class TableGroupByBuilder {
private TableGroupByBuilder() {
}

public static <K> KGroupedTable<Struct, GenericRow> build(
final KTable<K, GenericRow> ktable,
final KTableHolder<K> table,
[nit] Could this be renamed to tableHolder? This goes for other holder variables in all the other builders. It seems weird to have table.getTable() and stream.getStream() (it looks like we already have a table available, why would we need to call getTable() from it?)
Description
This patch implements a visitor that iterates over the execution plan
and builds the final kstreams app. In addition to defining and
implementing the visitor, this required updating the type built by
many of the plan nodes to a wrapper class that includes both a kstream/
ktable, and a factory for building key serdes.
Now that we have this visitor, we no longer need the code in SchemaKX
that makes calls into kafka streams, so that's all cleaned up.
Finally, we need to actually call the visitor to build the streams app.
For now that's happening in PhysicalPlanBuilder, but that will get moved
very soon.
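
A minimal sketch of the visitor shape described above, assuming hypothetical step types and method names (`visitSource`, `visitFilter`, `SourceStepSketch`, `FilterStepSketch`); the PR's actual `PlanBuilder` methods are not shown on this page. To keep the example self-contained, the visitor builds a description string instead of a real KStream.

```java
import java.util.Objects;

// Visitor interface: one method per step type. Method names are assumptions.
interface PlanBuilderSketch {
  String visitSource(SourceStepSketch step);

  String visitFilter(FilterStepSketch step);
}

// Each execution step dispatches to the matching visitor method (double dispatch).
interface ExecutionStepSketch {
  String build(PlanBuilderSketch builder);
}

final class SourceStepSketch implements ExecutionStepSketch {
  final String topic;

  SourceStepSketch(final String topic) {
    this.topic = Objects.requireNonNull(topic, "topic");
  }

  @Override
  public String build(final PlanBuilderSketch builder) {
    return builder.visitSource(this);
  }
}

final class FilterStepSketch implements ExecutionStepSketch {
  final ExecutionStepSketch source;
  final String predicate;

  FilterStepSketch(final ExecutionStepSketch source, final String predicate) {
    this.source = Objects.requireNonNull(source, "source");
    this.predicate = Objects.requireNonNull(predicate, "predicate");
  }

  @Override
  public String build(final PlanBuilderSketch builder) {
    return builder.visitFilter(this);
  }
}

// Stand-in for a Kafka Streams backed builder: it first builds the upstream step,
// then applies its own operation on top of the result.
final class DescribingPlanBuilderSketch implements PlanBuilderSketch {
  @Override
  public String visitSource(final SourceStepSketch step) {
    return "stream(" + step.topic + ")";
  }

  @Override
  public String visitFilter(final FilterStepSketch step) {
    return step.source.build(this) + ".filter(" + step.predicate + ")";
  }
}
```

With this shape, the plan steps themselves hold no Kafka Streams calls; swapping in a different `PlanBuilderSketch` implementation changes what gets built without touching the steps.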
Testing done
Adapted execution plan builder tests to incorporate the new visitor.
Reviewer checklist