Skip to content

Commit

Permalink
fix: array access is now 1-indexed instead of 0-indexed (#4057)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: any queries that access array elements by index
must change to use 1-based indexing instead of 0-based indexing.
  • Loading branch information
agavra authored Dec 5, 2019
1 parent 55d75f2 commit f09f797
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 38 deletions.
4 changes: 2 additions & 2 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,9 @@ public void testSelectUDFs() {
final String queryString = String.format(
"SELECT ITEMID, "
+ "ORDERUNITS*10 AS Col1, "
+ "PRICEARRAY[0]+10 AS Col2, "
+ "PRICEARRAY[1]+10 AS Col2, "
+ "KEYVALUEMAP['key1']*KEYVALUEMAP['key2']+10 AS Col3, "
+ "PRICEARRAY[1]>1000 AS Col4 "
+ "PRICEARRAY[2]>1000 AS Col4 "
+ "FROM %s "
+ "WHERE ORDERUNITS > 20 AND ITEMID LIKE '%%_8';",
orderDataProvider.kstreamName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testIsNull() {
@Test
public void shouldHandleMultiDimensionalArray() {
// Given:
final String simpleQuery = "SELECT col14[0][0] FROM CODEGEN_TEST EMIT CHANGES;";
final String simpleQuery = "SELECT col14[1][1] FROM CODEGEN_TEST EMIT CHANGES;";
final Analysis analysis = analyzeQuery(simpleQuery, metaStore);

// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ public void testApplyUdfsToColumns() {
"CREATE STREAM \"%s\" AS SELECT "
+ "ITEMID, "
+ "ORDERUNITS*10, "
+ "PRICEARRAY[0]+10, "
+ "PRICEARRAY[1]+10, "
+ "KEYVALUEMAP['key1'] * KEYVALUEMAP['key2']+10, "
+ "PRICEARRAY[1] > 1000 "
+ "PRICEARRAY[2] > 1000 "
+ "FROM %s WHERE ORDERUNITS > 20 AND ITEMID LIKE '%%_8';",
resultStreamName,
testData.sourceStreamName
Expand All @@ -163,7 +163,7 @@ public void testShouldCastSelectedColumns() {
final String queryString = String.format(
"CREATE STREAM \"%s\" AS SELECT "
+ "CAST (ORDERUNITS AS INTEGER), "
+ "CAST( PRICEARRAY[1]>1000 AS STRING), "
+ "CAST( PRICEARRAY[2]>1000 AS STRING), "
+ "CAST (SUBSTRING(ITEMID, 6) AS DOUBLE), "
+ "CAST(ORDERUNITS AS VARCHAR) "
+ "FROM %s WHERE ORDERUNITS > 20 AND ITEMID LIKE '%%_8';",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multiset;
import io.confluent.ksql.execution.codegen.helpers.ArrayAccess;
import io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction;
import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression;
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class SqlToJavaVisitor {

public static final List<String> JAVA_IMPORTS = ImmutableList.of(
"org.apache.kafka.connect.data.Struct",
"io.confluent.ksql.execution.codegen.helpers.ArrayAccess",
"io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction",
"io.confluent.ksql.execution.codegen.helpers.SearchedCaseFunction.LazyWhenClause",
"java.util.HashMap",
Expand Down Expand Up @@ -680,16 +682,14 @@ public Pair<String, SqlType> visitSubscriptExpression(SubscriptExpression node,
SqlArray array = (SqlArray) internalSchema;
String listName = process(node.getBase(), context).getLeft();
String suppliedIdx = process(node.getIndex(), context).getLeft();
String trueIdx = node.getIndex().toString().startsWith("-")
? String.format("((%s)%s).size()%s", internalSchemaJavaType, listName, suppliedIdx)
: suppliedIdx;

String code = format(
"((%s) ((%s)%s).get((int)%s))",
"((%s) (%s.arrayAccess((%s) %s, ((int) %s))))",
SchemaConverters.sqlToJavaConverter().toJavaType(array.getItemType()).getSimpleName(),
ArrayAccess.class.getSimpleName(),
internalSchemaJavaType,
listName,
trueIdx
suppliedIdx
);

return new Pair<>(code, array.getItemType());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.codegen.helpers;

import java.util.List;

/**
 * Helper that resolves array subscript operations in a 1-indexed fashion.
 *
 * <p>The SqlToJavaVisitor emits calls to {@link #arrayAccess(List, int)} by
 * simple class name in the Java expression code it generates, so this class
 * must stay importable by that generated code.
 */
public final class ArrayAccess {

  private ArrayAccess() { }

  /**
   * Looks up an element of {@code list} using SQL-style indexing.
   *
   * <p>Positive indexes are 1-based ({@code 1} is the first element). Negative
   * indexes count back from the end ({@code -1} is the last element). Any
   * access that cannot be resolved to an element - a {@code null} list, index
   * {@code 0}, or an index beyond either end of the list - yields {@code null}
   * instead of throwing.
   *
   * @param list the input list, may be {@code null}
   * @param index the index, 1-based or negative (n from the end)
   * @param <T> the element type of the list
   * @return the {@code index}-th item in {@code list}, or {@code null} if
   *         there is no such item
   */
  public static <T> T arrayAccess(final List<T> list, final int index) {
    // a null array has no elements to select, so the result is null just
    // like any other out-of-range access
    if (list == null) {
      return null;
    }

    final int size = list.size();

    // subtract by 1 because SQL standard uses 1-based indexing; since
    // SQL standard does not support negative (end-based) indexing, we
    // will use -1 to represent the last element
    final int trueIndex = index >= 0 ? index - 1 : size + index;

    // index 0 maps to -1 here and is therefore treated as out of bounds
    if (trueIndex < 0 || trueIndex >= size) {
      return null;
    }

    return list.get(trueIndex);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,7 @@ public void shouldProcessArrayExpressionCorrectly() {
// Then:
assertThat(
javaExpression,
equalTo("((Double) ((java.util.List)TEST1_COL4).get((int)0))")
);
}

@Test
public void shouldProcessArrayNegativeIndexExpressionCorrectly() {
// Given:
Expression expression = new SubscriptExpression(
ARRAYCOL,
ArithmeticUnaryExpression.negative(Optional.empty(), new IntegerLiteral(1))
);

// When:
String javaExpression = sqlToJavaVisitor.process(expression);

// Then:
assertThat(
javaExpression,
equalTo(
"((Double) ((java.util.List)TEST1_COL4).get((int)((java.util.List)TEST1_COL4).size()-1))")
equalTo("((Double) (ArrayAccess.arrayAccess((java.util.List) TEST1_COL4, ((int) 0))))")
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.codegen.helpers;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.junit.Test;

/**
 * Unit tests for {@link ArrayAccess#arrayAccess(List, int)}, covering
 * 1-based lookups, negative (from-the-end) lookups and the cases that
 * resolve to {@code null}.
 */
public class ArrayAccessTest {

  @Test
  public void shouldBeOneIndexed() {
    // Given:
    List<Integer> list = ImmutableList.of(1, 2);

    // When:
    Integer access = ArrayAccess.arrayAccess(list, 1);

    // Then:
    assertThat(access, is(1));
  }

  @Test
  public void shouldSupportNegativeIndex() {
    // Given:
    List<Integer> list = ImmutableList.of(1, 2);

    // When:
    Integer access = ArrayAccess.arrayAccess(list, -1);

    // Then:
    assertThat(access, is(2));
  }

  @Test
  public void shouldSupportNegativeIndexReachingFirstElement() {
    // Given:
    List<Integer> list = ImmutableList.of(1, 2);

    // When:
    Integer access = ArrayAccess.arrayAccess(list, -2);

    // Then:
    assertThat(access, is(1));
  }

  @Test
  public void shouldReturnNullOnZeroIndex() {
    // Given:
    List<Integer> list = ImmutableList.of(1, 2);

    // When: index 0 is neither a valid 1-based nor a valid negative index
    Integer access = ArrayAccess.arrayAccess(list, 0);

    // Then:
    assertThat(access, nullValue());
  }

  @Test
  public void shouldReturnNullOnOutOfBoundsIndex() {
    // Given:
    List<Integer> list = ImmutableList.of(1, 2);

    // When:
    Integer access = ArrayAccess.arrayAccess(list, 3);

    // Then:
    assertThat(access, nullValue());
  }

  @Test
  public void shouldReturnNullOnNegativeOutOfBoundsIndex() {
    // Given:
    List<Integer> list = ImmutableList.of(1, 2);

    // When:
    Integer access = ArrayAccess.arrayAccess(list, -3);

    // Then:
    assertThat(access, nullValue());
  }

  @Test
  public void shouldReturnNullOnEmptyList() {
    // Given:
    List<Integer> list = ImmutableList.of();

    // When:
    Integer first = ArrayAccess.arrayAccess(list, 1);
    Integer last = ArrayAccess.arrayAccess(list, -1);

    // Then:
    assertThat(first, nullValue());
    assertThat(last, nullValue());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"name": "select the first element of an Array",
"statements": [
"CREATE STREAM test (colors ARRAY<STRING>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT colors[0] as C FROM test;"
"CREATE STREAM OUTPUT AS SELECT colors[1] as C FROM test;"
],
"inputs": [
{"topic": "test_topic", "value": {"colors": ["Red", "Green"]}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
"name": "test BETWEEN with array dereference",
"statements": [
"CREATE STREAM TEST (source array<int>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT source[1] AS THING FROM TEST WHERE source[1] BETWEEN 2 AND 4;"
"CREATE STREAM OUTPUT AS SELECT source[2] AS THING FROM TEST WHERE source[2] BETWEEN 2 AND 4;"
],
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"source": [10,1]}, "timestamp": 0},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"name": "extract JSON array field",
"statements": [
"CREATE STREAM TEST (array_field ARRAY<VARCHAR>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT EXTRACTJSONFIELD(array_field[0], '$.nested') AS Col1, EXTRACTJSONFIELD(array_field[1], '$.nested') AS Col2 FROM TEST;"
"CREATE STREAM OUTPUT AS SELECT EXTRACTJSONFIELD(array_field[1], '$.nested') AS Col1, EXTRACTJSONFIELD(array_field[2], '$.nested') AS Col2 FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"array_field": ["{\"nested\": \"nest0\"}","{\"nested\": \"nest1\"}"]}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@
"name": "complex struct select array and map items",
"statements": [
"CREATE STREAM foo (NESTED_STRUCT_FIELD STRUCT <SL1_STRUCT STRUCT< SL2_STRUCT STRUCT<SL3_STRUCT STRUCT< SL4_STRING VARCHAR, SL4_INT INT, SL4_DOUBLE_ARRAY ARRAY<DOUBLE>>, SL3_DOUBLE_ARRAY ARRAY<DOUBLE>, SL3_STRING VARCHAR>, SL2_STRUCT_MAP MAP<VARCHAR, STRUCT<SL2_3_STRING VARCHAR, SL2_3_DOUBLE DOUBLE>>, SL2_DOUBLE DOUBLE, SL2_BOOLEAN BOOLEAN >, SL1_STRING VARCHAR, SL1_STRUCT_ARRAY ARRAY<STRUCT<SL1_2_STRING VARCHAR, SL1_2_DOUBLE DOUBLE>> > , COL1 BIGINT, COL2 VARCHAR) WITH (kafka_topic='test_topic', value_format='json');",
"CREATE STREAM s3 AS SELECT NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[0] AS COL1, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey'] AS COL2, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[10-5-5] AS COL3, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey']->SL2_3_DOUBLE AS COL4 from foo;"
"CREATE STREAM s3 AS SELECT NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[1] AS COL1, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey'] AS COL2, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT->SL3_STRUCT->SL4_DOUBLE_ARRAY[10-5-4] AS COL3, NESTED_STRUCT_FIELD->SL1_STRUCT->SL2_STRUCT_MAP['mapkey']->SL2_3_DOUBLE AS COL4 from foo;"
],
"inputs": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
"name": "Json Multi Dimensional Array 2",
"statements": [
"CREATE STREAM array_array (ID BIGINT, ARRAY_COL ARRAY<ARRAY<VARCHAR>>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM S3 as SELECT ID, ARRAY_COL[0][1] AS array_item FROM array_array;"
"CREATE STREAM S3 as SELECT ID, ARRAY_COL[1][2] AS array_item FROM array_array;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"id": 1, "array_col": [["item_00_1","item_01_1"],["item_10_1","item_11_1"]]}},
Expand All @@ -106,7 +106,7 @@
"name": "Json Multi Dimensional Array",
"statements": [
"CREATE STREAM array_array (ID BIGINT, ARRAY_COL ARRAY<ARRAY<ARRAY<VARCHAR>>>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM S3 as SELECT ID, ARRAY_COL[0][0][0] AS array_item FROM array_array;"
"CREATE STREAM S3 as SELECT ID, ARRAY_COL[1][1][1] AS array_item FROM array_array;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"id": 1, "array_col": [[["item_000_1","item_001_1"],["item_010_1","item_011_1"]],[["item_100_1","item_101_1"],["item_110_1","item_111_1"]]]}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"name": "split a message by commas and display pos 1 and 3 of the returned array",
"statements": [
"CREATE STREAM TEST (message VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT SPLIT(message, ',')[0] as s1, SPLIT(message, ',')[2] as s2 FROM TEST;"
"CREATE STREAM OUTPUT AS SELECT SPLIT(message, ',')[1] as s1, SPLIT(message, ',')[3] as s2 FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "key": 1, "value": {"message": "a,b,c"}, "timestamp": 0},
Expand Down

0 comments on commit f09f797

Please sign in to comment.