
Iceberg: support row-level delete and update #8565

Closed

Conversation

jackye1995 (Member) commented:

This PR adds support for writing Iceberg position deletes. Similar to #8534, I first present our working internal implementation backported to Trino; some parts might not work because of internal differences, but once we agree on the general approach I will make the fixes and add unit tests.

Also, there is a missing piece that has to be added after #8534 is merged, so that IcebergPageSource has the ability to retain the row position channel and pass it to the updatable page source.

A few key points:

  1. We chose to support row-level delete through the position delete spec instead of equality deletes, based on the general guidance from Iceberg that position deletes are preferred when possible. The delete write mechanism in Trino also fits the position delete spec well: since data is scanned and filtered first anyway, recording equality values wastes computation compared to recording positions.
  2. Trino's delete row ID column has the Trino type ROW(string file_path, long pos, row(table schema)), which matches Iceberg's position delete file schema; see the sketch after this list.
  3. Update and delete share exactly the same row ID type and the same beginXXX and finishXXX operation implementations. The only difference is that update writes new data files after writing the delete files, because an update in Iceberg is modeled as a delete plus an insert.
  4. In the page source provider, if the operation is detected to be a DELETE or UPDATE (the columns contain the row ID column), it automatically reads all the table columns (except the identity partition columns), because the entire row is part of the position delete schema anyway.
  5. I directly reused the current Iceberg sink implementation to write back the position delete rows and the updated rows. It's probably not the most optimal approach, but it is simple enough for a first iteration.
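
For illustration, here is a minimal sketch (not code from this PR) of how the row ID type described in point 2 could be assembled with Trino's SPI type factories; the id and name fields are a hypothetical table schema standing in for the real one:

```java
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;

import io.trino.spi.type.RowType;
import java.util.List;

public class RowIdTypeSketch
{
    public static void main(String[] args)
    {
        // Hypothetical table schema; the real code nests the actual table columns.
        RowType tableRow = RowType.from(List.of(
                RowType.field("id", INTEGER),
                RowType.field("name", VARCHAR)));

        // ROW(file_path varchar, pos bigint, row ROW(<table schema>)),
        // mirroring Iceberg's position delete file schema.
        RowType rowIdType = RowType.from(List.of(
                RowType.field("file_path", VARCHAR),
                RowType.field("pos", BIGINT),
                RowType.field("row", tableRow)));

        System.out.println(rowIdType.getDisplayName());
    }
}
```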

This is a bare-minimum backport. I left some inline TODOs, and there are many optimizations we can make after the base version is checked in; I tried to keep this as simple as possible to avoid too many disagreements around optimization-related changes. Please let me know if this looks good or not, thanks!

@phd3 @electrum @findepi @losipiuk @caneGuy @rdblue @hashhar

cla-bot added the cla-signed label on Jul 15, 2021
findepi (Member) commented on Jul 29, 2021:

@jackye1995 can you please add a product test that asserts compatibility between Trino and Spark?
See

private static void verifySelectForTrinoAndHive(String select, String whereClause, QueryAssert.Row... rows)

for how we test Trino/Hive compatibility for ORC ACID tables. This approach has proven useful and has helped find some bugs too.
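
For context, a minimal sketch of what such a Trino/Spark compatibility product test could look like, assuming the onTrino()/onSpark() query executors from the product-test harness; the catalog, table, and data are made up for illustration:

```java
import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.utils.QueryExecutors.onSpark;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;

import org.testng.annotations.Test;

public class TestIcebergSparkRowLevelDeletes
{
    @Test
    public void testSparkReadsTrinoPositionDelete()
    {
        onTrino().executeQuery("CREATE TABLE iceberg.default.test_delete (id INTEGER, name VARCHAR)");
        onTrino().executeQuery("INSERT INTO iceberg.default.test_delete VALUES (1, 'a'), (2, 'b')");
        onTrino().executeQuery("DELETE FROM iceberg.default.test_delete WHERE id = 1");

        // Spark should not see the row deleted via Trino's position delete file.
        assertThat(onSpark().executeQuery("SELECT id, name FROM default.test_delete"))
                .containsOnly(row(2, "b"));

        onTrino().executeQuery("DROP TABLE iceberg.default.test_delete");
    }
}
```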

lhofhansl (Member) commented on Aug 30, 2021:

If I wanted to try this out, I'd need to create an Iceberg table adhering to the Iceberg Format Specification V2, since you are proposing to use delete snapshots, right?

And should we bump Iceberg to 0.12 (that version has the final V2 spec)?

findepi (Member) left a review with the following inline comments:


@@ -112,7 +112,7 @@ private RowBlock(int startOffset, int positionCount, @Nullable boolean[] rowIsNull
     }

     @Override
-    protected Block[] getRawFieldBlocks()
+    public Block[] getRawFieldBlocks()

Wonder why this is needed, and whether this is actually used correctly.


public static IcebergColumnHandle createUpdateRowIdColumnHandle(Schema tableSchema, TypeManager typeManager)
{
return create(required(ROW_ID_COLUMN_INDEX, ROW_ID_COLUMN_NAME, DeleteSchemaUtil.posDeleteSchema(tableSchema).asStruct()), typeManager);

Is it used for deletes only, or for updates as well?

Comment on lines +263 to +264
serializeToBytes(table.schema()),
serializeToBytes(table.spec()),

I think there already was an idea to add schema to IcebergTableHandle and it was rejected (?) for some reason.

@phd3 do you remember?

}
else {
Schema posDeleteSchema = DeleteSchemaUtil.posDeleteSchema(table.getSchema());
ConnectorPageSink posDeleteSink = new IcebergPageSink(

Naming suggestion: positionalDeletesSink

private final List<IcebergColumnHandle> allTableColumns;
private final List<IcebergColumnHandle> updateColumns;
private final ConnectorPageSource source;
private final ConnectorPageSink posDeleteSink;

Naming suggestion: positionalDeletesSink

FileContent.POSITION_DELETES,
maxOpenPartitions);

ConnectorPageSink updateRowSink = new IcebergPageSink(

Naming suggestion: updatedDataSink

private final List<IcebergColumnHandle> updateColumns;
private final ConnectorPageSource source;
private final ConnectorPageSink posDeleteSink;
private final ConnectorPageSink updateRowSink;

Naming suggestion: updatedDataSink

}

Block[] updatedRows = new Block[allTableColumns.size()];
Block[] oldRows = ((RowBlock) rowIdBlock.getRawFieldBlocks()[2]).getRawFieldBlocks();

Cast to RowBlock isn't entirely correct.
See #9354 and perhaps we should use ColumnarRow here.
cc @djsstarburst
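
A minimal sketch of the ColumnarRow alternative suggested here, assuming field 2 of the row ID block is the nested ROW(<table schema>) field:

```java
import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarRow;

final class ColumnarRowSketch
{
    private ColumnarRowSketch() {}

    // Access a column of the nested old-row struct without casting to RowBlock,
    // so wrapped blocks (dictionary, run-length) are handled too.
    static Block oldRowField(Block rowIdBlock, int fieldIndex)
    {
        ColumnarRow rowId = ColumnarRow.toColumnarRow(rowIdBlock);
        ColumnarRow oldRows = ColumnarRow.toColumnarRow(rowId.getField(2));
        return oldRows.getField(fieldIndex);
    }
}
```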

resultBlocks[i] = RowBlock.fromFieldBlocks(pageSize, Optional.empty(), rowIdComponentBlocks);
}
else {
resultBlocks[i] = sourcePage.getBlock(allTableColumns.indexOf(columnHandle));

indexOf use here looks quadratic, and we seem to be doing this for every page.
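
A minimal sketch of one way to avoid that: precompute a column-to-channel map once (e.g. in the page source constructor) so the per-page lookup is O(1); the generic helper below is illustrative, not code from this PR:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChannelIndexSketch
{
    private ChannelIndexSketch() {}

    // Build handle -> channel once; callers then use
    // sourcePage.getBlock(channelIndex.get(columnHandle)) instead of indexOf.
    static <T> Map<T, Integer> channelIndex(List<T> columns)
    {
        Map<T, Integer> index = new HashMap<>();
        for (int channel = 0; channel < columns.size(); channel++) {
            index.put(columns.get(channel), channel);
        }
        return index;
    }
}
```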

jackye1995 (Member, Author) commented:

Closed in favor of #10075.

jackye1995 closed this on Nov 26, 2021