Skip to content

Commit

Permalink
feat: Added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Fox committed Oct 28, 2019
1 parent b3637ec commit 4b886b1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@

package io.confluent.ksql.function.udtf.array;

import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;

@UdtfDescription(name = "explode", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "explodes an array")
description =
"Explodes an array. This function outputs one value for each element of the array.")
public class Explode {

@Udtf
Expand Down Expand Up @@ -55,7 +52,7 @@ public List<String> explodeString(final List<String> input) {
return explode(input);
}

@Udtf(schemaProvider = "provideDecimalSchema")
@Udtf
public List<BigDecimal> explodeBigDecimal(final List<BigDecimal> input) {
return explode(input);
}
Expand All @@ -64,15 +61,4 @@ private <T> List<T> explode(final List<T> list) {
return list == null ? Collections.emptyList() : list;
}

@UdfSchemaProvider
public SqlType provideDecimalSchema(final List<SqlType> params) {
final SqlType s0 = params.get(0);
if (s0.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException(
"The schema provider method for explode expects a BigDecimal parameter"
+ "type as a parameter.");
}
return s0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Table functions cannot be used with aggregations."
}
},
{
"name": "explode different types",
"statements": [
"CREATE STREAM TEST (F0 ARRAY<INT>, F1 ARRAY<BIGINT>, F2 ARRAY<DOUBLE>, F3 ARRAY<BOOLEAN>, F4 ARRAY<STRING>, F5 ARRAY<DECIMAL(20, 10)>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT EXPLODE(F0), EXPLODE(F1), EXPLODE(F2), EXPLODE(F3), EXPLODE(F4), EXPLODE(F5) FROM TEST;"
],
"inputs": [
{
"topic": "test_topic", "key": 0, "value": {"F0": [1, 2], "F1": [2, 3], "F2": [3.1, 4.1], "F3": [true, false], "F4": ["foo", "bar"], "F5": [123.456, 456.123]}
}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 2, "KSQL_COL_2": 3.1, "KSQL_COL_3": true, "KSQL_COL_4": "foo", "KSQL_COL_5": 123.456}},
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 2, "KSQL_COL_1": 3, "KSQL_COL_2": 4.1, "KSQL_COL_3": false, "KSQL_COL_4": "bar", "KSQL_COL_5": 456.123}}
]
}
]
}

0 comments on commit 4b886b1

Please sign in to comment.