Skip to content

Commit

Permalink
Add WindowStart and WindowEnd UDFs (confluentinc#1993)
Browse files Browse the repository at this point in the history
* add WindowStartTime UDAF
  • Loading branch information
big-andy-coates authored Oct 24, 2018
1 parent 112b202 commit 82b603d
Show file tree
Hide file tree
Showing 29 changed files with 1,269 additions and 271 deletions.
6 changes: 6 additions & 0 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,12 @@ Aggregate functions
+------------------------+---------------------------+------------+---------------------------------------------------------------------+
| TOPKDISTINCT | ``TOPKDISTINCT(col1, k)`` | Stream | Return the distinct Top *K* values for the given column and window |
+------------------------+---------------------------+------------+---------------------------------------------------------------------+
| WindowStart | ``WindowStart()`` | Stream | Extract the start time of the current window, in milliseconds. |
| | | Table | If the query is not windowed the function will return null. |
+------------------------+---------------------------+------------+---------------------------------------------------------------------+
| WindowEnd | ``WindowEnd()`` | Stream | Extract the end time of the current window, in milliseconds. |
| | | Table | If the query is not windowed the function will return null. |
+------------------------+---------------------------+------------+---------------------------------------------------------------------+

.. _ksql_key_requirements:

Expand Down
15 changes: 15 additions & 0 deletions docs/tutorials/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,21 @@ counting/aggregation step per region.
WINDOW SESSION (60 SECONDS) \
GROUP BY regionid;
Sometimes you may want to include the bounds of the current window in the result so that it is
more easily accessible to consumers of the data. The statement below extracts the start and
end time of the current session window into fields within output rows.

.. code:: sql
CREATE TABLE pageviews_per_region_per_session AS
SELECT regionid,
windowStart(),
windowEnd(),
count(*)
FROM pageviews_enriched
WINDOW SESSION (60 SECONDS)
GROUP BY regionid;
Working with arrays and maps
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@

public class AggregateAnalyzer extends DefaultTraversalVisitor<Node, AnalysisContext> {

private AggregateAnalysis aggregateAnalysis;
private Analysis analysis;
private FunctionRegistry functionRegistry;
private final AggregateAnalysis aggregateAnalysis;
private final Analysis analysis;
private final FunctionRegistry functionRegistry;

private boolean hasAggregateFunction = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udaf.placeholder;

import io.confluent.ksql.function.udaf.TableUdaf;

/**
* A no-op {@link TableUdaf} that is used as a placeholder for some later hardcoded computation.
*/
public final class PlaceholderTableUdaf implements TableUdaf<Long, Long> {

public static final PlaceholderTableUdaf INSTANCE = new PlaceholderTableUdaf();

private PlaceholderTableUdaf(){
}

@Override
public Long undo(final Long valueToUndo, final Long aggregateValue) {
return null;
}

@Override
public Long initialize() {
return null;
}

@Override
public Long aggregate(final Long value, final Long aggregate) {
return null;
}

@Override
public Long merge(final Long aggOne, final Long aggTwo) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udaf.window;

import io.confluent.ksql.function.udaf.TableUdaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf;

/**
* A placeholder KUDAF for extracting window end times.
*
* <p>The KUDAF itself does nothing. It's just a placeholder.
*
* @see WindowSelectMapper
*/
@SuppressWarnings("WeakerAccess") // Invoked via reflection.
@UdafDescription(name = "WindowEnd", author = "Confluent",
description = "Returns the window end time, in milliseconds, for the given record. "
+ "If the given record is not part of a window the function will return NULL.")
public final class WindowEndKudaf {

private WindowEndKudaf() {
}

static String getFunctionName() {
return "WindowEnd";
}

@UdafFactory(description = "Extracts the window end time")
public static TableUdaf<Long, Long> createWindowEnd() {
return PlaceholderTableUdaf.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udaf.window;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.KsqlAggregateFunction;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;

/**
* Used to handle the special cased {@link WindowStartKudaf} and {@link WindowEndKudaf}.
*/
public final class WindowSelectMapper
implements ValueMapperWithKey<Windowed<?>, GenericRow, GenericRow> {

private static final Map<String, Type> WINDOW_FUNCTION_NAMES = ImmutableMap.of(
WindowStartKudaf.getFunctionName().toUpperCase(), Type.StartTime,
WindowEndKudaf.getFunctionName().toUpperCase(), Type.EndTime
);

private final Map<Integer, Type> windowSelects;

public WindowSelectMapper(
final Map<Integer, KsqlAggregateFunction> aggFunctionsByIndex) {
this.windowSelects = aggFunctionsByIndex.entrySet().stream()
.filter(e ->
WINDOW_FUNCTION_NAMES.containsKey(e.getValue().getFunctionName().toUpperCase()))
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> WINDOW_FUNCTION_NAMES.get(e.getValue().getFunctionName().toUpperCase())));
}

public boolean hasSelects() {
return !windowSelects.isEmpty();
}

@Override
public GenericRow apply(final Windowed<?> readOnlyKey, final GenericRow row) {
final Window window = readOnlyKey.window();

windowSelects.forEach((index, type) ->
row.getColumns().set(index, type.mapper.apply(window)));

return row;
}

private enum Type {
StartTime(Window::start), EndTime(Window::end);

private final Function<Window, Object> mapper;

Type(final Function<Window, Object> mapper) {
this.mapper = mapper;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udaf.window;

import io.confluent.ksql.function.udaf.TableUdaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf;

/**
* A placeholder KUDAF for extracting window start times.
*
* <p>The KUDAF itself does nothing. It's just a placeholder.
*
* @see WindowSelectMapper
*/
@SuppressWarnings("WeakerAccess") // Invoked via reflection.
@UdafDescription(name = "WindowStart", author = "Confluent",
description = "Returns the window start time, in milliseconds, for the given record. "
+ "If the given record is not part of a window the function will return NULL.")
public final class WindowStartKudaf {

private WindowStartKudaf() {
}

static String getFunctionName() {
return "WindowStart";
}

@UdafFactory(description = "Extracts the window start time")
public static TableUdaf<Long, Long> createWindowStart() {
return PlaceholderTableUdaf.INSTANCE;
}
}
Loading

0 comments on commit 82b603d

Please sign in to comment.