Add WindowStart and WindowEnd UDFs #1993

Merged: 18 commits, Oct 24, 2018. The view below shows changes from 1 commit.
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
WindowSelectMapper.java (new file)
@@ -0,0 +1,66 @@
/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.confluent.ksql.function.udaf.window;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.KsqlAggregateFunction;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;

/**
 * Used to handle the special-cased {@link WindowStartTimeKudaf}.
 */
public final class WindowSelectMapper
    implements ValueMapperWithKey<Windowed<?>, GenericRow, GenericRow> {

  private final Map<Integer, Type> windowSelects;

  public WindowSelectMapper(
      final Map<Integer, KsqlAggregateFunction> aggFunctionsByIndex) {
    this.windowSelects = aggFunctionsByIndex.entrySet().stream()
        .filter(e -> e.getValue().getFunctionName().equals(WindowStartTimeKudaf.getFunctionName()))

Review comment (Contributor): This should be a case-insensitive comparison.
Reply (Author): done.

        .collect(Collectors.toMap(Map.Entry::getKey, e -> Type.StartTime)); // Todo(ac): End time
  }

  public boolean hasSelects() {
    return !windowSelects.isEmpty();
  }

  @Override
  public GenericRow apply(final Windowed<?> readOnlyKey, final GenericRow row) {
    final Window window = readOnlyKey.window();

    windowSelects.forEach((index, type) ->
        row.getColumns().set(index, type.mapper.apply(window)));

    return row;
  }

  private enum Type {
    StartTime(Window::start), EndTime(Window::end);

    private final Function<Window, Object> mapper;

    Type(final Function<Window, Object> mapper) {
      this.mapper = mapper;
    }
  }
}
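
To make the mapper's behaviour concrete, here is a minimal, self-contained sketch of the column-overwrite step that apply() performs. The key, the column layout, and the use of Kafka Streams' internal TimeWindow class are illustrative assumptions, not part of the PR:

import java.util.Arrays;
import java.util.List;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public final class WindowSelectMapperSketch {

  public static void main(final String[] args) {
    // Hypothetical windowed key: a 30-second window starting at t=0.
    // TimeWindow is Kafka Streams' internal window implementation, used
    // here only to construct a Windowed key for illustration.
    final Windowed<String> windowedKey =
        new Windowed<>("key-0", new TimeWindow(0, 30_000));

    // A row as produced by the aggregation step: column 1 holds the
    // placeholder left by the do-nothing WindowStartTime KUDAF.
    final List<Object> columns = Arrays.asList("key-0", null);

    // The core of WindowSelectMapper.apply(): overwrite the placeholder
    // column with the window's start time, in milliseconds.
    columns.set(1, windowedKey.window().start());

    System.out.println(columns); // prints [key-0, 0]
  }
}
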
WindowStartTimeKudaf.java (new file)
@@ -0,0 +1,66 @@
/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.confluent.ksql.function.udaf.window;

import io.confluent.ksql.function.udaf.TableUdaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;

/**
 * A placeholder KUDAF for extracting window start times.
 *
 * <p>The KUDAF itself does nothing.
 *
 * @see WindowSelectMapper
 */
@UdafDescription(name = "WindowStartTime", author = "Confluent",
    description = "Returns the window start time, in milliseconds, for the given record. "
        + "If the given record is not part of a window the function will return NULL.")
public final class WindowStartTimeKudaf {

  private WindowStartTimeKudaf() {
  }

  static String getFunctionName() {
    return "WindowStartTime"; // Todo(ac): Test that ensures this matches annotation.
  }

  @UdafFactory(description = "Extracts the window start time")
  public static TableUdaf<Long, Long> createWindowStart() {
    return new TableUdaf<Long, Long>() {
      @Override
      public Long undo(final Long valueToUndo, final Long aggregateValue) {
        return null;
      }

      @Override
      public Long initialize() {
        return null;
      }

      @Override
      public Long aggregate(final Long value, final Long aggregate) {
        return null;
      }

      @Override
      public Long merge(final Long aggOne, final Long aggTwo) {
        return null;
      }
    };
  }
}
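
The Todo in getFunctionName() asks for a test that keeps the hard-coded name in sync with the @UdafDescription annotation. A sketch of what such a JUnit test might look like; it is hypothetical, not part of this commit, and assumes the annotation has runtime retention (the test must sit in the same package, since getFunctionName() is package-private). The comparison is case-insensitive, matching the review comment on WindowSelectMapper:

package io.confluent.ksql.function.udaf.window;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalToIgnoringCase;

import io.confluent.ksql.function.udaf.UdafDescription;
import org.junit.Test;

public class WindowStartTimeKudafTest {

  @Test
  public void shouldMatchAnnotatedFunctionName() {
    // The annotation name is what the function is registered under, so the
    // name that WindowSelectMapper matches against must agree with it.
    final UdafDescription annotation =
        WindowStartTimeKudaf.class.getAnnotation(UdafDescription.class);

    assertThat(WindowStartTimeKudaf.getFunctionName(),
        equalToIgnoringCase(annotation.name()));
  }
}
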
SchemaKGroupedStream.java (modified)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright 2017 Confluent Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,6 +22,7 @@
 import io.confluent.ksql.function.KsqlAggregateFunction;
 import io.confluent.ksql.function.UdafAggregator;
 import io.confluent.ksql.function.udaf.KudafAggregator;
+import io.confluent.ksql.function.udaf.window.WindowSelectMapper;
 import io.confluent.ksql.parser.tree.KsqlWindowExpression;
 import io.confluent.ksql.parser.tree.WindowExpression;
 import io.confluent.ksql.util.KsqlConfig;
@@ -37,6 +38,7 @@
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.WindowStore;
@@ -78,10 +80,11 @@ public SchemaKTable aggregate(
       final Map<Integer, Integer> aggValToValColumnMap,
       final WindowExpression windowExpression,
       final Serde<GenericRow> topicValueSerDe) {
-    final KTable aggKtable;
+    KTable aggKtable;
     final UdafAggregator aggregator = new KudafAggregator(
         aggValToFunctionMap, aggValToValColumnMap);
-    if (windowExpression != null) {
+
+    if (windowExpression != null) { // Todo(ac): refactor into methods.
       final Materialized<String, GenericRow, ?> materialized
           = Materialized.<String, GenericRow, WindowStore<Bytes, byte[]>>with(
               Serdes.String(), topicValueSerDe);
@@ -93,6 +96,13 @@ public SchemaKTable aggregate(
           aggregator,
           materialized
       );
+
+      // Todo(ac): Inject & test
+      final WindowSelectMapper windowSelectMapper = new WindowSelectMapper(aggValToFunctionMap);
+      if (windowSelectMapper.hasSelects()) {
+        aggKtable = aggKtable.mapValues((readOnlyKey, value) ->
+            windowSelectMapper.apply((Windowed<?>) readOnlyKey, (GenericRow) value));
+      }
     } else {
       aggKtable = kgroupedStream.aggregate(
           initializer,
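
For readers less familiar with the Streams API, here is a stand-alone sketch of the pattern the diff above wires in: after a windowed aggregation the table's key is Windowed&lt;K&gt;, so a key-aware mapValues can read the window bounds off the key and fold them into the value. The topic name, types, and window-definition call are illustrative assumptions (the exact TimeWindows factory method has shifted across Kafka versions):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public final class WindowedMapValuesSketch {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    // Count events per key in 30-second tumbling windows. Default serdes
    // are assumed to come from config; the topic name is illustrative.
    final KTable<Windowed<String>, Long> counts = builder
        .<String, String>stream("events")
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
        .count();

    // The same pattern as the diff: the windowed key carries the window
    // bounds, so copy the start time into each value after aggregating.
    final KTable<Windowed<String>, String> withStart =
        counts.mapValues((readOnlyKey, count) ->
            count + "," + readOnlyKey.window().start());

    builder.build(); // the topology would then be passed to KafkaStreams
  }
}
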
Query validation test cases (new JSON file)
@@ -0,0 +1,159 @@
{
"comments": [
"Test cases covering WindowStartTime UDAF"

Review comment (Contributor): We should add a case to validate that the result can be used in expressions (e.g. something like SELECT ... WindowStart() / 1000 AS START_SECONDS ...).
Reply (big-andy-coates, author, Oct 5, 2018): I'll add a test case.

],
"tests": [
{
"name": "table session",
"statements": [
"CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
"CREATE TABLE S2 as SELECT id, WindowStartTime() FROM test WINDOW SESSION (30 SECONDS) group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000},
{"topic": "test_topic", "key": 1, "value": "1", "timestamp": 10000},
{"topic": "test_topic", "key": 1, "value": "1", "timestamp": 40000}
],
"outputs": [
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": null, "timestamp": 10000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 10000, "window": {"start": 0, "end": 40000}},
{"topic": "S2", "key": 1, "value": "1,10000", "timestamp": 10000, "window": {"start": 10000, "end": 40000}},
{"topic": "S2", "key": 1, "value": null, "timestamp": 40000, "window": {"start": 10000, "end": 40000}},
{"topic": "S2", "key": 1, "value": "1,10000", "timestamp": 40000, "window": {"start": 10000, "end": 70000}}
]
},
{
"name": "stream session",
"statements": [
"CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
"CREATE STREAM S2 as SELECT id, WindowStartTime() FROM test WINDOW SESSION (30 SECONDS) group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000},
{"topic": "test_topic", "key": 1, "value": "1", "timestamp": 10000},
{"topic": "test_topic", "key": 1, "value": "1", "timestamp": 40000}
],
"outputs": [
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": null, "timestamp": 10000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 10000, "window": {"start": 0, "end": 40000}},
{"topic": "S2", "key": 1, "value": "1,10000", "timestamp": 10000, "window": {"start": 10000, "end": 40000}},
{"topic": "S2", "key": 1, "value": null, "timestamp": 40000, "window": {"start": 10000, "end": 40000}},
{"topic": "S2", "key": 1, "value": "1,10000", "timestamp": 40000, "window": {"start": 10000, "end": 70000}}
]
},
{
"name": "table tumbling",
"statements": [
"CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
"CREATE TABLE S2 as SELECT id, WindowStartTime() FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 30000},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 45000},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 50000},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 35000},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 70000}
],
"outputs": [
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 10000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 100, "value": "100,30000", "timestamp": 30000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 100, "value": "100,30000", "timestamp": 45000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 100, "value": "100,30000", "timestamp": 50000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 0, "value": "0,30000", "timestamp": 35000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 0, "value": "0,60000", "timestamp": 70000, "window": {"start": 60000, "end": 90000}}
]
},
{
"name": "stream tumbling",
"statements": [
"CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
"CREATE STREAM S2 as SELECT id, WindowStartTime() FROM test WINDOW TUMBLING (SIZE 30 SECONDS) group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 10000},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 30000},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 45000},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 50000},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 35000},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 70000}
],
"outputs": [
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 10000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 100, "value": "100,30000", "timestamp": 30000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 100, "value": "100,30000", "timestamp": 45000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 100, "value": "100,30000", "timestamp": 50000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 0, "value": "0,30000", "timestamp": 35000, "window": {"start": 30000, "end": 60000}},
{"topic": "S2", "key": 0, "value": "0,60000", "timestamp": 70000, "window": {"start": 60000, "end": 90000}}
]
},
{
"name": "table hopping",
"statements": [
"CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
"CREATE TABLE S2 as SELECT id, WindowStartTime() FROM test WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS) group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 2000},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 4999},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 5000}
],
"outputs": [
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 100, "value": "100,0", "timestamp": 2000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 4999, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 5000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,5000", "timestamp": 5000, "window": {"start": 5000, "end": 35000}}
]
},
{
"name": "stream hopping",
"statements": [
"CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
"CREATE STREAM S2 as SELECT id, WindowStartTime() FROM test WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS) group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 2000},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 4999},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 5000}
],
"outputs": [
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 100, "value": "100,0", "timestamp": 2000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 4999, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,0", "timestamp": 5000, "window": {"start": 0, "end": 30000}},
{"topic": "S2", "key": 0, "value": "0,5000", "timestamp": 5000, "window": {"start": 5000, "end": 35000}}
]
},
{
"name": "none",
"comment" : "Without a WINDOW statement the methods will return NULL",
"statements": [
"CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');",
"CREATE STREAM S2 as SELECT id, WindowStartTime() FROM test group by id;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 0},
{"topic": "test_topic", "key": 100, "value": "100", "timestamp": 2000},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 4999},
{"topic": "test_topic", "key": 0, "value": "0", "timestamp": 5000}
],
"outputs": [
{"topic": "S2", "key": 0, "value": "0,", "timestamp": 0},
{"topic": "S2", "key": 100, "value": "100,", "timestamp": 2000},
{"topic": "S2", "key": 0, "value": "0,", "timestamp": 4999},
{"topic": "S2", "key": 0, "value": "0,", "timestamp": 5000}
]
}
]
}