Commit 9d73101

Refactor IcebergMetadata to support multiple catalogs

jackye1995 authored and electrum committed Aug 16, 2021
1 parent 3791bae commit 9d73101
Showing 16 changed files with 1,037 additions and 590 deletions.
CatalogType.java

@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+public enum CatalogType
+{
+    HIVE,
+    // TODO: dummy type to pass IcebergConfig test, remove it after adding actual catalog types
+    UNKNOWN,
+
+    /**/;
+}
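Note: the UNKNOWN constant exists because airlift's config-mapping test machinery requires every bound property to be set to a non-default value; with HIVE as the default, CatalogType needs a second constant. A minimal sketch of such a test, assuming airlift's ConfigAssertions API (the real TestIcebergConfig is not part of this excerpt, and it would set every property, not just this one):

    import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    public class TestIcebergConfigSketch
    {
        public void testExplicitPropertyMappings()
        {
            // assertFullMapping fails unless each bound property maps to a
            // non-default value, hence a CatalogType other than the HIVE default
            Map<String, String> properties = ImmutableMap.of("iceberg.catalog.type", "UNKNOWN");

            IcebergConfig expected = new IcebergConfig()
                    .setCatalogType(CatalogType.UNKNOWN);

            assertFullMapping(properties, expected);
        }
    }
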
HiveTableOperations.java

@@ -51,6 +51,8 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
 import static io.trino.plugin.hive.HiveType.toHiveType;
+import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView;
+import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
 import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
@@ -133,6 +135,10 @@ public TableMetadata refresh()
 
         Table table = getTable();
 
+        if (isPrestoView(table) && isHiveOrPrestoView(table)) {
+            // this is a view, hence not a table
+            throw new TableNotFoundException(getSchemaTableName());
+        }
         if (!isIcebergTable(table)) {
             throw new UnknownTableTypeException(getSchemaTableName());
         }
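Note: the two ViewReaderUtil predicates combined above identify a view rather than a usable Iceberg table. Roughly what they check, paraphrased from the Hive plugin (assumed behavior, not the verbatim source):

    // The metastore marks every view as VIRTUAL_VIEW; Presto/Trino additionally
    // tags the views it creates with a "presto_view" table parameter.
    static boolean isHiveOrPrestoView(io.trino.plugin.hive.metastore.Table table)
    {
        return table.getTableType().equals("VIRTUAL_VIEW");
    }

    static boolean isPrestoView(io.trino.plugin.hive.metastore.Table table)
    {
        return "true".equals(table.getParameters().get("presto_view"));
    }
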
IcebergConfig.java

@@ -22,6 +22,7 @@
 import javax.validation.constraints.NotNull;
 
 import static io.trino.plugin.hive.HiveCompressionCodec.GZIP;
+import static io.trino.plugin.iceberg.CatalogType.HIVE;
 import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
 
 public class IcebergConfig
@@ -31,6 +32,19 @@ public class IcebergConfig
     private boolean useFileSizeFromMetadata = true;
     private int maxPartitionsPerWriter = 100;
     private boolean uniqueTableLocation;
+    private CatalogType catalogType = HIVE;
+
+    public CatalogType getCatalogType()
+    {
+        return catalogType;
+    }
+
+    @Config("iceberg.catalog.type")
+    public IcebergConfig setCatalogType(CatalogType catalogType)
+    {
+        this.catalogType = catalogType;
+        return this;
+    }
 
     @NotNull
     public FileFormat getFileFormat()
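Note: with the setter annotated @Config("iceberg.catalog.type"), operators pick the catalog in the connector's properties file, and airlift converts the string to the enum constant by name. A hedged sketch of that binding (the ConfigurationFactory usage is illustrative, not how the engine actually wires it):

    import com.google.common.collect.ImmutableMap;
    import io.airlift.configuration.ConfigurationFactory;

    public final class CatalogTypeBindingSketch
    {
        private CatalogTypeBindingSketch() {}

        public static void main(String[] args)
        {
            // "iceberg.catalog.type=HIVE" from the catalog properties file
            ConfigurationFactory factory = new ConfigurationFactory(
                    ImmutableMap.of("iceberg.catalog.type", "HIVE"));
            IcebergConfig config = factory.build(IcebergConfig.class);
            System.out.println(config.getCatalogType()); // prints HIVE
        }
    }
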

Large diffs are not rendered by default.

IcebergMetadataFactory.java

@@ -14,10 +14,6 @@
 package io.trino.plugin.iceberg;
 
 import io.airlift.json.JsonCodec;
-import io.trino.plugin.base.CatalogName;
-import io.trino.plugin.hive.HdfsEnvironment;
-import io.trino.plugin.hive.NodeVersion;
-import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.spi.type.TypeManager;
 
 import javax.inject.Inject;
@@ -26,51 +22,23 @@
 
 public class IcebergMetadataFactory
 {
-    private final CatalogName catalogName;
-    private final HiveMetastore metastore;
-    private final HdfsEnvironment hdfsEnvironment;
     private final TypeManager typeManager;
     private final JsonCodec<CommitTaskData> commitTaskCodec;
-    private final HiveTableOperationsProvider tableOperationsProvider;
-    private final String trinoVersion;
-    private final boolean useUniqueTableLocation;
+    private final TrinoCatalogFactory catalogFactory;
 
     @Inject
     public IcebergMetadataFactory(
-            CatalogName catalogName,
-            IcebergConfig config,
-            HiveMetastore metastore,
-            HdfsEnvironment hdfsEnvironment,
-            TypeManager typeManager,
-            JsonCodec<CommitTaskData> commitTaskDataJsonCodec,
-            HiveTableOperationsProvider tableOperationsProvider,
-            NodeVersion nodeVersion)
-    {
-        this(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, tableOperationsProvider, nodeVersion, config.isUniqueTableLocation());
-    }
-
-    public IcebergMetadataFactory(
-            CatalogName catalogName,
-            HiveMetastore metastore,
-            HdfsEnvironment hdfsEnvironment,
             TypeManager typeManager,
             JsonCodec<CommitTaskData> commitTaskCodec,
-            HiveTableOperationsProvider tableOperationsProvider,
-            NodeVersion nodeVersion,
-            boolean useUniqueTableLocation)
+            TrinoCatalogFactory catalogFactory)
     {
-        this.catalogName = requireNonNull(catalogName, "catalogName is null");
-        this.metastore = requireNonNull(metastore, "metastore is null");
-        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
-        this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
-        this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
-        this.useUniqueTableLocation = useUniqueTableLocation;
+        this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
     }
 
     public IcebergMetadata create()
     {
-        return new IcebergMetadata(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskCodec, tableOperationsProvider, trinoVersion, useUniqueTableLocation);
+        return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create());
    }
 }
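Note: the factory now needs only three dependencies, and each create() call hands IcebergMetadata a fresh TrinoCatalog. TrinoCatalogFactory itself is not visible in this excerpt; a hypothetical sketch of the dispatch it presumably performs (the class shape, constructor, and createHiveCatalog() helper are all assumptions):

    import static java.util.Objects.requireNonNull;

    import javax.inject.Inject;

    public class TrinoCatalogFactorySketch
    {
        private final CatalogType catalogType;

        @Inject
        public TrinoCatalogFactorySketch(IcebergConfig config)
        {
            this.catalogType = requireNonNull(config, "config is null").getCatalogType();
        }

        public TrinoCatalog create()
        {
            // one branch per CatalogType constant; only HIVE exists at this point
            switch (catalogType) {
                case HIVE:
                    return createHiveCatalog(); // would wire metastore, HDFS environment, etc.
                default:
                    throw new IllegalArgumentException("Unsupported catalog type: " + catalogType);
            }
        }

        private TrinoCatalog createHiveCatalog()
        {
            throw new UnsupportedOperationException("illustrative only");
        }
    }
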
IcebergModule.java

@@ -62,6 +62,7 @@ public void configure(Binder binder)
         configBinder(binder).bindConfig(ParquetReaderConfig.class);
         configBinder(binder).bindConfig(ParquetWriterConfig.class);
 
+        binder.bind(TrinoCatalogFactory.class).in(Scopes.SINGLETON);
         binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON);
 
         jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);
IcebergSplitManager.java

@@ -38,7 +38,7 @@ public class IcebergSplitManager
     private final IcebergTransactionManager transactionManager;
 
     @Inject
-    public IcebergSplitManager(IcebergTransactionManager transactionManager, HiveTableOperationsProvider tableOperationsProvider)
+    public IcebergSplitManager(IcebergTransactionManager transactionManager)
     {
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
     }
IcebergTableProperties.java

@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.spi.session.PropertyMetadata.enumProperty;
@@ -84,8 +85,8 @@ public static List<String> getPartitioning(Map<String, Object> tableProperties)
         return partitioning == null ? ImmutableList.of() : ImmutableList.copyOf(partitioning);
     }
 
-    public static String getTableLocation(Map<String, Object> tableProperties)
+    public static Optional<String> getTableLocation(Map<String, Object> tableProperties)
     {
-        return (String) tableProperties.get(LOCATION_PROPERTY);
+        return Optional.ofNullable((String) tableProperties.get(LOCATION_PROPERTY));
     }
 }
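Note: returning Optional<String> instead of a nullable String forces callers to handle a missing location explicitly. The call site added in IcebergUtil later in this diff resolves it against the catalog default:

    String targetPath = getTableLocation(tableMetadata.getProperties())
            .orElse(catalog.defaultTableLocation(session, schemaTableName));
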
IcebergUtil.java

@@ -19,7 +19,9 @@
 import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
 import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Decimals;
@@ -39,20 +41,25 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.types.Type.PrimitiveType;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
@@ -63,6 +70,10 @@
 import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
+import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
+import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
+import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
+import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
 import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros;
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -174,13 +185,13 @@ public static Map<Integer, PrimitiveType> primitiveFieldTypes(Schema schema)
                 .collect(toImmutableMap(Entry::getKey, Entry::getValue));
     }
 
-    private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(List<Types.NestedField> nestedFields)
+    private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(List<NestedField> nestedFields)
     {
         return nestedFields.stream()
                 .flatMap(IcebergUtil::primitiveFieldTypes);
     }
 
-    private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(Types.NestedField nestedField)
+    private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(NestedField nestedField)
    {
         org.apache.iceberg.types.Type fieldType = nestedField.type();
         if (fieldType.isPrimitiveType()) {
@@ -332,4 +343,39 @@ public static LocationProvider getLocationProvider(SchemaTableNa
         }
         return locationsFor(tableLocation, storageProperties);
     }
+
+    public static Schema toIcebergSchema(List<ColumnMetadata> columns)
+    {
+        List<NestedField> icebergColumns = new ArrayList<>();
+        for (ColumnMetadata column : columns) {
+            if (!column.isHidden()) {
+                int index = icebergColumns.size();
+                org.apache.iceberg.types.Type type = toIcebergType(column.getType());
+                NestedField field = NestedField.of(index, column.isNullable(), column.getName(), type, column.getComment());
+                icebergColumns.add(field);
+            }
+        }
+        org.apache.iceberg.types.Type icebergSchema = StructType.of(icebergColumns);
+        AtomicInteger nextFieldId = new AtomicInteger(1);
+        icebergSchema = TypeUtil.assignFreshIds(icebergSchema, nextFieldId::getAndIncrement);
+        return new Schema(icebergSchema.asStructType().fields());
+    }
+
+    public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session)
+    {
+        SchemaTableName schemaTableName = tableMetadata.getTable();
+        Schema schema = toIcebergSchema(tableMetadata.getColumns());
+        PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
+        String targetPath = getTableLocation(tableMetadata.getProperties())
+                .orElse(catalog.defaultTableLocation(session, schemaTableName));
+
+        ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(2);
+        FileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
+        propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
+        if (tableMetadata.getComment().isPresent()) {
+            propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
+        }
+
+        return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, targetPath, propertiesBuilder.build());
+    }
 }
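Note: a hedged usage sketch of the new helper, assuming a TrinoCatalog and ConnectorSession are already in hand. The properties map holds decoded values as produced by IcebergTableProperties (the FileFormat enum, not a raw string); the schema and table names are made up:

    // Guava ImmutableList/ImmutableMap and the Trino/Iceberg types are assumed imported;
    // BIGINT is io.trino.spi.type.BigintType.BIGINT
    ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(
            new SchemaTableName("analytics", "events"),
            ImmutableList.of(new ColumnMetadata("id", BIGINT)),
            ImmutableMap.of(
                    "format", FileFormat.ORC,
                    "partitioning", ImmutableList.of("id")));

    Transaction transaction = IcebergUtil.newCreateTableTransaction(catalog, tableMetadata, session);
    transaction.commitTransaction();
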
RollbackToSnapshotProcedure.java

@@ -24,7 +24,6 @@
 
 import java.lang.invoke.MethodHandle;
 
-import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
 import static io.trino.spi.block.MethodHandleUtil.methodHandle;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.VarcharType.VARCHAR;
@@ -41,12 +40,12 @@ public class RollbackToSnapshotProcedure
             String.class,
             Long.class);
 
-    private final HiveTableOperationsProvider tableOperationsProvider;
+    private final TrinoCatalog catalog;
 
     @Inject
-    public RollbackToSnapshotProcedure(HiveTableOperationsProvider tableOperationsProvider)
+    public RollbackToSnapshotProcedure(TrinoCatalogFactory catalogFactory)
     {
-        this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
+        this.catalog = requireNonNull(catalogFactory, "catalogFactory is null").create();
     }
 
     @Override
@@ -65,7 +64,7 @@ public Procedure get()
     public void rollbackToSnapshot(ConnectorSession clientSession, String schema, String table, Long snapshotId)
     {
         SchemaTableName schemaTableName = new SchemaTableName(schema, table);
-        Table icebergTable = loadIcebergTable(tableOperationsProvider, clientSession, schemaTableName);
+        Table icebergTable = catalog.loadTable(clientSession, schemaTableName);
         icebergTable.rollback().toSnapshotId(snapshotId).commit();
     }
 }
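Note: the elided get() method presumably builds the Procedure from the ROLLBACK_TO_SNAPSHOT handle declared above, which is what lets users run CALL iceberg.system.rollback_to_snapshot(...) in SQL. A sketch of that wiring (the argument names and exact Procedure constructor are assumptions):

    @Override
    public Procedure get()
    {
        // binds the MethodHandle to this instance so the engine can invoke
        // rollbackToSnapshot(session, schema, table, snapshotId)
        return new Procedure(
                "system",
                "rollback_to_snapshot",
                ImmutableList.of(
                        new Procedure.Argument("schema", VARCHAR),
                        new Procedure.Argument("table", VARCHAR),
                        new Procedure.Argument("snapshot_id", BIGINT)),
                ROLLBACK_TO_SNAPSHOT.bindTo(this));
    }
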
(Diffs for the remaining changed files were not loaded.)
