Handle sort order with nested columns on iceberg table #22099

Open
wants to merge 5 commits into
base: master

Contributor

@mattheusv can we have test coverage for this?

Author

@krvikash we already have a test case, BaseIcebergConnectorTest.testSortingOnNestedField:1413, that expects an exception (it's failing right now). Would it make sense to just change it to expect success instead of an error?

Contributor

Well, yes.
This should be the purpose of this PR, right? To allow defining a sort order on nested fields.

Author

I've updated the test. Can you folks please take a look? @findinpath @krvikash

@@ -19,17 +19,16 @@
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.SortOrderBuilder;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.TypeUtil;

 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
 import static io.trino.plugin.iceberg.PartitionFields.fromIdentifierToColumn;
 import static io.trino.plugin.iceberg.PartitionFields.quotedName;
@@ -60,11 +59,10 @@ public static SortOrder parseSortFields(Schema schema, List<String> fields)
             throw new TrinoException(INVALID_TABLE_PROPERTY, "Invalid " + SORTED_BY_PROPERTY + " definition", e);
         }

-        Set<Integer> baseColumnFieldIds = schema.columns().stream()
-                .map(Types.NestedField::fieldId)
-                .collect(toImmutableSet());
+        Map<Integer, String> baseColumnFieldIds = TypeUtil.indexNameById(schema.asStruct());
+
         for (SortField field : sortOrder.fields()) {
-            if (!baseColumnFieldIds.contains(field.sourceId())) {
+            if (!baseColumnFieldIds.containsKey(field.sourceId())) {
                 throw new TrinoException(COLUMN_NOT_FOUND, "Column not found: " + schema.findColumnName(field.sourceId()));
             }
         }
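
The change above swaps a set of top-level column IDs for an ID-to-name index over the whole struct, so a sort field whose source ID belongs to a nested field is no longer rejected. The following is a minimal stand-alone sketch of that idea, not Iceberg's implementation: the `Field` record, the `NestedFieldIndex` class, its methods, and the field IDs are all hypothetical and only model the behaviour implied by the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a schema field: an id, a name, and optional nested children.
record Field(int id, String name, List<Field> children) {
    static Field leaf(int id, String name) {
        return new Field(id, name, List.of());
    }
}

final class NestedFieldIndex {
    private NestedFieldIndex() {}

    // Models the old lookup: only the ids of top-level columns are collected.
    static List<Integer> topLevelIds(List<Field> columns) {
        List<Integer> ids = new ArrayList<>();
        for (Field column : columns) {
            ids.add(column.id());
        }
        return ids;
    }

    // Models the new lookup: every field, nested or not, is indexed by id under its dotted name.
    static Map<Integer, String> indexNameById(List<Field> columns) {
        Map<Integer, String> index = new LinkedHashMap<>();
        collect(columns, "", index);
        return index;
    }

    // Depth-first walk that builds the dotted name as it descends.
    private static void collect(List<Field> fields, String prefix, Map<Integer, String> index) {
        for (Field field : fields) {
            String fullName = prefix.isEmpty() ? field.name() : prefix + "." + field.name();
            index.put(field.id(), fullName);
            collect(field.children(), fullName, index);
        }
    }

    public static void main(String[] args) {
        // Shape of the test table (id INT, row_t ROW(name VARCHAR)); the ids are made up.
        List<Field> columns = List.of(
                Field.leaf(1, "id"),
                new Field(2, "row_t", List.of(Field.leaf(3, "name"))));
        System.out.println(topLevelIds(columns).contains(3));
        System.out.println(indexNameById(columns).containsKey(3));
    }
}
```

In this model the nested field with id 3 is missing from the top-level list but present in the recursive index under `row_t.name`, which is the distinction the `contains` to `containsKey` change relies on.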
@@ -531,6 +531,28 @@ public void testSortedNationTable()
         }
     }

+    @Test
+    public void testSortedTableUsingNestedField()
+    {
+        Session withSmallRowGroups = withSmallRowGroups(getSession());
+
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_sorted_table_using_nested_fields",
+                " (id INT, row_t ROW(name VARCHAR)) WITH (format = '" + format.name() + "', sorted_by = ARRAY[ '\"row_t.name\"' ])")) {
+            assertUpdate(
+                    withSmallRowGroups,
+                    "INSERT INTO " + table.getName() + "(id, row_t)" +
+                            "SELECT id, ROW(CONCAT('v', CAST(id as VARCHAR))) as row_t FROM UNNEST(sequence(1, 500)) AS t(id)",
+                    500);
+
+            for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
+                assertThat(isFileSorted(Location.of((String) filePath), "row_t.name")).isTrue();
+            }
+            assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM " + table.getName() + " ORDER BY id");
+        }
+    }
+
     @Test
     public void testFileSortingWithLargerTable()
     {
@@ -1602,9 +1602,8 @@ public void testSortingOnNestedField()
         assertThat(query("CREATE TABLE " + tableName + " (nationkey BIGINT, row_t ROW(name VARCHAR, regionkey BIGINT, comment VARCHAR)) " +
                 "WITH (sorted_by = ARRAY['\"row_t\".\"comment\"'])"))
                 .failure().hasMessageContaining("Unable to parse sort field: [\"row_t\".\"comment\"]");
-        assertThat(query("CREATE TABLE " + tableName + " (nationkey BIGINT, row_t ROW(name VARCHAR, regionkey BIGINT, comment VARCHAR)) " +
-                "WITH (sorted_by = ARRAY['\"row_t.comment\"'])"))
-                .failure().hasMessageContaining("Column not found: row_t.comment");
+        assertUpdate("CREATE TABLE " + tableName + " (nationkey BIGINT, row_t ROW(name VARCHAR, regionkey BIGINT, comment VARCHAR)) " +
+                "WITH (sorted_by = ARRAY['\"row_t.comment\"'])");
Comment on lines +1605 to +1606
Contributor

Please add a test case that verifies the table files are sorted when using a nested field, something like testSortedNationTable.

Author

@mattheusv May 28, 2024

@krvikash I'm trying to add a test case for this and I'm having some problems/questions.

The test case that I've created is the following:

try (TestTable table = new TestTable(
        getQueryRunner()::execute,
        "test_sorted_table_using_nested_fields",
        " (id INT, row_t ROW(name VARCHAR)) WITH (format = '" + format.name() + "', sorted_by = ARRAY[ '\"row_t.name\"' ])")) {
    assertUpdate(
            withSmallRowGroups,
            "INSERT INTO " + table.getName() + "(id, row_t)" +
            "SELECT id, ROW(CONCAT('v', CAST(id as VARCHAR))) as row_t FROM UNNEST(sequence(1, 500)) AS t(id)",
            500);

    for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
        assertThat(isFileSorted(Location.of((String) filePath), "name")).isTrue();
    }
    assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM " + table.getName() + " ORDER BY id");
}

The method isFileSorted fails because it doesn't support nested fields. The IcebergTestUtils.checkParquetFileSorting method fails because column.getPath().iterator() returns two elements when the column is nested; in this case it returns <row_t, name>, which raises the following exception:

java.lang.IllegalArgumentException: expected one element but was: <row_t, name>
        at com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
        at io.trino.plugin.iceberg.IcebergTestUtils.lambda$3(IcebergTestUtils.java:144)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:193)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1709)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
        at io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting(IcebergTestUtils.java:145)
        at io.trino.plugin.iceberg.catalog.jdbc.TestIcebergJdbcCatalogConnectorSmokeTest.isFileSorted(TestIcebergJdbcCatalogConnectorSmokeTest.java:188)
        at io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest.testSortedTableUsingNestedField(BaseIcebergConnectorSmokeTest.java:551)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
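
As a minimal illustration of this failure, here is a stand-alone sketch, not the Trino or Guava code. The column path is modelled as a plain list, `onlyElement` is a hypothetical stand-in for Guava's `Iterators.getOnlyElement`, and `matchesDotted` is a hypothetical helper that compares against the full dotted path instead.

```java
import java.util.List;

final class ColumnPathMatch {
    private ColumnPathMatch() {}

    // Hypothetical stand-in for Guava's Iterators.getOnlyElement:
    // accepts exactly one element and rejects everything else.
    static String onlyElement(List<String> path) {
        if (path.size() != 1) {
            throw new IllegalArgumentException("expected one element but was: " + path);
        }
        return path.get(0);
    }

    // Comparing the full dotted path works for top-level and nested columns alike.
    static boolean matchesDotted(List<String> path, String sortColumnName) {
        return String.join(".", path).equalsIgnoreCase(sortColumnName);
    }

    public static void main(String[] args) {
        List<String> nested = List.of("row_t", "name");
        System.out.println(matchesDotted(nested, "row_t.name"));
        try {
            onlyElement(nested);
        }
        catch (IllegalArgumentException e) {
            // A two-element path cannot be reduced to a single element.
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model a top-level column such as `id` passes both checks, while a nested column only passes the dotted comparison, and only when the caller supplies the full dotted name rather than the leaf name alone.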

The assertion assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM " + table.getName() + " ORDER BY id"); also fails because the expected query returns an error saying that the table doesn't exist:

java.lang.AssertionError: Execution of 'expected' query failed: SELECT * FROM test_sorted_table_using_nested_fields15ugt0uecl ORDER BY id
        at io.trino.testing.QueryAssertions.assertDistributedQuery(QueryAssertions.java:322)
        at io.trino.testing.QueryAssertions.assertQuery(QueryAssertions.java:187)
        at io.trino.testing.QueryAssertions.assertQuery(QueryAssertions.java:160)
        at io.trino.testing.AbstractTestQueryFramework.assertQuery(AbstractTestQueryFramework.java:350)
        at io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest.testSortedTableUsingNestedField(BaseIcebergConnectorSmokeTest.java:553)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
Caused by: org.jdbi.v3.core.statement.UnableToCreateStatementException: org.h2.jdbc.JdbcSQLSyntaxErrorException: Table "TEST_SORTED_TABLE_USING_NESTED_FIELDS15UGT0UECL" not found; SQL statement:

It seems that the expected and the actual queries are executed in different query runners: the expected query is executed on h2QueryRunner and the actual query is executed on the queryRunner from the AbstractTestQueryFramework class.

My question is: how can I test this properly? Should I update the checkParquetFileSorting method to support nested fields, or is there a better way to write this test?

Author

Hi folks, could someone help me with this? @findepi @krvikash

     }

     @Test
@@ -53,7 +53,6 @@

 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.collect.Iterators.getOnlyElement;
 import static com.google.common.collect.MoreCollectors.onlyElement;
 import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.createPerTransactionCache;
 import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
@@ -141,7 +140,7 @@ public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String s
         verify(parquetMetadata.getBlocks().size() > 1, "Test must produce at least two row groups");
         for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) {
             ColumnChunkMetadata columnMetadata = blockMetaData.columns().stream()
-                    .filter(column -> getOnlyElement(column.getPath().iterator()).equalsIgnoreCase(sortColumnName))
+                    .filter(column -> column.getPath().toDotString().equalsIgnoreCase(sortColumnName))
                     .collect(onlyElement());
             if (previousMax != null) {
                 if (previousMax.compareTo(columnMetadata.getStatistics().genericGetMin()) > 0) {
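
The hunk is cut off by the diff context, so the rest of the loop is not visible here. As a hedged sketch of the kind of check the visible lines suggest (a row group's minimum for the sort column must not be smaller than the previous row group's maximum), the example below uses a hypothetical `RowGroupStats` record in place of the Parquet block and column metadata. It is an illustration under that assumption, not the body of `checkParquetFileSorting`.

```java
import java.util.List;

// Hypothetical stand-in for the min/max statistics of the sort column in one row group.
record RowGroupStats(String min, String max) {}

final class RowGroupSortCheck {
    private RowGroupSortCheck() {}

    // Returns true when consecutive row groups are in order:
    // each group's min is greater than or equal to the max of the group before it.
    static boolean isSortedAcrossRowGroups(List<RowGroupStats> groups) {
        String previousMax = null;
        for (RowGroupStats group : groups) {
            if (previousMax != null && previousMax.compareTo(group.min()) > 0) {
                return false;
            }
            previousMax = group.max();
        }
        return true;
    }

    public static void main(String[] args) {
        List<RowGroupStats> ordered = List.of(new RowGroupStats("a", "c"), new RowGroupStats("d", "f"));
        List<RowGroupStats> unordered = List.of(new RowGroupStats("d", "f"), new RowGroupStats("a", "c"));
        System.out.println(isSortedAcrossRowGroups(ordered));
        System.out.println(isSortedAcrossRowGroups(unordered));
    }
}
```

A check of this shape only compares statistics between row groups, which is presumably why the test insists on at least two row groups per file; it says nothing about ordering inside a single row group.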