Skip to content

Commit

Permalink
feat: Implement EXPLODE(ARRAY) for single table function in SELECT (#…
Browse files Browse the repository at this point in the history
…3589)

This is the first PR in a series to implement table functions. This PR implements the EXPLODE function.
  • Loading branch information
purplefox authored Oct 25, 2019
1 parent 8621594 commit 8b52aa8
Show file tree
Hide file tree
Showing 58 changed files with 1,662 additions and 252 deletions.
2 changes: 1 addition & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ public void shouldDescribeOverloadedScalarFunction() {
public void shouldDescribeAggregateFunction() {
final String expectedSummary =
"Name : TOPK\n" +
"Author : confluent\n" +
"Author : Confluent\n" +
"Type : aggregate\n" +
"Jar : internal\n" +
"Variations : \n";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -38,7 +39,14 @@ public abstract class AggregateFunctionFactory {
.build();

public AggregateFunctionFactory(final String functionName) {
this(new UdfMetadata(functionName, "", "confluent", "", KsqlFunction.INTERNAL_PATH, false));
this(new UdfMetadata(
functionName,
"",
KsqlConstants.CONFLUENT_AUTHOR,
"",
KsqlFunction.INTERNAL_PATH,
false
));
}

public AggregateFunctionFactory(final UdfMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public interface FunctionRegistry {
*/
boolean isAggregate(String functionName);

/**
* Test if the supplied {@code functionName} is a table function.
*
* <p>Note: unknown functions result in {@code false} return value.
*
* @param functionName the name of the function to test
* @return {@code true} if it is a table function, {@code false} otherwise.
*/
boolean isTableFunction(String functionName);

/**
* Get the factory for a UDF.
*
Expand Down Expand Up @@ -72,6 +82,8 @@ public interface FunctionRegistry {
KsqlAggregateFunction<?, ?, ?> getAggregateFunction(String functionName, Schema argumentType,
AggregateFunctionInitArguments initArgs);

KsqlTableFunction<?, ?> getTableFunction(String functionName, Schema argumentType);

/**
* @return all UDF factories.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public interface FunctionSignature {
* @return whether or not to consider the final argument in
* {@link #getArguments()} as variadic
*/
boolean isVariadic();
default boolean isVariadic() {
return false;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,4 @@ public interface KsqlAggregateFunction<I, A, O> extends FunctionSignature {
Function<A, O> getResultMapper();

String getDescription();

@Override
default boolean isVariadic() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function;

import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.List;
import org.apache.kafka.connect.data.Schema;

public interface KsqlTableFunction<I, O> extends FunctionSignature {

Schema getReturnType();

SqlType returnType();

List<O> flatMap(I input);

String getDescription();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ public interface MutableFunctionRegistry extends FunctionRegistry {
* @throws KsqlException if a function, (of any type), with the same name exists.
*/
void addAggregateFunctionFactory(AggregateFunctionFactory aggregateFunctionFactory);

void addTableFunctionFactory(TableFunctionFactory tableFunctionFactory);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function;

import io.confluent.ksql.function.udf.UdfMetadata;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;


public abstract class TableFunctionFactory {

private final UdfMetadata metadata;

public TableFunctionFactory(final UdfMetadata metadata) {
this.metadata = Objects.requireNonNull(metadata, "metadata can't be null");
}

public abstract KsqlTableFunction<?, ?> createTableFunction(List<Schema> argTypeList);

protected abstract List<List<Schema>> supportedArgs();

public String getName() {
return metadata.getName();
}

public String getDescription() {
return metadata.getDescription();
}

public String getPath() {
return metadata.getPath();
}

public String getAuthor() {
return metadata.getAuthor();
}

public String getVersion() {
return metadata.getVersion();
}

public boolean isInternal() {
return metadata.isInternal();
}
}
20 changes: 19 additions & 1 deletion ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,29 @@
public final class ColumnName extends Name<ColumnName> {

private static final String AGGREGATE_COLUMN_PREFIX = "KSQL_AGG_VARIABLE_";
private static final String GENERATED_ALIAS_PREFIX = "KSQL_COL_";
private static final String SYNTHESISED_COLUMN_PREFIX = "KSQL_SYNTH_";

public static ColumnName aggregate(final int idx) {
public static ColumnName aggregateColumn(final int idx) {
return of(AGGREGATE_COLUMN_PREFIX + idx);
}

/**
* Where the user hasn't specified an alias for an expression in a SELECT we generate them
* using this method. This value is exposed to the user in the output schema
*/
public static ColumnName generatedColumnAlias(final int idx) {
return ColumnName.of(GENERATED_ALIAS_PREFIX + idx);
}

/**
* Used to generate a column name in an intermediate schema, e.g. for a column to hold
* values of a table function. These are never exposed to the user
*/
public static ColumnName synthesisedSchemaColumn(final int idx) {
return ColumnName.of(SYNTHESISED_COLUMN_PREFIX + idx);
}

public static ColumnName of(final String name) {
return new ColumnName(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;
package io.confluent.ksql.analyzer;

import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter;
import io.confluent.ksql.engine.rewrite.ExpressionTreeRewriter.Context;
Expand Down Expand Up @@ -45,7 +45,7 @@ public Optional<Expression> visitFunctionCall(
final ExpressionTreeRewriter.Context<Void> context) {
final String functionName = node.getName().name();
if (functionRegistry.isAggregate(functionName)) {
final ColumnName aggVarName = ColumnName.aggregate(aggVariableIndex);
final ColumnName aggVarName = ColumnName.aggregateColumn(aggVariableIndex);
aggVariableIndex++;
return Optional.of(
new ColumnReferenceExp(node.getLocation(), ColumnRef.withoutSource(aggVarName)));
Expand Down
11 changes: 11 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlStream;
Expand All @@ -45,6 +46,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
Expand All @@ -66,6 +68,7 @@ public class Analysis {
private Optional<Expression> havingExpression = Optional.empty();
private OptionalInt limitClause = OptionalInt.empty();
private CreateSourceAsProperties withProperties = CreateSourceAsProperties.none();
private final List<FunctionCall> tableFunctions = new ArrayList<>();

public Analysis(final ResultMaterialization resultMaterialization) {
this.resultMaterialization = requireNonNull(resultMaterialization, "resultMaterialization");
Expand Down Expand Up @@ -206,6 +209,14 @@ public CreateSourceAsProperties getProperties() {
return withProperties;
}

void addTableFunction(final FunctionCall functionCall) {
this.tableFunctions.add(Objects.requireNonNull(functionCall));
}

public List<FunctionCall> getTableFunctions() {
return tableFunctions;
}

@Immutable
public static final class Into {

Expand Down
37 changes: 37 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
Expand Down Expand Up @@ -516,6 +517,7 @@ protected AstNode visitSelect(final Select node, final Void context) {
} else if (selectItem instanceof SingleColumn) {
final SingleColumn column = (SingleColumn) selectItem;
addSelectItem(column.getExpression(), column.getAlias());
visitTableFunctions(column.getExpression());
} else {
throw new IllegalArgumentException(
"Unsupported SelectItem type: " + selectItem.getClass().getName());
Expand Down Expand Up @@ -626,6 +628,41 @@ public Void visitColumnReference(
analysis.addSelectItem(exp, columnName);
analysis.addSelectColumnRefs(columnRefs);
}

private void visitTableFunctions(final Expression expression) {
final TableFunctionVisitor visitor = new TableFunctionVisitor();
visitor.process(expression, null);
}

private final class TableFunctionVisitor extends TraversalExpressionVisitor<Void> {

private Optional<String> tableFunctionName = Optional.empty();

@Override
public Void visitFunctionCall(final FunctionCall functionCall, final Void context) {
final String functionName = functionCall.getName().name();
final boolean isTableFunction = metaStore.isTableFunction(functionName);

if (isTableFunction) {
if (tableFunctionName.isPresent()) {
throw new KsqlException("Table functions cannot be nested: "
+ tableFunctionName.get() + "(" + functionName + "())");
}

tableFunctionName = Optional.of(functionName);

analysis.addTableFunction(functionCall);
}

super.visitFunctionCall(functionCall, context);

if (isTableFunction) {
tableFunctionName = Optional.empty();
}

return null;
}
}
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.AggregateExpressionRewriter;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -175,6 +174,10 @@ private static void enforceAggregateRules(
return;
}

if (!analysis.getTableFunctions().isEmpty()) {
throw new KsqlException("Table functions cannot be used with aggregations.");
}

if (aggregateAnalysis.getAggregateFunctions().isEmpty()) {
throw new KsqlException(
"GROUP BY requires columns using aggregate functions in SELECT clause.");
Expand Down
41 changes: 21 additions & 20 deletions ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,30 @@ class QueryEngine {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.processingLogContext = Objects.requireNonNull(
processingLogContext,
"processingLogContext");
"processingLogContext"
);
this.queryIdGenerator = Objects.requireNonNull(queryIdGenerator, "queryIdGenerator");
}

static OutputNode buildQueryLogicalPlan(
final Query query,
final Optional<Sink> sink,
final MetaStore metaStore,
final KsqlConfig config
) {
final String outputPrefix = config.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG);

final Set<SerdeOption> defaultSerdeOptions = SerdeOptions.buildDefaults(config);

final QueryAnalyzer queryAnalyzer =
new QueryAnalyzer(metaStore, outputPrefix, defaultSerdeOptions);

final Analysis analysis = queryAnalyzer.analyze(query, sink);
final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);

return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan();
}

PhysicalPlan<?> buildPhysicalPlan(
final LogicalPlanNode logicalPlanNode,
final KsqlConfig ksqlConfig,
Expand All @@ -84,23 +104,4 @@ PhysicalPlan<?> buildPhysicalPlan(

return physicalPlanBuilder.buildPhysicalPlan(logicalPlanNode);
}

static OutputNode buildQueryLogicalPlan(
final Query query,
final Optional<Sink> sink,
final MetaStore metaStore,
final KsqlConfig config
) {
final String outputPrefix = config.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG);

final Set<SerdeOption> defaultSerdeOptions = SerdeOptions.buildDefaults(config);

final QueryAnalyzer queryAnalyzer =
new QueryAnalyzer(metaStore, outputPrefix, defaultSerdeOptions);

final Analysis analysis = queryAnalyzer.analyze(query, sink);
final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);

return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan();
}
}
Loading

0 comments on commit 8b52aa8

Please sign in to comment.