Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Iceberg merge, update, delete, for tables with equality deletes #24062

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public class IcebergColumnHandle
private static final int INSTANCE_SIZE = instanceSize(IcebergColumnHandle.class);

// Iceberg reserved row ids begin at Integer.MAX_VALUE and count down. Starting with MIN_VALUE here to avoid conflicts.
public static final int TRINO_UPDATE_ROW_ID = Integer.MIN_VALUE;
public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1;
public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE;
public static final String TRINO_ROW_ID_NAME = "$row_id";

public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 2;
Copy link
Contributor

@findinpath findinpath Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can likely adjust the values (decrease by 1) of the other ID fields now that Integer.MIN_VALUE + 1 is no longer used.

Expand Down Expand Up @@ -182,12 +181,6 @@ public boolean isRowPositionColumn()
return id == ROW_POSITION.fieldId();
}

@JsonIgnore
public boolean isUpdateRowIdColumn()
{
return id == TRINO_UPDATE_ROW_ID;
}

@JsonIgnore
public boolean isMergeRowIdColumn()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class IcebergPageSource
Expand All @@ -46,10 +46,9 @@ public class IcebergPageSource
private final ConnectorPageSource delegate;
private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
private final Supplier<Optional<RowPredicate>> deletePredicate;
// An array with one element per field in the $row_id column. The value in the array points to the
// channel where the data can be read from.
private int[] rowIdChildColumnIndexes = new int[0];
private final Function<Block, RowBlock> rowIdBlockFactory;
// The $row_id's index in 'expectedColumns', or -1 if there isn't one
// this column will contain the row position populated in the source, and must be wrapped with constant data to form the full row id
private int rowIdColumnIndex = -1;
// Maps the Iceberg field ids of unmodified columns to their indexes in updateRowIdChildColumnIndexes

Expand All @@ -58,7 +57,8 @@ public IcebergPageSource(
List<IcebergColumnHandle> requiredColumns,
ConnectorPageSource delegate,
Optional<ReaderProjectionsAdapter> projectionsAdapter,
Supplier<Optional<RowPredicate>> deletePredicate)
Supplier<Optional<RowPredicate>> deletePredicate,
Function<Block, RowBlock> rowIdBlockFactory)
{
// expectedColumns should contain columns which should be in the final Page
// requiredColumns should include all expectedColumns as well as any columns needed by the DeleteFilter
Expand All @@ -70,22 +70,15 @@ public IcebergPageSource(
checkArgument(expectedColumn.equals(requiredColumns.get(i)), "Expected columns must be a prefix of required columns");
expectedColumnIndexes[i] = i;

if (expectedColumn.isUpdateRowIdColumn() || expectedColumn.isMergeRowIdColumn()) {
if (expectedColumn.isMergeRowIdColumn()) {
this.rowIdColumnIndex = i;

Map<Integer, Integer> fieldIdToColumnIndex = mapFieldIdsToIndex(requiredColumns);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mapFieldIdsToIndex function is not used anymore.

List<ColumnIdentity> rowIdFields = expectedColumn.getColumnIdentity().getChildren();
this.rowIdChildColumnIndexes = new int[rowIdFields.size()];
for (int columnIndex = 0; columnIndex < rowIdFields.size(); columnIndex++) {
int fieldId = rowIdFields.get(columnIndex).getId();
rowIdChildColumnIndexes[columnIndex] = requireNonNull(fieldIdToColumnIndex.get(fieldId), () -> format("Column %s not found in requiredColumns", fieldId));
}
}
}

this.delegate = requireNonNull(delegate, "delegate is null");
this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null");
this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null");
this.rowIdBlockFactory = requireNonNull(rowIdBlockFactory, "rowIdBlockFactory is null");
}

@Override
Expand Down Expand Up @@ -161,21 +154,11 @@ private Page withRowIdBlock(Page page)
return page;
}

Block[] rowIdFields = new Block[rowIdChildColumnIndexes.length];
for (int childIndex = 0; childIndex < rowIdChildColumnIndexes.length; childIndex++) {
rowIdFields[childIndex] = page.getBlock(rowIdChildColumnIndexes[childIndex]);
}

RowBlock rowIdBlock = rowIdBlockFactory.apply(page.getBlock(rowIdColumnIndex));
Block[] fullPage = new Block[page.getChannelCount()];
for (int channel = 0; channel < page.getChannelCount(); channel++) {
if (channel == rowIdColumnIndex) {
fullPage[channel] = RowBlock.fromFieldBlocks(page.getPositionCount(), rowIdFields);
continue;
}

fullPage[channel] = page.getBlock(channel);
fullPage[channel] = channel == rowIdColumnIndex ? rowIdBlock : page.getBlock(channel);
}

return new Page(page.getPositionCount(), fullPage);
}

Expand Down
Loading