Skip to content

Commit

Permalink
feat: Support multiple table functions in queries (#3685)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Oct 29, 2019
1 parent 417494a commit 44be5a2
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.KsqlException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -71,9 +70,6 @@ public FlatMapNode(
this.functionRegistry = functionRegistry;
this.finalSelectExpressions = buildFinalSelectExpressions();
outputSchema = buildLogicalSchema(source.getSchema());
if (analysis.getTableFunctions().size() > 1) {
throw new KsqlException("Only one table function per query currently is supported");
}
}

@Override
Expand Down Expand Up @@ -117,7 +113,7 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {

return getSource().buildStream(builder).flatMap(
outputSchema,
analysis.getTableFunctions().get(0),
analysis.getTableFunctions(),
contextStacker
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -686,28 +687,32 @@ private SchemaKGroupedStream groupByKey(

public SchemaKStream<K> flatMap(
final LogicalSchema outputSchema,
final FunctionCall functionCall,
final List<FunctionCall> tableFunctions,
final QueryContext.Stacker contextStacker
) {
final ColumnReferenceExp exp = (ColumnReferenceExp)functionCall.getArguments().get(0);
final ColumnName columnName = exp.getReference().name();
final ColumnRef ref = ColumnRef.withoutSource(columnName);
final OptionalInt indexInInput = getSchema().valueColumnIndex(ref);
if (!indexInInput.isPresent()) {
throw new IllegalArgumentException("Can't find input column " + columnName);
final List<TableFunctionApplier> tableFunctionAppliers = new ArrayList<>();
for (FunctionCall functionCall: tableFunctions) {
final ColumnReferenceExp exp = (ColumnReferenceExp)functionCall.getArguments().get(0);
final ColumnName columnName = exp.getReference().name();
final ColumnRef ref = ColumnRef.withoutSource(columnName);
final OptionalInt indexInInput = getSchema().valueColumnIndex(ref);
if (!indexInInput.isPresent()) {
throw new IllegalArgumentException("Can't find input column " + columnName);
}
final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction(
functionRegistry,
functionCall,
getSchema()
);
final TableFunctionApplier tableFunctionApplier =
new TableFunctionApplier(tableFunction, indexInInput.getAsInt());
tableFunctionAppliers.add(tableFunctionApplier);
}
final KsqlTableFunction tableFunction = UdtfUtil.resolveTableFunction(
functionRegistry,
functionCall,
getSchema()
);
final TableFunctionApplier functionHolder =
new TableFunctionApplier(tableFunction, indexInInput.getAsInt());
final StreamFlatMap<K> step = ExecutionStepFactory.streamFlatMap(
contextStacker,
sourceStep,
outputSchema,
functionHolder
tableFunctionAppliers
);
return new SchemaKStream<K>(
step,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.GenericRow;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.streams.kstream.ValueMapper;
Expand All @@ -28,20 +29,36 @@
@Immutable
public class KudtfFlatMapper implements ValueMapper<GenericRow, Iterable<GenericRow>> {

private final TableFunctionApplier functionHolder;
private final List<TableFunctionApplier> tableFunctionAppliers;

public KudtfFlatMapper(final TableFunctionApplier functionHolder) {
this.functionHolder = Objects.requireNonNull(functionHolder);
public KudtfFlatMapper(final List<TableFunctionApplier> tableFunctionAppliers) {
this.tableFunctionAppliers = Objects.requireNonNull(tableFunctionAppliers);
}

/*
This function zips results from multiple table functions together as described in KLIP-9
in the design-proposals directory
*/
@Override
public Iterable<GenericRow> apply(final GenericRow row) {
final List<Object> exploded = functionHolder.apply(row);
final List<GenericRow> rows = new ArrayList<>(exploded.size());
for (Object val : exploded) {
final List<Object> arrayList = new ArrayList<>(row.getColumns());
arrayList.add(val);
rows.add(new GenericRow(arrayList));
final List<Iterator<Object>> iters = new ArrayList<>(tableFunctionAppliers.size());
int maxLength = 0;
for (TableFunctionApplier applier: tableFunctionAppliers) {
final List<Object> exploded = applier.apply(row);
iters.add(exploded.iterator());
maxLength = Math.max(maxLength, exploded.size());
}
final List<GenericRow> rows = new ArrayList<>(maxLength);
for (int i = 0; i < maxLength; i++) {
final List<Object> newRow = new ArrayList<>(row.getColumns());
for (Iterator<Object> iter: iters) {
if (iter.hasNext()) {
newRow.add(iter.next());
} else {
newRow.add(null);
}
}
rows.add(new GenericRow(newRow));
}
return rows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ public class StreamFlatMap<K> implements ExecutionStep<KStreamHolder<K>> {

private final ExecutionStepProperties properties;
private final ExecutionStep<KStreamHolder<K>> source;
private final TableFunctionApplier functionHolder;
private final List<TableFunctionApplier> tableFunctionAppliers;

public StreamFlatMap(
final ExecutionStepProperties properties,
final ExecutionStep<KStreamHolder<K>> source,
final TableFunctionApplier functionHolder
final List<TableFunctionApplier> tableFunctionAppliers
) {
this.properties = Objects.requireNonNull(properties, "properties");
this.source = Objects.requireNonNull(source, "source");
this.functionHolder = Objects.requireNonNull(functionHolder);
this.tableFunctionAppliers = Objects.requireNonNull(tableFunctionAppliers);
}

@Override
Expand All @@ -52,8 +52,8 @@ public KStreamHolder<K> build(final PlanBuilder builder) {
return builder.visitFlatMap(this);
}

public TableFunctionApplier getFunctionHolder() {
return functionHolder;
  /**
   * Returns the table function invocations this step applies to each row.
   *
   * NOTE(review): the internal list is returned directly; callers are
   * presumably expected not to mutate it — confirm, or wrap it unmodifiable.
   */
  public List<TableFunctionApplier> getTableFunctionAppliers() {
    return tableFunctionAppliers;
  }

public ExecutionStep<KStreamHolder<K>> getSource() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.function.udtf;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;

import io.confluent.ksql.GenericRow;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class KudtfFlatMapperTest {

  @Test
  public void shouldFlatMapOneFunction() {
    // Given: a single table function exploding each row into three values
    final TableFunctionApplier applier = createApplier(Arrays.asList(10, 10, 10));
    final KudtfFlatMapper flatMapper = new KudtfFlatMapper(Arrays.asList(applier));
    final GenericRow row = new GenericRow(1, 2, 3);

    // When:
    final Iterator<GenericRow> iter = flatMapper.apply(row).iterator();

    // Then: one output row per exploded value, original columns preserved
    assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10)));
    assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10)));
    assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10)));
    assertThat(iter.hasNext(), is(false));
  }

  @Test
  public void shouldZipTwoFunctions() {
    // Given: two table functions whose explosion lengths differ (3 vs 2)
    final TableFunctionApplier longerApplier = createApplier(Arrays.asList(10, 10, 10));
    final TableFunctionApplier shorterApplier = createApplier(Arrays.asList(20, 20));
    final KudtfFlatMapper flatMapper =
        new KudtfFlatMapper(Arrays.asList(longerApplier, shorterApplier));
    final GenericRow row = new GenericRow(1, 2, 3);

    // When:
    final Iterator<GenericRow> iter = flatMapper.apply(row).iterator();

    // Then: the shorter function's column is padded with null on the last row
    assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10, 20)));
    assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10, 20)));
    assertThat(iter.next().getColumns(), is(Arrays.asList(1, 2, 3, 10, null)));
    assertThat(iter.hasNext(), is(false));
  }

  // Builds a mock applier that always explodes to the given list of values.
  private <T> TableFunctionApplier createApplier(final List<T> values) {
    final TableFunctionApplier applier = mock(TableFunctionApplier.class);
    Mockito.doReturn(values).when(applier).apply(any());
    return applier;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,6 @@
"message": "Unsupported argument type for EXPLODE Schema{MAP}"
}
},
{
"name": "shouldn't handle more than one table function",
"statements": [
"CREATE STREAM TEST (MY_ARR ARRAY<BIGINT>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR), EXPLODE(MY_ARR) VAL FROM TEST;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlException",
"message": "Only one table function per query currently is supported"
}
},
{
"name": "shouldn't be able to have table functions with aggregation",
"statements": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@
"expectedException": {
"type": "io.confluent.ksql.parser.exception.ParseFailedException"
}
},
{
"name": "multiple table functions",
"statements": [
"CREATE STREAM TEST (MY_ARR1 ARRAY<BIGINT>, MY_ARR2 ARRAY<BIGINT>, BAR BIGINT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT EXPLODE(MY_ARR1), EXPLODE(MY_ARR2) FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "MY_ARR1": [1, 2], "MY_ARR2": [10, 11, 12]}},
{"topic": "test_topic", "key": 1, "value": {"ID": 1, "MY_ARR1": [3, 4], "MY_ARR2": [20]}}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 10}},
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 2, "KSQL_COL_1": 11}},
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": null, "KSQL_COL_1": 12}},
{"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": 3, "KSQL_COL_1": 20}},
{"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": 4, "KSQL_COL_1": null}}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ public static <K> StreamFlatMap<K> streamFlatMap(
final QueryContext.Stacker stacker,
final ExecutionStep<KStreamHolder<K>> source,
final LogicalSchema resultSchema,
final TableFunctionApplier functionHolder
final List<TableFunctionApplier> tableFunctionAppliers
) {
final QueryContext queryContext = stacker.getQueryContext();
return new StreamFlatMap<>(
new DefaultExecutionStepProperties(resultSchema, queryContext),
source,
functionHolder
tableFunctionAppliers
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static <K> KStreamHolder<K> build(
final KStreamHolder<K> stream,
final StreamFlatMap<K> step) {
return stream.withStream(stream.getStream().flatMapValues(
new KudtfFlatMapper(step.getFunctionHolder())));
new KudtfFlatMapper(step.getTableFunctionAppliers())));
}

}

0 comments on commit 44be5a2

Please sign in to comment.