Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WindowStart and WindowEnd UDFs #1993

Merged
merged 18 commits into from
Oct 24, 2018

Conversation

big-andy-coates
Copy link
Contributor

@big-andy-coates big-andy-coates commented Oct 3, 2018

Description

Fixes #1674

Requires #2024 to be merged before it can be completed. Still to do (These will be done in a follow up PR):

  • UDF variant to cover non-aggregate uses.
  • Ensure straight queries work too.
  • documentation!

This PR adds two new UDAFs to allow access to the bounds of a windowed key: WindowStart() and WindowEnd(). These can be used to access the window bounds of the current window e.g.

-- Given an existing stream such as:
CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');

-- You can now write something like:
CREATE STREAM S1 as SELECT id, WindowStart(), WindowEnd() FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;

-- It also supports computation on these values too, e.g.
CREATE STREAM S2 as SELECT id, WindowStart() - 1000, WindowEnd() / 2 FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;

Background

Before v5.0 the message coming out from WINDOWed queries would have a timestamp equal to the start of the window. This was a bug in streams, which has been fixed, meaning that for v.50 the messages from the same queries came out with a timestamp equal to the timestamp of the message that caused the record to be output, i.e. not equal to the start of the window. This change broke our own clickstream demo and possibly other customer use-cases.

Our own clickstream demo was broken because we use a Connect SMT to extract the records timestamp and make it available, (as the window start time), in ElasticSearch.

Possible fixes

It was proposed that we could fix the regression by:

  1. Adding a ROWWINDOW implicit column to windowed queries, e.g. SELECT id, ROWWINDOW->start FROM test WINDOW TUMBLING (SIZE 5 SECONDS) GROUP BY id;
  2. Adding a WindowStart WindowEnd methods, e.g. SELECT id, WindowStart() FROM test WINDOW TUMBLING (SIZE 5 SECONDS) GROUP BY id;
  3. Adding a SMT for Connect to use to extract the window start from the key.

Of these, I've looked to implement the WindowStart approach. Why? I'll tell you:

  1. ROWWINDOW doesn't make conceptual sense as it is not a feature of the source data. The other implicit columns related to the source stream. For example, consider a query like SELECT id, ROWKEY, ROWWINDOW->start FROM test WINDOW ... . In this case ROWKEY is the key of row from the test source. But ROWWINDOW would not be from test, it would be the window details of the output, not the intput. So I think this would make ROWWINDOW very confusing / counterintuitive.

    To put this another way, selecting ROWTIME and ROWKEY would be selecting fields on the input schema. Where as ROWWINDOW would be on the output schema. You can have sources that might or might not already be windowed. So if you have a source that is already windowed, a user might understandably expect ROWWINDOW to be the window information of the input, not the output.

  2. Adding a SMT is tricky. We can't easily add a KSQL specific SMT to Apache Connect. We can have one implemented in the KSQL repo that's packaged to a jar that would need to be dropped into Connect's class path to work - but this sucks in terms of UX.

That leaves WindowStart()... now this probably also falls into the same category as ROWWINDOW in that normally the select statement is selecting data from the source, where as this is selecting from the output. However, for some reason it seems to fit better - and its also easier to implement. Though I'm still open to other suggestions!

Solution

I've implemented WindowStart and WindowEnd UDAFs. However, because this is selecting from the output, it can't actually be implemented using just UDAF. So the method itself has no implementation. It's just a placeholder so that WindowSelectMapper can detect where in the output row the window bounds should be put. This is necessary because the Windowed key is only available post the aggregate step, i.e. in the second SelectMapper.

Testing done

Manual + Json based test.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@big-andy-coates big-andy-coates requested a review from a team as a code owner October 3, 2018 13:50
@big-andy-coates big-andy-coates requested a review from rmoff October 3, 2018 13:51
@big-andy-coates
Copy link
Contributor Author

@rmoff @miguno - be good to get your thoughts / input on the approach...

@dguy
Copy link
Contributor

dguy commented Oct 3, 2018

I think the approach works. You can only get at the window information after the fact, so it makes sense to select it from the output. The window is still derived from the input, so i don't see any issues.
Compatibility wise it should be ok as it is just appending a new step to the end of the topology - it doesn't change any repartitions etc

@miguno
Copy link
Contributor

miguno commented Oct 3, 2018

I am ok with the outlined approach.

AFAICT there's no real prior art from the world of traditional SQL because the concept of windowing doesn't exist there.

FWIW, Calcite (another streaming SQL tool) follows a similar approach, cf. TUMBLE_START(...) and TUMBLE_END(...) at https://calcite.apache.org/docs/stream.html#tumbling-windows-improved.

Copy link
Contributor

@rmoff rmoff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for this (and +100 for rejecting the SMT approach 🙃 )

Also just to be clear:

This change broke our own clickstream demo and possibly other customer use-cases.

It broke things for many other people, all following the same pattern as our clickstream demo (and a very common pattern to take) - KSQL to aggregate data, and use the window start time as the basis for analytics on that aggregated data.

@mjsax
Copy link
Member

mjsax commented Oct 3, 2018

I don't think that TUMBLE_START() is a good name -- the function should be available for overlapping and session windows, too.

@big-andy-coates
Copy link
Contributor Author

@mjsax

I don't think that TUMBLE_START() is a good name -- the function should be available for overlapping and session windows, too.

The name for us will be WindowStart and WindowEnd or maybe Window_Start and Window_End - what do people reckon?

@rodesai
Copy link
Contributor

rodesai commented Oct 4, 2018

Overall this approach makes sense to me, as do the names WindowStart and WindowEnd.

@@ -0,0 +1,159 @@
{
"comments": [
"Test cases covering WindowStartTime UDAF"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a case to validate that the result can be used in expressions (e.g. something like SELECT ... WindowStart() / 1000 AS START_SECONDS ... )

Copy link
Contributor Author

@big-andy-coates big-andy-coates Oct 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a test case.

@hjafarpour
Copy link
Contributor

@big-andy-coates I like the idea of using windowStart and windowEnd UDAFs. The implementation, on the other hand, would be more complex than the one you have. When you have the UDAF in your query, it will be analyzed and planned in AggregateNode too and there are implications on the schema for each node in the logical and physical plan level. You need to consider these when planning the implementation path.

@mjsax
Copy link
Member

mjsax commented Oct 4, 2018

I discussed this this @hjafarpour because I was initially confused why we need a aggregation UDF bu not an regular UDF. We discussed the overall design and it seems this approach won't work atm. It might be better to implement a regular UDF that is applied to ROWKEY when reading the result, ie, you would need two queries: (1) the regular window-aggregation query (2) a query that read the result from the first query and extract the timestamp from ROWKEY with a regular UDF.

Thoughts?

@rodesai
Copy link
Contributor

rodesai commented Oct 4, 2018

I think the problem there is needing 2 queries. If you want to export the results of a windowed aggregation to an external sink, you'd need a whole other query to do so - which isn't ideal.

Using a UDAF basically gives you a marker in the intermediate schema of the aggregate node that you can use to swap the bogus result of the UDAF with the window start (or end).

Copy link
Contributor

@hjafarpour hjafarpour left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work fine. The code will replace the value of the UDAF with the value extracted from the key and as long as the type return type for the UDAF is the same as the type for window start time schemas will be correct.
Left a minor comment.

public WindowSelectMapper(
final Map<Integer, KsqlAggregateFunction> aggFunctionsByIndex) {
this.windowSelects = aggFunctionsByIndex.entrySet().stream()
.filter(e -> e.getValue().getFunctionName().equals(WindowStartTimeKudaf.getFunctionName()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be case insensitive comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@mjsax
Copy link
Member

mjsax commented Oct 4, 2018

Thanks to clarifying @rodesai and @hjafarpour.

@big-andy-coates big-andy-coates requested review from rodesai, dguy, hjafarpour and a team October 9, 2018 10:33
@big-andy-coates big-andy-coates changed the title Add WindowStartTime UDAF Add WindowStart and WindowEnd UDFs Oct 10, 2018
Copy link
Contributor

@rodesai rodesai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

@Test
public void shouldNoUseSelectMapperForNonWindowed() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shouldNotUse

CONFLICT (content): Merge conflict in ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java
CONFLICT (content): Merge conflict in ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java
CONFLICT (content): Merge conflict in ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java
CONFLICT (content): Merge conflict in ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java
CONFLICT (content): Merge conflict in ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
CONFLICT (content): Merge conflict in ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java
CONFLICT (content): Merge conflict in ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java
CONFLICT (content): Merge conflict in ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hjafarpour
Copy link
Contributor

hjafarpour commented Oct 19, 2018

@big-andy-coates We need to get this in 5.1. This currently has mater as target. Can you also change the target to 5.1.x branch before merging.

@big-andy-coates big-andy-coates merged commit 82b603d into confluentinc:master Oct 24, 2018
@big-andy-coates big-andy-coates deleted the window_start_time branch October 24, 2018 19:12
big-andy-coates added a commit to big-andy-coates/ksql that referenced this pull request Oct 24, 2018
* add WindowStartTime UDAF

(cherry picked from commit 82b603d)
big-andy-coates added a commit that referenced this pull request Oct 24, 2018
* add WindowStartTime UDAF

(cherry picked from commit 82b603d)
big-andy-coates added a commit to big-andy-coates/ksql that referenced this pull request Oct 24, 2018
…2090)

* add WindowStartTime UDAF

(cherry picked from commit 839b049)
big-andy-coates added a commit that referenced this pull request Oct 24, 2018
* add WindowStartTime UDAF

(cherry picked from commit 839b049)
@rmoff
Copy link
Contributor

rmoff commented Oct 29, 2018

🎉 awesome, thanks for doing this @big-andy-coates !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add a UDF to extract the window start time from the message key
7 participants