From 5b9c45d0d9a60e3755380d482a625ec89e2fb4bc Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Mon, 28 Oct 2019 09:52:32 -0700 Subject: [PATCH] fix: A few tweaks --- .../java/io/confluent/ksql/function/FunctionRegistry.java | 3 ++- .../java/io/confluent/ksql/function/KsqlTableFunction.java | 4 ++++ .../java/io/confluent/ksql/function/BaseTableFunction.java | 3 +++ .../io/confluent/ksql/function/DynamicFunctionInvoker.java | 2 +- .../io/confluent/ksql/function/InternalFunctionRegistry.java | 5 ----- .../io/confluent/ksql/function/UdtfTableFunctionFactory.java | 3 +++ .../java/io/confluent/ksql/function/udtf/array/Explode.java | 4 ++++ .../test/java/io/confluent/ksql/function/udf/TestUdtf.java | 2 +- .../java/io/confluent/ksql/function/udf/UdfParameter.java | 2 +- 9 files changed, 19 insertions(+), 9 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java index 7780d9e5fafc..744a6c1b8ba5 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/FunctionRegistry.java @@ -80,7 +80,8 @@ public interface FunctionRegistry { * @throws KsqlException on unknown UDAF, or on unsupported {@code argumentType}. */ KsqlAggregateFunction getAggregateFunction( - String functionName, Schema argumentType, + String functionName, + Schema argumentType, AggregateFunctionInitArguments initArgs ); diff --git a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java index 96fffb32f2bf..5eb7d73b67ff 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java +++ b/ksql-common/src/main/java/io/confluent/ksql/function/KsqlTableFunction.java @@ -19,6 +19,10 @@ import java.util.List; import org.apache.kafka.connect.data.Schema; +/** + * A wrapper around the actual table function which provides methods to get return type and + * description, and allows the function to be invoked. + */ public interface KsqlTableFunction extends FunctionSignature { Schema getReturnType(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java b/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java index 20876391f209..a81833daecc5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/BaseTableFunction.java @@ -24,6 +24,9 @@ import java.util.Objects; import org.apache.kafka.connect.data.Schema; +/** + * Abstract base class for table functions + */ @Immutable public abstract class BaseTableFunction implements KsqlTableFunction { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/DynamicFunctionInvoker.java b/ksql-engine/src/main/java/io/confluent/ksql/function/DynamicFunctionInvoker.java index c98d571e0ca9..14757bf263ea 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/DynamicFunctionInvoker.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/DynamicFunctionInvoker.java @@ -21,7 +21,7 @@ import java.lang.reflect.TypeVariable; /** - * An implementation of UdfInvoker which invokes the UDF dynamically using reflection + * An implementation of UdfInvoker which invokes the UDF using reflection */ public class DynamicFunctionInvoker implements FunctionInvoker { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java index 74f1a277bcaf..a84ca1707050 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java @@ -244,7 +244,6 @@ private void init() { addJsonFunctions(); addStructFieldFetcher(); addUdafFunctions(); - addUdtfFunctions(); } private void addStringFunctions() { @@ -392,10 +391,6 @@ private void addUdafFunctions() { functionRegistry.addAggregateFunctionFactory(new TopkDistinctAggFunctionFactory()); } - private void addUdtfFunctions() { - //functionRegistry.addTableFunctionFactory(new ExplodeFunctionFactory()); - } - private void addBuiltInFunction(final KsqlFunction ksqlFunction) { addBuiltInFunction(ksqlFunction, false); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfTableFunctionFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfTableFunctionFactory.java index bbd346a5658b..d7876a8e02cd 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfTableFunctionFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfTableFunctionFactory.java @@ -20,6 +20,9 @@ import java.util.stream.Collectors; import org.apache.kafka.connect.data.Schema; +/** + * A table function factory used for creating user defined table functions. + */ public class UdtfTableFunctionFactory extends TableFunctionFactory { private final UdfIndex udtfIndex; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/Explode.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/Explode.java index e9fefd990add..7f8f0a777e6c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/Explode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/Explode.java @@ -22,6 +22,10 @@ import java.util.Collections; import java.util.List; +/** + * Implementation of the 'explode' table function. This table function takes an array of values and + * explodes it into zero or more rows, one for each value in the array. + */ @UdtfDescription(name = "explode", author = KsqlConstants.CONFLUENT_AUTHOR, description = "Explodes an array. This function outputs one value for each element of the array.") diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/TestUdtf.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/TestUdtf.java index c49960e67244..69a474573591 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/TestUdtf.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/TestUdtf.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2019 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the diff --git a/ksql-udf/src/main/java/io/confluent/ksql/function/udf/UdfParameter.java b/ksql-udf/src/main/java/io/confluent/ksql/function/udf/UdfParameter.java index 7ab4dbf664d2..b267f295fbb3 100644 --- a/ksql-udf/src/main/java/io/confluent/ksql/function/udf/UdfParameter.java +++ b/ksql-udf/src/main/java/io/confluent/ksql/function/udf/UdfParameter.java @@ -23,7 +23,7 @@ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.PARAMETER}) /* - * Optionally, applied to @Udf function parameters. + * Optionally, applied to @Udf or @Udtf function parameters. */ public @interface UdfParameter {