Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move setup of the sink to plan builders #3360

Merged
merged 3 commits into from
Sep 20, 2019

Conversation

rodesai
Copy link
Contributor

@rodesai rodesai commented Sep 16, 2019

Description

This patch moves stream and table sinks (SchemaKX.into) into execution
plan builders. This change also moves computation of the rowkey/rowtime
indexes to be excluded from the output into the plan builder.

Testing done

Added unit tests for step builders.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@rodesai rodesai requested a review from a team as a code owner September 16, 2019 18:32
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rodesai

Few nits, but my main concern is the incorrect use of generics in this new code. I think we need to think about how we want the generics on these new types, and on ExecutionStep to work.

I hope you agree we shouldn't be committing new code with incorrect use of generics and raw types.

Happy to jump on a call and talk it through with you if you want.

@@ -135,6 +136,28 @@ public KsqlQueryBuilder withKsqlConfig(final KsqlConfig newConfig) {
.push(context);
}

@SuppressWarnings("unchecked")
public KeySerde<Object> buildKeySerde(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per comment in other PR the generics on this are wrong.

#3361 (comment)

}

public static void build(
final KStream<Object, GenericRow> kstream,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong generics: kstream would be either KStream<Windowed<Struct>, GenericRow> or KStream<Struct, GenericRow>. Neither of which can be cast to KStream<Object, GenericRow>.

Maybe this method should be something like:

  public static <K> void build(
      final KStream<K, GenericRow> kstream,
      final StreamSink<KStream<K, GenericRow>> streamSink,
      final KsqlQueryBuilder queryBuilder) {

???

private TableSinkBuilder() {
}

public static void build(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, wrong generics.

@Mock
private KsqlQueryBuilder queryBuilder;
@Mock
private KStream stream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test using raw types - can we please use generic types.

@Mock
private KsqlQueryBuilder queryBuilder;
@Mock
private KTable table;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test using raw types - can we please use generic types.

@rodesai rodesai force-pushed the move-sink-to-plan-builder branch from 26c94e3 to 0729c81 Compare September 18, 2019 08:24
@rodesai rodesai force-pushed the move-sink-to-plan-builder branch 2 times, most recently from 54af8fa to 5d1a002 Compare September 19, 2019 04:46
StreamSinkBuilder.build(
kstream,
step,
(fmt, schema, ctx) -> keySerde,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we build the streams app from a visitor, we'll actually get a factory from the source node. For now, we can just pass in the serde we have.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think that the Logical model shouldn't know anything about serializers. The source node should know about schemas, format and serde options only. The physical builder should be able to convert these to a serde, or a serde factory.

Copy link
Contributor Author

@rodesai rodesai Sep 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@big-andy-coates that's what I mean. By source node I mean the "source execution step of this step". So the physical builder will do something like:

class KStreamAndKeySerdeFactory<K> {
    KStream<K, GenericRow> kstream;
    KeySerdeFactory<K> factory;
}

KStreamAndKeySerdeFactory<K> visitStreamSink(final StreamSink<K> streamSink) {
    final KStreamAndKeySerdeFactory<K> k = streamSink.getSource().accept(this);
    StreamSinkBuilder.build(streamSink.kstream, streamSink, streamSink.factory, ...);
    ...
}

@rodesai rodesai force-pushed the move-sink-to-plan-builder branch from 5d1a002 to 20a0905 Compare September 19, 2019 05:15
@big-andy-coates big-andy-coates requested a review from a team September 19, 2019 15:55
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rodesai looks much better!

return null;
}
final List<Object> columns = new ArrayList<>();
for (int i = 0; i < row.getColumns().size(); i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No pertaining to this PR directly, but sometimes rowkeyIndexes is empty. In such a situation we don't actually need the map step. Removing this would obviously change out topology, which is a pain, but maybe we could do:

 mapValues(row -> {
              if (row == null || rowkeyIndexes.isEmpty()) {
                return row;
              }

             ...
}

To avoid the unnecessary copy of the row into another list? Just a small optimisation.

@Captor
private ArgumentCaptor<ValueMapper<GenericRow, GenericRow>> mapperCaptor;

private final QueryContext queryContext =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not use a mock context?

private StreamSink<Struct> sink;

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use of MockitoRule triggers spotbugs errors for me locally. Can we use @RunWith(MockitoJUnitRunner.class) instead please?

private TableSink<Struct> sink;

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

This patch moves stream and table sinks (SchemaKX.into) into execution
plan builders. This change also moves computation of the rowkey/rowtime
indexes to be excluded from the output into the plan builder.
@rodesai rodesai force-pushed the move-sink-to-plan-builder branch from 20a0905 to c777323 Compare September 19, 2019 19:49
@rodesai rodesai merged commit bfbdc20 into confluentinc:master Sep 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants