Skip to content

Commit

Permalink
feat: Implement user defined table functions (#3687)
Browse files Browse the repository at this point in the history
Users can now create their own table functions using annotations in a very similar way to how UDFs are created.
  • Loading branch information
purplefox authored Oct 29, 2019
1 parent 939c45a commit e62bd46
Show file tree
Hide file tree
Showing 36 changed files with 1,202 additions and 492 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.benchmark;

import io.confluent.ksql.function.UdfInvoker;
import io.confluent.ksql.function.FunctionInvoker;
import io.confluent.ksql.function.UdfLoader;
import io.confluent.ksql.function.udf.PluggableUdf;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -71,7 +71,7 @@ private Method createMethod(final String methodName, final Class<?>... params) {

private PluggableUdf createPluggableUdf(final Method method) {
try {
final UdfInvoker invoker = UdfLoader.createUdfInvoker(method);
final FunctionInvoker invoker = UdfLoader.createFunctionInvoker(method);
return new PluggableUdf(invoker, this);
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public interface FunctionRegistry {

/**
* Get the factory for a UDAF.
*
* @param functionName the name of the function
* @return the factory.
* @throws KsqlException on unknown UDAF.
Expand All @@ -64,9 +65,8 @@ public interface FunctionRegistry {
* Get an instance of an aggregate function.
*
* <p>The current assumption is that all aggregate functions take a single argument for
* computing the aggregate at runtime.
* For functions that have no runtime arguments pass {@link #DEFAULT_FUNCTION_ARG_SCHEMA} for the
* {@code argumentType} parameter.
* computing the aggregate at runtime. For functions that have no runtime arguments pass {@link
* #DEFAULT_FUNCTION_ARG_SCHEMA} for the {@code argumentType} parameter.
*
* <p>Some aggregate functions also take initialisation arguments, e.g.
* <code> SELECT TOPK(AGE, 5) FROM PEOPLE</code>.
Expand All @@ -79,10 +79,21 @@ public interface FunctionRegistry {
* @return the function instance.
* @throws KsqlException on unknown UDAF, or on unsupported {@code argumentType}.
*/
KsqlAggregateFunction<?, ?, ?> getAggregateFunction(String functionName, Schema argumentType,
AggregateFunctionInitArguments initArgs);
KsqlAggregateFunction<?, ?, ?> getAggregateFunction(
String functionName,
Schema argumentType,
AggregateFunctionInitArguments initArgs
);

KsqlTableFunction<?, ?> getTableFunction(String functionName, Schema argumentType);
/**
* Get a table function.
*
* @param functionName the name of the function.
* @param argumentTypes the schemas of the arguments.
* @return the function instance.
* @throws KsqlException on unknown table function, or on unsupported {@code argumentType}.
*/
KsqlTableFunction getTableFunction(String functionName, List<Schema> argumentTypes);

/**
* @return all UDF factories.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
import java.util.List;
import org.apache.kafka.connect.data.Schema;

public interface KsqlTableFunction<I, O> extends FunctionSignature {
/**
* A wrapper around the actual table function which provides methods to get return type and
* description, and allows the function to be invoked.
*/
public interface KsqlTableFunction extends FunctionSignature {

Schema getReturnType();

SqlType returnType();

List<O> flatMap(I input);
List<?> apply(Object... args);

String getDescription();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public interface MutableFunctionRegistry extends FunctionRegistry {
* Ensure the supplied function factory is registered.
*
* <p>The method will register the factory if a factory with the same name is not already
* registered. If a factory with the same name is already registered the method will throw
* if the two factories not are equivalent, (see {@link UdfFactory#matches(UdfFactory)}.
* registered. If a factory with the same name is already registered the method will throw if the
* two factories not are equivalent, (see {@link UdfFactory#matches(UdfFactory)}.
*
* @param factory the factory to register.
* @return the udf factory.
* @throws KsqlException if a UDAF function with the same name exists, or if an incompatible UDF
* function factory already exists.
* function factory already exists.
*/
UdfFactory ensureFunctionFactory(UdfFactory factory);

Expand All @@ -52,5 +52,11 @@ public interface MutableFunctionRegistry extends FunctionRegistry {
*/
void addAggregateFunctionFactory(AggregateFunctionFactory aggregateFunctionFactory);

/**
* Register a table function factory.
*
* @param tableFunctionFactory the factory to register
* @throws KsqlException if a function, (of any type), with the same name exists.
*/
void addTableFunctionFactory(TableFunctionFactory tableFunctionFactory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public TableFunctionFactory(final UdfMetadata metadata) {
this.metadata = Objects.requireNonNull(metadata, "metadata can't be null");
}

public abstract KsqlTableFunction<?, ?> createTableFunction(List<Schema> argTypeList);
public abstract KsqlTableFunction createTableFunction(List<Schema> argTypeList);

protected abstract List<List<Schema>> supportedArgs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;

/**
* Abstract base class for table functions
*/
@Immutable
public abstract class BaseTableFunction<I, O> implements KsqlTableFunction<I, O> {
public abstract class BaseTableFunction implements KsqlTableFunction {

private static final ConnectToSqlTypeConverter CONNECT_TO_SQL_CONVERTER
= SchemaConverters.connectToSqlConverter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@
import java.lang.reflect.TypeVariable;

/**
* An implementation of UdfInvoker which invokes the UDF dynamically using reflection
* An implementation of UdfInvoker which invokes the UDF using reflection
*/
public class DynamicUdfInvoker implements UdfInvoker {
public class DynamicFunctionInvoker implements FunctionInvoker {

private final Method method;

DynamicUdfInvoker(final Method method) {
DynamicFunctionInvoker(final Method method) {
final Class<?>[] types = method.getParameterTypes();
for (int i = 0; i < types.length; i++) {
if (method.getParameterTypes()[i].isArray()
&& (!method.isVarArgs() || i != method.getParameterCount() - 1)) {
throw new KsqlFunctionException(
"Invalid UDF method signature (contains non var-arg array): " + method);
"Invalid function method signature (contains non var-arg array): " + method);
}
if (method.getGenericParameterTypes()[i] instanceof TypeVariable
|| method.getGenericParameterTypes()[i] instanceof GenericArrayType) {
Expand All @@ -52,7 +52,7 @@ public Object eval(final Object udf, final Object... args) {
final Object[] extractedArgs = extractArgs(args);
return method.invoke(udf, extractedArgs);
} catch (Exception e) {
throw new KsqlFunctionException("Failed to invoke udf " + method, e);
throw new KsqlFunctionException("Failed to invoke function " + method, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package io.confluent.ksql.function;

/**
* This interface is used to invoke UDFs
* This interface is used to invoke UDFs and UDTFs
*/
public interface UdfInvoker {
public interface FunctionInvoker {

/**
* Call onto an UDF instance with the expected args. This is providing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.confluent.ksql.function.udf.string.LenKudf;
import io.confluent.ksql.function.udf.string.TrimKudf;
import io.confluent.ksql.function.udf.string.UCaseKudf;
import io.confluent.ksql.function.udtf.array.ExplodeFunctionFactory;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -128,14 +127,14 @@ public synchronized KsqlAggregateFunction getAggregateFunction(
@Override
public synchronized KsqlTableFunction getTableFunction(
final String functionName,
final Schema argumentType
final List<Schema> argumentTypes
) {
final TableFunctionFactory udtfFactory = udtfs.get(functionName.toUpperCase());
if (udtfFactory == null) {
throw new KsqlException("No table function with name " + functionName + " exists!");
}

return udtfFactory.createTableFunction(Collections.singletonList(argumentType));
return udtfFactory.createTableFunction(argumentTypes);
}

@Override
Expand Down Expand Up @@ -245,7 +244,6 @@ private void init() {
addJsonFunctions();
addStructFieldFetcher();
addUdafFunctions();
addUdtfFunctions();
}

private void addStringFunctions() {
Expand Down Expand Up @@ -393,10 +391,6 @@ private void addUdafFunctions() {
functionRegistry.addAggregateFunctionFactory(new TopkDistinctAggFunctionFactory());
}

private void addUdtfFunctions() {
functionRegistry.addTableFunctionFactory(new ExplodeFunctionFactory());
}

private void addBuiltInFunction(final KsqlFunction ksqlFunction) {
addBuiltInFunction(ksqlFunction, false);
}
Expand Down
Loading

0 comments on commit e62bd46

Please sign in to comment.