diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 1930dbf73dcc..3748548907c6 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -679,9 +679,9 @@ public void testSelectUDFs() { final String queryString = String.format( "SELECT ITEMID, " + "ORDERUNITS*10 AS Col1, " - + "PRICEARRAY[0]+10 AS Col2, " + + "PRICEARRAY[1]+10 AS Col2, " + "KEYVALUEMAP['key1']*KEYVALUEMAP['key2']+10 AS Col3, " - + "PRICEARRAY[1]>1000 AS Col4 " + + "PRICEARRAY[2]>1000 AS Col4 " + "FROM %s " + "WHERE ORDERUNITS > 20 AND ITEMID LIKE '%%_8';", orderDataProvider.kstreamName() diff --git a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java index e081a3bc4fe4..3ce7f8796508 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java @@ -228,7 +228,7 @@ public void testIsNull() { @Test public void shouldHandleMultiDimensionalArray() { // Given: - final String simpleQuery = "SELECT col14[0][0] FROM CODEGEN_TEST EMIT CHANGES;"; + final String simpleQuery = "SELECT col14[1][1] FROM CODEGEN_TEST EMIT CHANGES;"; final Analysis analysis = analyzeQuery(simpleQuery, metaStore); // When: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java index 5fd4c5d845cb..984d42265177 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java @@ -137,9 +137,9 @@ public void testApplyUdfsToColumns() { "CREATE STREAM \"%s\" AS SELECT " + "ITEMID, " + "ORDERUNITS*10, " - + "PRICEARRAY[0]+10, " + + "PRICEARRAY[1]+10, " + "KEYVALUEMAP['key1'] * KEYVALUEMAP['key2']+10, " - + "PRICEARRAY[1] > 1000 " + + "PRICEARRAY[2] > 1000 " + "FROM %s WHERE ORDERUNITS > 20 AND ITEMID LIKE '%%_8';", resultStreamName, testData.sourceStreamName @@ -163,7 +163,7 @@ public void testShouldCastSelectedColumns() { final String queryString = String.format( "CREATE STREAM \"%s\" AS SELECT " + "CAST (ORDERUNITS AS INTEGER), " - + "CAST( PRICEARRAY[1]>1000 AS STRING), " + + "CAST( PRICEARRAY[2]>1000 AS STRING), " + "CAST (SUBSTRING(ITEMID, 6) AS DOUBLE), " + "CAST(ORDERUNITS AS VARCHAR) " + "FROM %s WHERE ORDERUNITS > 20 AND ITEMID LIKE '%%_8';", diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java index 17197078ab5e..c233ef1846a5 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitor.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multiset; +import io.confluent.ksql.execution.codegen.helpers.ArrayAccess; import io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction; import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression; import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression; @@ -91,6 +92,7 @@ public class SqlToJavaVisitor { public static final List JAVA_IMPORTS = ImmutableList.of( "org.apache.kafka.connect.data.Struct", + "io.confluent.ksql.execution.codegen.helpers.ArrayAccess", "io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction", "io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction.LazyWhenClause", "java.util.HashMap", @@ -680,16 +682,14 @@ public Pair visitSubscriptExpression(SubscriptExpression node, SqlArray array = (SqlArray) internalSchema; String listName = process(node.getBase(), context).getLeft(); String suppliedIdx = process(node.getIndex(), context).getLeft(); - String trueIdx = node.getIndex().toString().startsWith("-") - ? String.format("((%s)%s).size()%s", internalSchemaJavaType, listName, suppliedIdx) - : suppliedIdx; String code = format( - "((%s) ((%s)%s).get((int)%s))", + "((%s) (%s.arrayAccess((%s) %s, ((int) %s))))", SchemaConverters.sqlToJavaConverter().toJavaType(array.getItemType()).getSimpleName(), + ArrayAccess.class.getSimpleName(), internalSchemaJavaType, listName, - trueIdx + suppliedIdx ); return new Pair<>(code, array.getItemType()); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/ArrayAccess.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/ArrayAccess.java new file mode 100644 index 000000000000..d667367d9a18 --- /dev/null +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/codegen/helpers/ArrayAccess.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.codegen.helpers; + +import java.util.List; + +/** + * Used by reflection in the SqlToJavaVisitor to resolve + * ArrayAccess operations in a 1-indexed fashion. + */ +public final class ArrayAccess { + + private ArrayAccess() { } + + /** + * @param list the input list + * @param index the index, base-1 or negative (n from the end) + * @return the {@code index}-th item in {@code list} + */ + public static T arrayAccess(List list, int index) { + // subtract by 1 because SQL standard uses 1-based indexing; since + // SQL standard does not support negative (end-based) indexing, we + // will use -1 to represent the last element + final int trueIndex = index >= 0 ? index - 1 : list.size() + index; + if (trueIndex >= list.size() || trueIndex < 0) { + return null; + } + + return list.get(trueIndex); + } + +} diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java index d824ae899b51..fc94f7702ffc 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/SqlToJavaVisitorTest.java @@ -124,26 +124,7 @@ public void shouldProcessArrayExpressionCorrectly() { // Then: assertThat( javaExpression, - equalTo("((Double) ((java.util.List)TEST1_COL4).get((int)0))") - ); - } - - @Test - public void shouldProcessArrayNegativeIndexExpressionCorrectly() { - // Given: - Expression expression = new SubscriptExpression( - ARRAYCOL, - ArithmeticUnaryExpression.negative(Optional.empty(), new IntegerLiteral(1)) - ); - - // When: - String javaExpression = sqlToJavaVisitor.process(expression); - - // Then: - assertThat( - javaExpression, - equalTo( - "((Double) ((java.util.List)TEST1_COL4).get((int)((java.util.List)TEST1_COL4).size()-1))") + equalTo("((Double) (ArrayAccess.arrayAccess((java.util.List) TEST1_COL4, ((int) 0))))") ); } diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/ArrayAccessTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/ArrayAccessTest.java new file mode 100644 index 000000000000..aee9addbd25a --- /dev/null +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/codegen/helpers/ArrayAccessTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.codegen.helpers; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.junit.Test; + +public class ArrayAccessTest { + + @Test + public void shouldBeOneIndexed() { + // Given: + List list = ImmutableList.of(1, 2); + + // When: + Integer access = ArrayAccess.arrayAccess(list, 1); + + // Then: + assertThat(access, is(1)); + } + + @Test + public void shouldSupportNegativeIndex() { + // Given: + List list = ImmutableList.of(1, 2); + + // When: + Integer access = ArrayAccess.arrayAccess(list, -1); + + // Then: + assertThat(access, is(2)); + } + + @Test + public void shouldReturnNullOnOutOfBoundsIndex() { + // Given: + List list = ImmutableList.of(1, 2); + + // When: + Integer access = ArrayAccess.arrayAccess(list, 3); + + // Then: + assertThat(access, nullValue()); + } + + @Test + public void shouldReturnNullOnNegativeOutOfBoundsIndex() { + // Given: + List list = ImmutableList.of(1, 2); + + // When: + Integer access = ArrayAccess.arrayAccess(list, -3); + + // Then: + assertThat(access, nullValue()); + } + +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/arrayindex.json b/ksql-functional-tests/src/test/resources/query-validation-tests/arrayindex.json index effebbac04ce..68bd8197ef0a 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/arrayindex.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/arrayindex.json @@ -9,7 +9,7 @@ "name": "select the first element of an Array", "statements": [ "CREATE STREAM test (colors ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT colors[0] as C FROM test;" + "CREATE STREAM OUTPUT AS SELECT colors[1] as C FROM test;" ], "inputs": [ {"topic": "test_topic", "value": {"colors": ["Red", "Green"]}}, diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/between.json b/ksql-functional-tests/src/test/resources/query-validation-tests/between.json index 7fcb01df55c1..09d244785570 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/between.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/between.json @@ -139,7 +139,7 @@ "name": "test BETWEEN with array dereference", "statements": [ "CREATE STREAM TEST (source array) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT source[1] AS THING FROM TEST WHERE source[1] BETWEEN 2 AND 4;" + "CREATE STREAM OUTPUT AS SELECT source[2] AS THING FROM TEST WHERE source[2] BETWEEN 2 AND 4;" ], "inputs": [ {"topic": "test_topic", "key": 1, "value": {"source": [10,1]}, "timestamp": 0}, diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/extract-json-field.json b/ksql-functional-tests/src/test/resources/query-validation-tests/extract-json-field.json index 133a6fe5e113..f363dcb1fddd 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/extract-json-field.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/extract-json-field.json @@ -22,7 +22,7 @@ "name": "extract JSON array field", "statements": [ "CREATE STREAM TEST (array_field ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT EXTRACTJSONFIELD(array_field[0], '$.nested') AS Col1, EXTRACTJSONFIELD(array_field[1], '$.nested') AS Col2 FROM TEST;" + "CREATE STREAM OUTPUT AS SELECT EXTRACTJSONFIELD(array_field[1], '$.nested') AS Col1, EXTRACTJSONFIELD(array_field[2], '$.nested') AS Col2 FROM TEST;" ], "inputs": [ {"topic": "test_topic", "value": {"array_field": ["{\"nested\": \"nest0\"}","{\"nested\": \"nest1\"}"]}}, diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/more-complex-struct.json b/ksql-functional-tests/src/test/resources/query-validation-tests/more-complex-struct.json index cf67fd934b3e..9c54749bb4fa 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/more-complex-struct.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/more-complex-struct.json @@ -800,7 +800,7 @@ "name": "complex struct select array and map items", "statements": [ "CREATE STREAM foo (NESTED_STRUCT_FIELD STRUCT >, SL3_DOUBLE_ARRAY ARRAY, SL3_STRING VARCHAR>, SL2_STRUCT_MAP MAP>, SL2_DOUBLE DOUBLE, SL2_BOOLEAN BOOLEAN >, SL1_STRING VARCHAR, SL1_STRUCT_ARRAY ARRAY> > , COL1 BIGINT, COL2 VARCHAR) WITH (kafka_topic='test_topic', value_format='json');", - "CREATE STREAM s3 AS SELECT NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[0] AS COL1, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey'] AS COL2, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[10-5-5] AS COL3, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey']->SL2_3_DOUBLE AS COL4 from foo;" + "CREATE STREAM s3 AS SELECT NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[1] AS COL1, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey'] AS COL2, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[10-5-4] AS COL3, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey']->SL2_3_DOUBLE AS COL4 from foo;" ], "inputs": [ { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/project-filter.json b/ksql-functional-tests/src/test/resources/query-validation-tests/project-filter.json index fa3c1a93f58a..34054cf073ac 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/project-filter.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/project-filter.json @@ -85,7 +85,7 @@ "name": "Json Multi Dimensional Array 2", "statements": [ "CREATE STREAM array_array (ID BIGINT, ARRAY_COL ARRAY>) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM S3 as SELECT ID, ARRAY_COL[0][1] AS array_item FROM array_array;" + "CREATE STREAM S3 as SELECT ID, ARRAY_COL[1][2] AS array_item FROM array_array;" ], "inputs": [ {"topic": "test_topic", "key": 0, "value": {"id": 1, "array_col": [["item_00_1","item_01_1"],["item_10_1","item_11_1"]]}}, @@ -106,7 +106,7 @@ "name": "Json Multi Dimensional Array", "statements": [ "CREATE STREAM array_array (ID BIGINT, ARRAY_COL ARRAY>>) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM S3 as SELECT ID, ARRAY_COL[0][0][0] AS array_item FROM array_array;" + "CREATE STREAM S3 as SELECT ID, ARRAY_COL[1][1][1] AS array_item FROM array_array;" ], "inputs": [ {"topic": "test_topic", "key": 0, "value": {"id": 1, "array_col": [[["item_000_1","item_001_1"],["item_010_1","item_011_1"]],[["item_100_1","item_101_1"],["item_110_1","item_111_1"]]]}}, diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/split.json b/ksql-functional-tests/src/test/resources/query-validation-tests/split.json index 4c65d8366568..bfe46c9762b3 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/split.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/split.json @@ -70,7 +70,7 @@ "name": "split a message by commas and display pos 0 and 2 of the returned array", "statements": [ "CREATE STREAM TEST (message VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT SPLIT(message, ',')[0] as s1, SPLIT(message, ',')[2] as s2 FROM TEST;" + "CREATE STREAM OUTPUT AS SELECT SPLIT(message, ',')[1] as s1, SPLIT(message, ',')[3] as s2 FROM TEST;" ], "inputs": [ {"topic": "test_topic", "key": 1, "value": {"message": "a,b,c"}, "timestamp": 0},