Skip to content

Commit

Permalink
[iceberg] Refactor IcebergWritableTableHandle's schema and partitionSpec fields
Browse files Browse the repository at this point in the history

As part of the effort to support Iceberg writes with Presto C++, and because there is no Iceberg C++ library available to parse the JSON strings for Schema and PartitionSpec, we now send the respective Presto wrapper classes instead of the JSON strings.
  • Loading branch information
imjalpreet authored and aditi-pandit committed Aug 9, 2024
1 parent 6566e7f commit 6d55f58
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -145,6 +144,8 @@
import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName;
import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm;
import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.iceberg.TableStatisticsMaker.getSupportedColumnStatistics;
import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
Expand Down Expand Up @@ -441,8 +442,8 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
return new IcebergInsertTableHandle(
table.getSchemaName(),
table.getIcebergTableName(),
SchemaParser.toJson(icebergTable.schema()),
PartitionSpecParser.toJson(icebergTable.spec()),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
Expand Down Expand Up @@ -124,6 +122,8 @@
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateAndSetTableSize;
import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
Expand Down Expand Up @@ -315,8 +315,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
return new IcebergOutputTableHandle(
schemaName,
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()),
SchemaParser.toJson(metadata.schema()),
PartitionSpecParser.toJson(metadata.spec()),
toPrestoSchema(metadata.schema(), typeManager),
toPrestoPartitionSpec(metadata.spec(), typeManager),
getColumns(metadata.schema(), metadata.spec(), typeManager),
targetPath,
fileFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class IcebergInsertTableHandle
public IcebergInsertTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") IcebergTableName tableName,
@JsonProperty("schemaAsJson") String schemaAsJson,
@JsonProperty("partitionSpecAsJson") String partitionSpecAsJson,
@JsonProperty("schema") PrestoIcebergSchema schema,
@JsonProperty("partitionSpec") PrestoIcebergPartitionSpec partitionSpec,
@JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
@JsonProperty("outputPath") String outputPath,
@JsonProperty("fileFormat") FileFormat fileFormat,
Expand All @@ -40,8 +40,8 @@ public IcebergInsertTableHandle(
super(
schemaName,
tableName,
schemaAsJson,
partitionSpecAsJson,
schema,
partitionSpec,
inputColumns,
outputPath,
fileFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import com.facebook.presto.spi.relation.RowExpressionService;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -53,6 +51,8 @@
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -183,8 +183,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
return new IcebergOutputTableHandle(
schemaName,
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()),
SchemaParser.toJson(icebergTable.schema()),
PartitionSpecParser.toJson(icebergTable.spec()),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
fileFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class IcebergOutputTableHandle
public IcebergOutputTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") IcebergTableName tableName,
@JsonProperty("schemaAsJson") String schemaAsJson,
@JsonProperty("partitionSpecAsJson") String partitionSpecAsJson,
@JsonProperty("schema") PrestoIcebergSchema schema,
@JsonProperty("partitionSpec") PrestoIcebergPartitionSpec partitionSpec,
@JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
@JsonProperty("outputPath") String outputPath,
@JsonProperty("fileFormat") FileFormat fileFormat,
Expand All @@ -40,8 +40,8 @@ public IcebergOutputTableHandle(
super(
schemaName,
tableName,
schemaAsJson,
partitionSpecAsJson,
schema,
partitionSpec,
inputColumns,
outputPath,
fileFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.io.LocationProvider;

import javax.inject.Inject;

import static com.facebook.presto.iceberg.IcebergUtil.getLocationProvider;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toIcebergPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toIcebergSchema;
import static java.util.Objects.requireNonNull;

public class IcebergPageSinkProvider
Expand Down Expand Up @@ -76,8 +76,8 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle)
{
HdfsContext hdfsContext = new HdfsContext(session, tableHandle.getSchemaName(), tableHandle.getTableName().getTableName());
Schema schema = SchemaParser.fromJson(tableHandle.getSchemaAsJson());
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, tableHandle.getPartitionSpecAsJson());
Schema schema = toIcebergSchema(tableHandle.getSchema());
PartitionSpec partitionSpec = toIcebergPartitionSpec(tableHandle.getPartitionSpec()).toUnbound().bind(schema);
LocationProvider locationProvider = getLocationProvider(new SchemaTableName(tableHandle.getSchemaName(), tableHandle.getTableName().getTableName()),
tableHandle.getOutputPath(), tableHandle.getStorageProperties());
return new IcebergPageSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public class IcebergWritableTableHandle
{
private final String schemaName;
private final IcebergTableName tableName;
private final String schemaAsJson;
private final String partitionSpecAsJson;
private final PrestoIcebergSchema schema;
private final PrestoIcebergPartitionSpec partitionSpec;
private final List<IcebergColumnHandle> inputColumns;
private final String outputPath;
private final FileFormat fileFormat;
Expand All @@ -37,8 +37,8 @@ public class IcebergWritableTableHandle
public IcebergWritableTableHandle(
String schemaName,
IcebergTableName tableName,
String schemaAsJson,
String partitionSpecAsJson,
PrestoIcebergSchema schema,
PrestoIcebergPartitionSpec partitionSpec,
List<IcebergColumnHandle> inputColumns,
String outputPath,
FileFormat fileFormat,
Expand All @@ -47,8 +47,8 @@ public IcebergWritableTableHandle(
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null");
this.schema = requireNonNull(schema, "schema is null");
this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
this.inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
this.outputPath = requireNonNull(outputPath, "filePrefix is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
Expand All @@ -69,15 +69,15 @@ public IcebergTableName getTableName()
}

@JsonProperty
public String getSchemaAsJson()
public PrestoIcebergSchema getSchema()
{
return schemaAsJson;
return schema;
}

@JsonProperty
public String getPartitionSpecAsJson()
public PrestoIcebergPartitionSpec getPartitionSpec()
{
return partitionSpecAsJson;
return partitionSpec;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Term;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -58,7 +61,15 @@ private PartitionFields() {}

public static PartitionSpec parsePartitionFields(Schema schema, List<String> fields)
{
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
return parsePartitionFields(schema, fields, null);
}

public static PartitionSpec parsePartitionFields(Schema schema, List<String> fields, @Nullable Integer specId)
{
PartitionSpec.Builder builder = Optional.ofNullable(specId)
.map(id -> PartitionSpec.builderFor(schema).withSpecId(id))
.orElseGet(() -> PartitionSpec.builderFor(schema));

for (String field : fields) {
parsePartitionField(builder, field);
}
Expand Down

0 comments on commit 6d55f58

Please sign in to comment.