-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: move groupBy into plan builders #3359
feat: move groupBy into plan builders #3359
Conversation
e39dfc9
to
f6212f9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai
As per other PRs there are issues with the generics and use of raw types in this that I'd like to see cleaned up before I can approve.
@@ -152,6 +153,28 @@ public KsqlQueryBuilder withKsqlConfig(final KsqlConfig newConfig) { | |||
); | |||
} | |||
|
|||
@SuppressWarnings("unchecked") | |||
public KeySerde<Object> buildKeySerde( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per other PRs - the generics are incorrect
import java.util.Objects; | ||
|
||
@Immutable | ||
public class StreamGroupByKey<S, G> implements ExecutionStep<G> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to get some javadocs on what the generic parameters are for each step type.
import java.util.Objects; | ||
|
||
@Immutable | ||
public class StreamGroupByKey<S, G> implements ExecutionStep<G> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're grouping by the existing key, would not S
and G
be the same by definition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These were originally meant to be the full stream and table types. I've switched it to just be key types now (in this PR for just the grouped steps, but I'll do that for all the steps). So this will go away.
ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java
Show resolved
Hide resolved
import org.apache.kafka.streams.kstream.KeyValueMapper; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
class GroupByMapper implements KeyValueMapper<Object, GenericRow, Struct> { | ||
class GroupByMapper implements KeyValueMapper<Object, GenericRow, Object> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we changing the return value on this mapper? It's always going to be a Struct
...
.withMetaAndKeyColsInValue(); | ||
private static final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema.from(SCHEMA, SerdeOption.none()); | ||
|
||
private final List<Expression> groupByExpressions = ImmutableList.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could be static.
private final List<Expression> groupByExpressions = ImmutableList.of( | |
private static final List<Expression> GROUP_BY_EXPRESSIONS = ImmutableList.of( |
Likely others could too.
@Mock | ||
private FunctionRegistry functionRegistry; | ||
@Mock | ||
private GroupedFactory groupedFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use of raw types - can we please use generic types.
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider moving to class scope?
.withMetaAndKeyColsInValue(); | ||
private static final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema.from(SCHEMA, SerdeOption.none()); | ||
|
||
private final List<Expression> groupByExpressions = ImmutableList.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can be static, likely others too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can be static, likely others too.
@Mock | ||
private FunctionRegistry functionRegistry; | ||
@Mock | ||
private GroupedFactory groupedFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raw types - can we please use generic type.s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raw types - can we please use generic types. Same for ExecutionStep
ea95693
to
2ab4aa3
Compare
6c6fefc
to
9b7d2a2
Compare
This patch moves the code for regrouping streams/tables into plan builders. This also required adding a new execution step for groupByKey, which we missed the first go-round.
9b7d2a2
to
d7579a0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai LGTM 'cept a few nits
@@ -66,7 +78,7 @@ public boolean equals(final Object o) { | |||
if (o == null || getClass() != o.getClass()) { | |||
return false; | |||
} | |||
final StreamGroupBy<?, ?> that = (StreamGroupBy<?, ?>) o; | |||
final StreamGroupBy that = (StreamGroupBy) o; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raw type...
|
||
@Override | ||
public int hashCode() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: blank line
ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableGroupBy.java
Show resolved
Hide resolved
|
||
@Immutable | ||
public class TableGroupBy<T, G> implements ExecutionStep<G> { | ||
public class TableGroupBy<K> implements ExecutionStep<KGroupedTable<Struct, GenericRow>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like others, I think we can drop the generic K
from this type and just use ?
. K
is the generic type of the source for this step, it doesn't make sense to have this on the type itself.
.withMetaAndKeyColsInValue(); | ||
private static final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema.from(SCHEMA, SerdeOption.none()); | ||
|
||
private final List<Expression> groupByExpressions = ImmutableList.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can be static, likely others too.
@Mock | ||
private FunctionRegistry functionRegistry; | ||
@Mock | ||
private GroupedFactory groupedFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raw types - can we please use generic types. Same for ExecutionStep
Description
This patch moves the code for regrouping streams/tables into plan
builders. This also required adding a new execution step for
groupByKey, which we missed the first go-round.
Testing done
Adds tests for the new grouped step builders.
Reviewer checklist