diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java index a05a5606c2d4..3a29811dbe68 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java @@ -39,7 +39,6 @@ import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKStream; -import io.confluent.ksql.util.KsqlException; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -71,9 +70,6 @@ public FlatMapNode( this.functionRegistry = functionRegistry; this.finalSelectExpressions = buildFinalSelectExpressions(); outputSchema = buildLogicalSchema(source.getSchema()); - if (analysis.getTableFunctions().size() > 1) { - throw new KsqlException("Only one table function per query currently is supported"); - } } @Override @@ -117,7 +113,7 @@ public SchemaKStream buildStream(final KsqlQueryBuilder builder) { return getSource().buildStream(builder).flatMap( outputSchema, - analysis.getTableFunctions().get(0), + analysis.getTableFunctions(), contextStacker ); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index f6261d038461..84ee7386dbdc 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -70,6 +70,7 @@ import io.confluent.ksql.util.IdentifierUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.SchemaUtil; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -686,28 +687,32 @@ private SchemaKGroupedStream groupByKey( public SchemaKStream flatMap( final LogicalSchema outputSchema, - final FunctionCall functionCall, + final List tableFunctions, final QueryContext.Stacker contextStacker ) { - final ColumnReferenceExp exp = (ColumnReferenceExp)functionCall.getArguments().get(0); - final ColumnName columnName = exp.getReference().name(); - final ColumnRef ref = ColumnRef.withoutSource(columnName); - final OptionalInt indexInInput = getSchema().valueColumnIndex(ref); - if (!indexInInput.isPresent()) { - throw new IllegalArgumentException("Can't find input column " + columnName); + final List tableFunctionAppliers = new ArrayList<>(); + for (FunctionCall functionCall: tableFunctions) { + final ColumnReferenceExp exp = (ColumnReferenceExp)functionCall.getArguments().get(0); + final ColumnName columnName = exp.getReference().name(); + final ColumnRef ref = ColumnRef.withoutSource(columnName); + final OptionalInt indexInInput = getSchema().valueColumnIndex(ref); + if (!indexInInput.isPresent()) { + throw new IllegalArgumentException("Can't find input column " + columnName); + } + final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction( + functionRegistry, + functionCall, + getSchema() + ); + final TableFunctionApplier tableFunctionApplier = + new TableFunctionApplier(tableFunction, indexInInput.getAsInt()); + tableFunctionAppliers.add(tableFunctionApplier); } - final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction( - functionRegistry, - functionCall, - getSchema() - ); - final TableFunctionApplier functionHolder = - new TableFunctionApplier(tableFunction, indexInInput.getAsInt()); final StreamFlatMap step = ExecutionStepFactory.streamFlatMap( contextStacker, sourceStep, outputSchema, - functionHolder + tableFunctionAppliers ); return new SchemaKStream( step, diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java index fdfa39cda1ea..65a470af0c58 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapper.java @@ -18,6 +18,7 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.GenericRow; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Objects; import org.apache.kafka.streams.kstream.ValueMapper; @@ -28,20 +29,36 @@ @Immutable public class KudtfFlatMapper implements ValueMapper> { - private final TableFunctionApplier functionHolder; + private final List tableFunctionAppliers; - public KudtfFlatMapper(final TableFunctionApplier functionHolder) { - this.functionHolder = Objects.requireNonNull(functionHolder); + public KudtfFlatMapper(final List tableFunctionAppliers) { + this.tableFunctionAppliers = Objects.requireNonNull(tableFunctionAppliers); } + /* + This function zips results from multiple table functions together as described in KLIP-9 + in the design-proposals directory + */ @Override public Iterable apply(final GenericRow row) { - final List exploded = functionHolder.apply(row); - final List rows = new ArrayList<>(exploded.size()); - for (Object val : exploded) { - final List arrayList = new ArrayList<>(row.getColumns()); - arrayList.add(val); - rows.add(new GenericRow(arrayList)); + final List> iters = new ArrayList<>(tableFunctionAppliers.size()); + int maxLength = 0; + for (TableFunctionApplier applier: tableFunctionAppliers) { + final List exploded = applier.apply(row); + iters.add(exploded.iterator()); + maxLength = Math.max(maxLength, exploded.size()); + } + final List rows = new ArrayList<>(maxLength); + for (int i = 0; i < maxLength; i++) { + final List newRow = new ArrayList<>(row.getColumns()); + for (Iterator iter: iters) { + if (iter.hasNext()) { + newRow.add(iter.next()); + } else { + newRow.add(null); + } + } + rows.add(new GenericRow(newRow)); } return rows; } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java index 94a5a8816e2e..f33a335f96dd 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamFlatMap.java @@ -25,16 +25,16 @@ public class StreamFlatMap implements ExecutionStep> { private final ExecutionStepProperties properties; private final ExecutionStep> source; - private final TableFunctionApplier functionHolder; + private final List tableFunctionAppliers; public StreamFlatMap( final ExecutionStepProperties properties, final ExecutionStep> source, - final TableFunctionApplier functionHolder + final List tableFunctionAppliers ) { this.properties = Objects.requireNonNull(properties, "properties"); this.source = Objects.requireNonNull(source, "source"); - this.functionHolder = Objects.requireNonNull(functionHolder); + this.tableFunctionAppliers = Objects.requireNonNull(tableFunctionAppliers); } @Override @@ -52,8 +52,8 @@ public KStreamHolder build(final PlanBuilder builder) { return builder.visitFlatMap(this); } - public TableFunctionApplier getFunctionHolder() { - return functionHolder; + public List getTableFunctionAppliers() { + return tableFunctionAppliers; } public ExecutionStep> getSource() { diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapperTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapperTest.java new file mode 100644 index 000000000000..217f45115e1f --- /dev/null +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/function/udtf/KudtfFlatMapperTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.function.udtf; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; + +import io.confluent.ksql.GenericRow; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KudtfFlatMapperTest { + + @Test + public void shouldFlatMapOneFunction() { + // Given: + TableFunctionApplier applier = createApplier(Arrays.asList(10, 10, 10)); + List appliers = Arrays.asList(applier); + KudtfFlatMapper flatMapper = new KudtfFlatMapper(appliers); + GenericRow row = new GenericRow(1, 2, 3); + + // When: + Iterable iterable = flatMapper.apply(row); + + // Then: + Iterator iter = iterable.iterator(); + assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10))); + assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10))); + assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10))); + assertThat(iter.hasNext(), is(false)); + } + + @Test + public void shouldZipTwoFunctions() { + // Given: + TableFunctionApplier applier1 = createApplier(Arrays.asList(10, 10, 10)); + TableFunctionApplier applier2 = createApplier(Arrays.asList(20, 20)); + List appliers = Arrays.asList(applier1, applier2); + KudtfFlatMapper flatMapper = new KudtfFlatMapper(appliers); + GenericRow row = new GenericRow(1, 2, 3); + + // When: + Iterable iterable = flatMapper.apply(row); + + // Then: + Iterator iter = iterable.iterator(); + assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10, 20))); + assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10, 20))); + assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10, null))); + assertThat(iter.hasNext(), is(false)); + } + + private TableFunctionApplier createApplier(List list) { + TableFunctionApplier applier = mock(TableFunctionApplier.class); + Mockito.doReturn(list).when(applier).apply(any()); + return applier; + } + +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json b/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json index 0d3d97847eaf..3ad5dc227eff 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json @@ -53,17 +53,6 @@ "message": "Unsupported argument type for EXPLODE Schema{MAP}" } }, - { - "name": "shouldn't handle more than one table function", - "statements": [ - "CREATE STREAM TEST (MY_ARR ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR), EXPLODE(MY_ARR) VAL FROM TEST;" - ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlException", - "message": "Only one table function per query currently is supported" - } - }, { "name": "shouldn't be able to have table functions with aggregation", "statements": [ diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json b/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json index f7346914bc4f..0f3b4c784c2d 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json @@ -97,6 +97,24 @@ "expectedException": { "type": "io.confluent.ksql.parser.exception.ParseFailedException" } + }, + { + "name": "multiple table functions", + "statements": [ + "CREATE STREAM TEST (MY_ARR1 ARRAY, MY_ARR2 ARRAY, BAR BIGINT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR1), EXPLODE(MY_ARR2) FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "MY_ARR1": [1, 2], "MY_ARR2": [10, 11, 12]}}, + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "MY_ARR1": [3, 4], "MY_ARR2": [20]}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 10}}, + {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 2, "KSQL_COL_1": 11}}, + {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": null, "KSQL_COL_1": 12}}, + {"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": 3, "KSQL_COL_1": 20}}, + {"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": 4, "KSQL_COL_1": null}} + ] } ] } \ No newline at end of file diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index 452638160180..fe26cfd8576c 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -147,13 +147,13 @@ public static StreamFlatMap streamFlatMap( final QueryContext.Stacker stacker, final ExecutionStep> source, final LogicalSchema resultSchema, - final TableFunctionApplier functionHolder + final List tableFunctionAppliers ) { final QueryContext queryContext = stacker.getQueryContext(); return new StreamFlatMap<>( new DefaultExecutionStepProperties(resultSchema, queryContext), source, - functionHolder + tableFunctionAppliers ); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java index e740f353fc8a..a64d5f7c486e 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java @@ -28,7 +28,7 @@ public static KStreamHolder build( final KStreamHolder stream, final StreamFlatMap step) { return stream.withStream(stream.getStream().flatMapValues( - new KudtfFlatMapper(step.getFunctionHolder()))); + new KudtfFlatMapper(step.getTableFunctionAppliers()))); } } \ No newline at end of file