-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add WindowStart and WindowEnd UDFs (#1993)
* add WindowStartTime UDAF
- Loading branch information
1 parent
112b202
commit 82b603d
Showing
29 changed files
with
1,269 additions
and
271 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
50 changes: 50 additions & 0 deletions
50
...ngine/src/main/java/io/confluent/ksql/function/udaf/placeholder/PlaceholderTableUdaf.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright 2018 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udaf.placeholder; | ||
|
||
import io.confluent.ksql.function.udaf.TableUdaf; | ||
|
||
/** | ||
* A no-op {@link TableUdaf} that is used as a placeholder for some later hardcoded computation. | ||
*/ | ||
public final class PlaceholderTableUdaf implements TableUdaf<Long, Long> { | ||
|
||
public static final PlaceholderTableUdaf INSTANCE = new PlaceholderTableUdaf(); | ||
|
||
private PlaceholderTableUdaf(){ | ||
} | ||
|
||
@Override | ||
public Long undo(final Long valueToUndo, final Long aggregateValue) { | ||
return null; | ||
} | ||
|
||
@Override | ||
public Long initialize() { | ||
return null; | ||
} | ||
|
||
@Override | ||
public Long aggregate(final Long value, final Long aggregate) { | ||
return null; | ||
} | ||
|
||
@Override | ||
public Long merge(final Long aggOne, final Long aggTwo) { | ||
return null; | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowEndKudaf.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Copyright 2018 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udaf.window; | ||
|
||
import io.confluent.ksql.function.udaf.TableUdaf; | ||
import io.confluent.ksql.function.udaf.UdafDescription; | ||
import io.confluent.ksql.function.udaf.UdafFactory; | ||
import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; | ||
|
||
/** | ||
* A placeholder KUDAF for extracting window end times. | ||
* | ||
* <p>The KUDAF itself does nothing. It's just a placeholder. | ||
* | ||
* @see WindowSelectMapper | ||
*/ | ||
@SuppressWarnings("WeakerAccess") // Invoked via reflection. | ||
@UdafDescription(name = "WindowEnd", author = "Confluent", | ||
description = "Returns the window end time, in milliseconds, for the given record. " | ||
+ "If the given record is not part of a window the function will return NULL.") | ||
public final class WindowEndKudaf { | ||
|
||
private WindowEndKudaf() { | ||
} | ||
|
||
static String getFunctionName() { | ||
return "WindowEnd"; | ||
} | ||
|
||
@UdafFactory(description = "Extracts the window end time") | ||
public static TableUdaf<Long, Long> createWindowEnd() { | ||
return PlaceholderTableUdaf.INSTANCE; | ||
} | ||
} |
75 changes: 75 additions & 0 deletions
75
ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowSelectMapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
/* | ||
* Copyright 2018 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udaf.window; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import io.confluent.ksql.GenericRow; | ||
import io.confluent.ksql.function.KsqlAggregateFunction; | ||
import java.util.Map; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import org.apache.kafka.streams.kstream.ValueMapperWithKey; | ||
import org.apache.kafka.streams.kstream.Window; | ||
import org.apache.kafka.streams.kstream.Windowed; | ||
|
||
/** | ||
* Used to handle the special cased {@link WindowStartKudaf} and {@link WindowEndKudaf}. | ||
*/ | ||
public final class WindowSelectMapper | ||
implements ValueMapperWithKey<Windowed<?>, GenericRow, GenericRow> { | ||
|
||
private static final Map<String, Type> WINDOW_FUNCTION_NAMES = ImmutableMap.of( | ||
WindowStartKudaf.getFunctionName().toUpperCase(), Type.StartTime, | ||
WindowEndKudaf.getFunctionName().toUpperCase(), Type.EndTime | ||
); | ||
|
||
private final Map<Integer, Type> windowSelects; | ||
|
||
public WindowSelectMapper( | ||
final Map<Integer, KsqlAggregateFunction> aggFunctionsByIndex) { | ||
this.windowSelects = aggFunctionsByIndex.entrySet().stream() | ||
.filter(e -> | ||
WINDOW_FUNCTION_NAMES.containsKey(e.getValue().getFunctionName().toUpperCase())) | ||
.collect(Collectors.toMap( | ||
Map.Entry::getKey, | ||
e -> WINDOW_FUNCTION_NAMES.get(e.getValue().getFunctionName().toUpperCase()))); | ||
} | ||
|
||
public boolean hasSelects() { | ||
return !windowSelects.isEmpty(); | ||
} | ||
|
||
@Override | ||
public GenericRow apply(final Windowed<?> readOnlyKey, final GenericRow row) { | ||
final Window window = readOnlyKey.window(); | ||
|
||
windowSelects.forEach((index, type) -> | ||
row.getColumns().set(index, type.mapper.apply(window))); | ||
|
||
return row; | ||
} | ||
|
||
private enum Type { | ||
StartTime(Window::start), EndTime(Window::end); | ||
|
||
private final Function<Window, Object> mapper; | ||
|
||
Type(final Function<Window, Object> mapper) { | ||
this.mapper = mapper; | ||
} | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
ksql-engine/src/main/java/io/confluent/ksql/function/udaf/window/WindowStartKudaf.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Copyright 2018 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udaf.window; | ||
|
||
import io.confluent.ksql.function.udaf.TableUdaf; | ||
import io.confluent.ksql.function.udaf.UdafDescription; | ||
import io.confluent.ksql.function.udaf.UdafFactory; | ||
import io.confluent.ksql.function.udaf.placeholder.PlaceholderTableUdaf; | ||
|
||
/** | ||
* A placeholder KUDAF for extracting window start times. | ||
* | ||
* <p>The KUDAF itself does nothing. It's just a placeholder. | ||
* | ||
* @see WindowSelectMapper | ||
*/ | ||
@SuppressWarnings("WeakerAccess") // Invoked via reflection. | ||
@UdafDescription(name = "WindowStart", author = "Confluent", | ||
description = "Returns the window start time, in milliseconds, for the given record. " | ||
+ "If the given record is not part of a window the function will return NULL.") | ||
public final class WindowStartKudaf { | ||
|
||
private WindowStartKudaf() { | ||
} | ||
|
||
static String getFunctionName() { | ||
return "WindowStart"; | ||
} | ||
|
||
@UdafFactory(description = "Extracts the window start time") | ||
public static TableUdaf<Long, Long> createWindowStart() { | ||
return PlaceholderTableUdaf.INSTANCE; | ||
} | ||
} |
Oops, something went wrong.