Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle sort order with nested columns on iceberg table #22099

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

mattheusv
Copy link

@mattheusv mattheusv commented May 23, 2024

Description

Previously the parseSortFields from SortFieldUtils was only collecting the field id from the top level columns don't considering nested fields of nested types, so in case a query with a sorted_by property use a nested field of a nested type trino would throw an expcetion that the column don't exists, because the field id of the nested column don't exists on baseColumnFieldIds set.

This PR fix this issue by recursively collecting the field ids from table columns which the column type is a nested type.

Fix: #19620

Additional context and related issues

This is my first time contributing to trino code base, so I'm not 100% sure that this is correct, so please let me know if anything is wrong.

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Section
* Correctly handle sort order for nested columns on iceberg table ({issue}`19620`)

Copy link

cla-bot bot commented May 23, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@github-actions github-actions bot added the iceberg Iceberg connector label May 23, 2024
Copy link

cla-bot bot commented May 23, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@mattheusv
Copy link
Author

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Just for notice; I've already sent the cla. Maybe there is some time to sync.

@mattheusv mattheusv changed the title Handle nested types when parsing sort fields Handle sort order for nested columns on iceberg table May 23, 2024
Copy link

cla-bot bot commented May 24, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@mattheusv mattheusv changed the title Handle sort order for nested columns on iceberg table Handle sort order with nested columns on iceberg table May 24, 2024
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattheusv can we have test coverage for this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@krvikash we already have a test case BaseIcebergConnectorTest.testSortingOnNestedField:1413 that expects an exception (it's failing right now), would make test to just change it to expect a success instead of an error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, yes.
This should be the purpose of this PR right? Allowing to define sort on nested fields.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the test. Can you folks please take a look? @findinpath @krvikash

Copy link

cla-bot bot commented May 26, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Copy link

cla-bot bot commented May 26, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Set<Integer> baseColumnFieldIds = schema.columns().stream()
.map(Types.NestedField::fieldId)
.collect(toImmutableSet());
Set<Integer> baseColumnFieldIds = collectColumnFieldIds(schema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we could use Iceberg library to get the field ids of all the fields org.apache.iceberg.types.TypeUtil.indexNameById(schema.asStruct())?

Comment on lines +1408 to +1606
assertUpdate("CREATE TABLE " + tableName + " (nationkey BIGINT, row_t ROW(name VARCHAR, regionkey BIGINT, comment VARCHAR)) " +
"WITH (sorted_by = ARRAY['\"row_t.comment\"'])");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add test case where we verify the table files is sorted when using nested field. something like testSortedNationTable.

Copy link
Author

@mattheusv mattheusv May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@krvikash I'm trying to add a test case for this and I having some problems/questions.

The test case that I've created is the following:

try (TestTable table = new TestTable(
        getQueryRunner()::execute,
        "test_sorted_table_using_nested_fields",
        " (id INT, row_t ROW(name VARCHAR)) WITH (format = '" + format.name() + "', sorted_by = ARRAY[ '\"row_t.name\"' ])")) {
assertUpdate(
        withSmallRowGroups,
        "INSERT INTO " + table.getName() + "(id, row_t)" +
        "SELECT id, ROW(CONCAT('v', CAST(id as VARCHAR))) as row_t FROM UNNEST(sequence(1, 500)) AS t(id)",
        500);

for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
    assertThat(isFileSorted(Location.of((String) filePath), "name")).isTrue();
}
assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM " + table.getName() + " ORDER BY id");

The method isFileSorted fails because it don' support nested fields. The IcebergTestUtils.checkParquetFileSorting method fails here because the column.getPath().iterator() returns two elements when the column object is nested, in this case it returns <row_t, name> which raise the following exception:

java.lang.IllegalArgumentException: expected one element but was: <row_t, name>
        at com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
        at io.trino.plugin.iceberg.IcebergTestUtils.lambda$3(IcebergTestUtils.java:144)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:193)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1709)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
        at io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting(IcebergTestUtils.java:145)
        at io.trino.plugin.iceberg.catalog.jdbc.TestIcebergJdbcCatalogConnectorSmokeTest.isFileSorted(TestIcebergJdbcCatalogConnectorSmokeTest.java:188)
        at io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest.testSortedTableUsingNestedField(BaseIcebergConnectorSmokeTest.java:551)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)

The assert assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM " + table.getName() + " ORDER BY id"); also fails because the expected sql returns returns an error that the table don't exists

java.lang.AssertionError: Execution of 'expected' query failed: SELECT * FROM test_sorted_table_using_nested_fields15ugt0uecl ORDER BY id
        at io.trino.testing.QueryAssertions.assertDistributedQuery(QueryAssertions.java:322)
        at io.trino.testing.QueryAssertions.assertQuery(QueryAssertions.java:187)
        at io.trino.testing.QueryAssertions.assertQuery(QueryAssertions.java:160)
        at io.trino.testing.AbstractTestQueryFramework.assertQuery(AbstractTestQueryFramework.java:350)
        at io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest.testSortedTableUsingNestedField(BaseIcebergConnectorSmokeTest.java:553)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
Caused by: org.jdbi.v3.core.statement.UnableToCreateStatementException: org.h2.jdbc.JdbcSQLSyntaxErrorException: Table "TEST_SORTED_TABLE_USING_NESTED_FIELDS15UGT0UECL" not found; SQL statement:

Seems that the expected and the actual query is executed in different query runners; The expected is executed on h2QueryRunner and the actual is executed on queryRunner from AbstractTestQueryFramework class.

My question is how can I test this properly? Should I update the checkParquetFileSorting method to support nested fields or there is a better way to write this test?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi folks, could someone help me on this? @findepi @krvikash

Comment on lines 78 to 94
Set<Integer> ids = new HashSet<Integer>();
schema.columns().forEach(column -> addNestedField(ids, column));
return ids;
}

private static void addNestedField(Set<Integer> baseColumnFieldIds, NestedField field)
{
baseColumnFieldIds.add(field.fieldId());

Type type = field.type();
if (type.isNestedType()) {
NestedType nestedType = type.asNestedType();
for (NestedField nestedField : nestedType.fields()) {
addNestedField(baseColumnFieldIds, nestedField);
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do this with com.google.common.graph.Traverser, without need for helper method.

@mattheusv mattheusv requested a review from krvikash May 29, 2024 18:05
Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Jun 24, 2024
@mosabua
Copy link
Member

mosabua commented Jun 24, 2024

@cla-bot check

Copy link

cla-bot bot commented Jun 24, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Copy link

cla-bot bot commented Jun 24, 2024

The cla-bot has been summoned, and re-checked this pull request!

Copy link

cla-bot bot commented Jun 24, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Copy link

cla-bot bot commented Jun 24, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Copy link

cla-bot bot commented Jun 24, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Copy link

cla-bot bot commented Jun 24, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

Copy link

cla-bot bot commented Jun 24, 2024

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@github-actions github-actions bot removed the stale label Jun 25, 2024
@cla-bot cla-bot bot added the cla-signed label Jun 26, 2024
@mattheusv mattheusv force-pushed the fix-sorting-nested-fields branch 2 times, most recently from 5892ee6 to bf763d2 Compare June 28, 2024 20:20
@bitsondatadev
Copy link
Member

@mattheusv This is in my list still, I am just catching up on things after vacation.

Don't hesitate with any questions.

@mattheusv
Copy link
Author

mattheusv commented Jul 1, 2024

@mattheusv This is in my list still, I am just catching up on things after vacation.

Don't hesitate with any questions.

Thanks @bitsondatadev

I'll just give some context of some changes:

As I mentioned here, the IcebergTestUtils#checkParquetFileSorting method was not handling nested columns, so I've changed the filter to find the columns to consider nested columns, so we can call the isFileSorted method like isFileSorted(Location.of((String) filePath), "row_t.name"

The problem now seems that the file is not being sorted, because the isFileSorted returns false which I don't quite understand why.

Another problem that I've notice is that if I execute ALTER TABLE iceberg.test.t execute optmize on a table sorted using nested columns it raise an exception:

trino> CREATE TABLE IF NOT EXISTS iceberg.test.t2 (
    ->     id INT,
    ->     row_t ROW(name VARCHAR)
    -> ) WITH (
    ->   format = 'PARQUET',
    ->   sorted_by = ARRAY ['"row_t.name"']
    -> );
CREATE TABLE

trino> insert into iceberg.test.t2(id, row_t) SELECT id, ROW(CONCAT('v', cast(id as varchar))) as row_t FROM UNNEST(sequence(1, 30)) AS t(id);
INSERT: 30 rows

trino> alter table iceberg.test.t2 execute optimize;

Query 20240701_125829_00002_62u7s, FAILED, 1 node
Splits: 12 total, 2 done (16,67%)
1,04 [21 rows, 856B] [20 rows/s, 826B/s]

Query 20240701_125829_00002_62u7s failed: Index -1 out of bounds for length 2
Server logs
2024-07-01T09:59:07.561-0300    DEBUG   SplitRunner-0   io.trino.execution.executor.dedicated.SplitProcessor    Index -1 out of bounds for length 2
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 2
        at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:79)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:212)
        at java.base/java.util.Collections$2.tryAdvance(Collections.java:5074)
        at java.base/java.util.Collections$2.forEachRemaining(Collections.java:5082)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
        at io.trino.operator.PagesIndex.createPagesIndexComparator(PagesIndex.java:453)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:424)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:418)
        at io.trino.operator.PagesIndexPageSorter.sort(PagesIndexPageSorter.java:44)
        at io.trino.plugin.hive.util.SortBuffer.flushTo(SortBuffer.java:107)
        at io.trino.plugin.hive.SortingFileWriter.commit(SortingFileWriter.java:148)
        at io.trino.plugin.iceberg.IcebergSortingFileWriter.commit(IcebergSortingFileWriter.java:92)
        at io.trino.plugin.iceberg.IcebergPageSink.closeWriter(IcebergPageSink.java:416)
        at io.trino.plugin.iceberg.IcebergPageSink.finish(IcebergPageSink.java:230)
        at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSink.finish(ClassLoaderSafeConnectorPageSink.java:84)
        at io.trino.operator.TableWriterOperator.finish(TableWriterOperator.java:235)
        at io.trino.operator.Driver.processInternal(Driver.java:421)
        at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
        at io.trino.operator.Driver.tryWithLock(Driver.java:709)
        at io.trino.operator.Driver.process(Driver.java:298)
        at io.trino.operator.Driver.processForDuration(Driver.java:269)
        at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
        at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
        at io.trino.$gen.Trino_451_8_gbf763d2____20240701_125715_2.run(Unknown Source)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
        at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
        at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1570)


2024-07-01T09:59:07.561-0300    DEBUG   dispatcher-query-3      io.trino.execution.StageStateMachine    Stage 20240701_125907_00003_62u7s.2 is PENDING
2024-07-01T09:59:07.562-0300    DEBUG   stage-scheduler io.trino.execution.scheduler.PipelinedStageExecution    Pipelined stage execution 20240701_125907_00003_62u7s.2 is FINISHED
2024-07-01T09:59:07.563-0300    DEBUG   Query-20240701_125907_00003_62u7s-269   io.trino.execution.scheduler.policy.PhasedExecutionSchedule     scheduledStages: [PipelinedStageStateMachine{stageId=20240701_125907_00003_62u7s.2, state=FINISHED}]
2024-07-01T09:59:07.563-0300    DEBUG   Query-20240701_125907_00003_62u7s-269   io.trino.execution.scheduler.policy.PhasedExecutionSchedule     blockedFragments: []
2024-07-01T09:59:07.563-0300    DEBUG   Query-20240701_125907_00003_62u7s-269   io.trino.execution.scheduler.policy.PhasedExecutionSchedule     selectedForExecution: []
2024-07-01T09:59:07.563-0300    DEBUG   Query-20240701_125907_00003_62u7s-269   io.trino.execution.scheduler.policy.PhasedExecutionSchedule     scheduledStages: [PipelinedStageStateMachine{stageId=20240701_125907_00003_62u7s.1, state=SCHEDULED}, PipelinedStageStateMachine{stageId=20240701_125907_00003_62u7s.2, state=FINISHED}]
2024-07-01T09:59:07.563-0300    DEBUG   Query-20240701_125907_00003_62u7s-269   io.trino.execution.scheduler.policy.PhasedExecutionSchedule     blockedFragments: []
2024-07-01T09:59:07.563-0300    DEBUG   Query-20240701_125907_00003_62u7s-269   io.trino.execution.scheduler.policy.PhasedExecutionSchedule     selectedForExecution: []
2024-07-01T09:59:07.563-0300    DEBUG   stage-scheduler io.trino.execution.scheduler.PipelinedStageExecution    Pipelined stage execution 20240701_125907_00003_62u7s.1 is SCHEDULED
2024-07-01T09:59:07.563-0300    DEBUG   remote-task-callback-1  io.trino.execution.scheduler.PipelinedStageExecution    Pipelined stage execution for stage 20240701_125907_00003_62u7s.1 failed
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 2
        at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:79)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:212)
        at java.base/java.util.Collections$2.tryAdvance(Collections.java:5074)
        at java.base/java.util.Collections$2.forEachRemaining(Collections.java:5082)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
        at io.trino.operator.PagesIndex.createPagesIndexComparator(PagesIndex.java:453)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:424)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:418)
        at io.trino.operator.PagesIndexPageSorter.sort(PagesIndexPageSorter.java:44)
        at io.trino.plugin.hive.util.SortBuffer.flushTo(SortBuffer.java:107)
        at io.trino.plugin.hive.SortingFileWriter.commit(SortingFileWriter.java:148)
        at io.trino.plugin.iceberg.IcebergSortingFileWriter.commit(IcebergSortingFileWriter.java:92)
        at io.trino.plugin.iceberg.IcebergPageSink.closeWriter(IcebergPageSink.java:416)
        at io.trino.plugin.iceberg.IcebergPageSink.finish(IcebergPageSink.java:230)
        at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSink.finish(ClassLoaderSafeConnectorPageSink.java:84)
        at io.trino.operator.TableWriterOperator.finish(TableWriterOperator.java:235)
        at io.trino.operator.Driver.processInternal(Driver.java:421)
        at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
        at io.trino.operator.Driver.tryWithLock(Driver.java:709)
        at io.trino.operator.Driver.process(Driver.java:298)
        at io.trino.operator.Driver.processForDuration(Driver.java:269)
        at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
        at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
        at io.trino.$gen.Trino_451_8_gbf763d2____20240701_125715_2.run(Unknown Source)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
        at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
        at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1570)


2024-07-01T09:59:07.563-0300    DEBUG   stage-scheduler io.trino.execution.scheduler.PipelinedStageExecution    Pipelined stage execution 20240701_125907_00003_62u7s.1 is FAILED
2024-07-01T09:59:07.564-0300    DEBUG   stage-scheduler io.trino.execution.scheduler.PipelinedQueryScheduler    Failure in distributed stage for query 20240701_125907_00003_62u7s
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 2
        at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:79)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:212)
        at java.base/java.util.Collections$2.tryAdvance(Collections.java:5074)
        at java.base/java.util.Collections$2.forEachRemaining(Collections.java:5082)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
        at io.trino.operator.PagesIndex.createPagesIndexComparator(PagesIndex.java:453)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:424)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:418)
        at io.trino.operator.PagesIndexPageSorter.sort(PagesIndexPageSorter.java:44)
        at io.trino.plugin.hive.util.SortBuffer.flushTo(SortBuffer.java:107)
        at io.trino.plugin.hive.SortingFileWriter.commit(SortingFileWriter.java:148)
        at io.trino.plugin.iceberg.IcebergSortingFileWriter.commit(IcebergSortingFileWriter.java:92)
        at io.trino.plugin.iceberg.IcebergPageSink.closeWriter(IcebergPageSink.java:416)
        at io.trino.plugin.iceberg.IcebergPageSink.finish(IcebergPageSink.java:230)
        at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSink.finish(ClassLoaderSafeConnectorPageSink.java:84)
        at io.trino.operator.TableWriterOperator.finish(TableWriterOperator.java:235)
        at io.trino.operator.Driver.processInternal(Driver.java:421)
        at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
        at io.trino.operator.Driver.tryWithLock(Driver.java:709)
        at io.trino.operator.Driver.process(Driver.java:298)
        at io.trino.operator.Driver.processForDuration(Driver.java:269)
        at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
        at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
        at io.trino.$gen.Trino_451_8_gbf763d2____20240701_125715_2.run(Unknown Source)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
        at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
        at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1570)


2024-07-01T09:59:07.564-0300    DEBUG   stage-scheduler io.trino.execution.StageStateMachine    Stage 20240701_125907_00003_62u7s.1 failed
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 2
        at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:79)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:212)
        at java.base/java.util.Collections$2.tryAdvance(Collections.java:5074)
        at java.base/java.util.Collections$2.forEachRemaining(Collections.java:5082)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
        at io.trino.operator.PagesIndex.createPagesIndexComparator(PagesIndex.java:453)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:424)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:418)
        at io.trino.operator.PagesIndexPageSorter.sort(PagesIndexPageSorter.java:44)
        at io.trino.plugin.hive.util.SortBuffer.flushTo(SortBuffer.java:107)
        at io.trino.plugin.hive.SortingFileWriter.commit(SortingFileWriter.java:148)
        at io.trino.plugin.iceberg.IcebergSortingFileWriter.commit(IcebergSortingFileWriter.java:92)
        at io.trino.plugin.iceberg.IcebergPageSink.closeWriter(IcebergPageSink.java:416)
        at io.trino.plugin.iceberg.IcebergPageSink.finish(IcebergPageSink.java:230)
        at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSink.finish(ClassLoaderSafeConnectorPageSink.java:84)
        at io.trino.operator.TableWriterOperator.finish(TableWriterOperator.java:235)
        at io.trino.operator.Driver.processInternal(Driver.java:421)
        at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
        at io.trino.operator.Driver.tryWithLock(Driver.java:709)
        at io.trino.operator.Driver.process(Driver.java:298)
        at io.trino.operator.Driver.processForDuration(Driver.java:269)
        at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
        at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
        at io.trino.$gen.Trino_451_8_gbf763d2____20240701_125715_2.run(Unknown Source)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
        at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
        at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1570)


2024-07-01T09:59:07.564-0300    DEBUG   stage-scheduler io.trino.execution.QueryStateMachine    Query 20240701_125907_00003_62u7s failed
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 2
        at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:79)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:212)
        at java.base/java.util.Collections$2.tryAdvance(Collections.java:5074)
        at java.base/java.util.Collections$2.forEachRemaining(Collections.java:5082)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
        at io.trino.operator.PagesIndex.createPagesIndexComparator(PagesIndex.java:453)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:424)
        at io.trino.operator.PagesIndex.sort(PagesIndex.java:418)
        at io.trino.operator.PagesIndexPageSorter.sort(PagesIndexPageSorter.java:44)
        at io.trino.plugin.hive.util.SortBuffer.flushTo(SortBuffer.java:107)
        at io.trino.plugin.hive.SortingFileWriter.commit(SortingFileWriter.java:148)
        at io.trino.plugin.iceberg.IcebergSortingFileWriter.commit(IcebergSortingFileWriter.java:92)
        at io.trino.plugin.iceberg.IcebergPageSink.closeWriter(IcebergPageSink.java:416)
        at io.trino.plugin.iceberg.IcebergPageSink.finish(IcebergPageSink.java:230)
        at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSink.finish(ClassLoaderSafeConnectorPageSink.java:84)
        at io.trino.operator.TableWriterOperator.finish(TableWriterOperator.java:235)
        at io.trino.operator.Driver.processInternal(Driver.java:421)
        at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
        at io.trino.operator.Driver.tryWithLock(Driver.java:709)
        at io.trino.operator.Driver.process(Driver.java:298)
        at io.trino.operator.Driver.processForDuration(Driver.java:269)
        at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
        at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
        at io.trino.$gen.Trino_451_8_gbf763d2____20240701_125715_2.run(Unknown Source)
        at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
        at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
        at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1570)

It seems that we need to make some other changes to make the sorted_by using nested columns works properly, and I don't know if we should change all the related code on this PR or create multiple PRs, WYT?

@mattheusv
Copy link
Author

Another problem that I've notice is that if I execute ALTER TABLE iceberg.test.t execute optmize on a table sorted using nested columns it raise an exception:

I've tried to get a little deep on this error and the problem is how Trino search for columns on a table schema when performing the sorting.

The class IcebergPageSink store the index of the column that will be used to sort a file at the field sortColumnIndexes, and the way that this List of indexes is filled is not considering nested columns:
https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java#L189

So when .indexOf(column) is called with a nested column it is returned -1 and when the sort operation is performed on SortBuffer#flushTo method it raise the java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 2 exception.

I don't have many ideas on how this can be fixed, since several classes used in this process use the sortColumnIndexes field to sort the file

Previously the `parseSortFields` was only collecting the field id from
the top level columns don't considering nested fields of nested types,
so in case a query with a `sorted_by` property use a nested field of a
nested type trino would throw an expcetion that the column don't exists,
because the field id of the nested column don't exists on
`baseColumnFieldIds` set.

This commit fix this issue by recursively collecting the field ids from
table columns which the column type is a nested type.

Fix: trinodb#19620
Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Jul 26, 2024
@ebyhr ebyhr added stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed. and removed stale labels Aug 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed iceberg Iceberg connector stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed.
Development

Successfully merging this pull request may close these issues.

Cannot create Iceberg table with sort order having nested column
7 participants