Skip to content

Commit

Permalink
Add a wrapper for the ChangelogOperation enum from Iceberg Library
Browse files Browse the repository at this point in the history
  • Loading branch information
imjalpreet authored and yingsu00 committed Feb 5, 2024
1 parent 4bedf67 commit 6588bab
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.changelog;

import com.facebook.presto.spi.PrestoException;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;

/**
* This wrapper class is added to avoid adding special classes and simplifying
* the presto protocol generation for Prestissimo
*/
public enum ChangelogOperation
{
INSERT,
DELETE,
UPDATE_BEFORE,
UPDATE_AFTER;

public static ChangelogOperation fromIcebergChangelogOperation(org.apache.iceberg.ChangelogOperation operation)
{
ChangelogOperation prestoOperation;
switch (operation) {
case INSERT:
prestoOperation = INSERT;
break;
case DELETE:
prestoOperation = DELETE;
break;
case UPDATE_BEFORE:
prestoOperation = UPDATE_BEFORE;
break;
case UPDATE_AFTER:
prestoOperation = UPDATE_AFTER;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported Changelog Operation: " + operation);
}

return prestoOperation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.iceberg.ChangelogOperation;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.fromIcebergChangelogOperation;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -127,7 +128,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
ImmutableList.of(),
Optional.of(new ChangelogSplitInfo(changeTask.operation(),
Optional.of(new ChangelogSplitInfo(fromIcebergChangelogOperation(changeTask.operation()),
changeTask.changeOrdinal(),
changeTask.commitSnapshotId(),
columnHandles)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.BlockIndex;
Expand All @@ -26,10 +27,10 @@
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.function.TypeParameter;
import io.airlift.slice.Slice;
import org.apache.iceberg.ChangelogOperation;

import java.util.Optional;

import static com.facebook.presto.iceberg.changelog.ChangelogOperation.DELETE;
import static com.facebook.presto.spi.function.SqlFunctionVisibility.HIDDEN;

@AggregationFunction(value = "apply_changelog", visibility = HIDDEN)
Expand Down Expand Up @@ -86,7 +87,7 @@ public static void output(
return;
}

if (ChangelogOperation.valueOf(record.get().getLastOperation().toStringUtf8().toUpperCase()).equals(ChangelogOperation.DELETE)) {
if (ChangelogOperation.valueOf(record.get().getLastOperation().toStringUtf8().toUpperCase()).equals(DELETE)) {
out.appendNull();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.iceberg.ChangelogOperation;
import org.openjdk.jol.info.ClassLayout;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.INSERT;
import static java.util.Objects.requireNonNull;

public class ChangelogRecord
Expand Down Expand Up @@ -98,7 +99,7 @@ else if (other.lastOrdinal == lastOrdinal) {
switch (operation) {
case UPDATE_AFTER:
case INSERT:
if (ChangelogOperation.valueOf(lastOperation.toStringUtf8().toUpperCase()).equals(ChangelogOperation.INSERT)) {
if (ChangelogOperation.valueOf(lastOperation.toStringUtf8().toUpperCase()).equals(INSERT)) {
throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "unresolvable order for two inserts");
}
lastOperation = other.lastOperation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.FunctionExtractor;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.function.JavaAggregationFunctionImplementation;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.iceberg.ChangelogOperation;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand All @@ -33,6 +33,8 @@
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.DELETE;
import static com.facebook.presto.iceberg.changelog.ChangelogOperation.INSERT;
import static com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction.NAME;
import static com.facebook.presto.operator.aggregation.AggregationTestUtils.assertAggregation;
import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;
Expand All @@ -55,7 +57,7 @@ public void testInsert()
assertAggregation(impl,
2,
toBlocks(
record(1, ChangelogOperation.INSERT, 2)));
record(1, INSERT, 2)));
}

@Test
Expand All @@ -66,9 +68,9 @@ public void testUpdate()
assertAggregation(impl,
2,
toBlocks(
record(0, ChangelogOperation.INSERT, 1),
record(1, ChangelogOperation.DELETE, 1),
record(1, ChangelogOperation.INSERT, 2)));
record(0, INSERT, 1),
record(1, DELETE, 1),
record(1, INSERT, 2)));
}

@Test
Expand All @@ -79,7 +81,7 @@ public void testDelete()
assertAggregation(impl,
null,
toBlocks(
record(0, ChangelogOperation.DELETE, 1)));
record(0, DELETE, 1)));
}

@Test
Expand All @@ -100,12 +102,12 @@ public void testMultiUpdate()
assertAggregation(impl,
5,
toBlocks(
record(0, ChangelogOperation.DELETE, 1),
record(1, ChangelogOperation.INSERT, 2),
record(2, ChangelogOperation.DELETE, 2),
record(2, ChangelogOperation.INSERT, 3),
record(3, ChangelogOperation.DELETE, 3),
record(4, ChangelogOperation.INSERT, 5)));
record(0, DELETE, 1),
record(1, INSERT, 2),
record(2, DELETE, 2),
record(2, INSERT, 3),
record(3, DELETE, 3),
record(4, INSERT, 5)));
}

private static class ChangelogRecord
Expand Down

0 comments on commit 6588bab

Please sign in to comment.