diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java new file mode 100644 index 0000000000000..8bf3aa63ca095 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +public enum CatalogType +{ + HIVE, + // TODO: dummy type to pass IcebergConfig test, remove it after adding actual catalog types + UNKNOWN, + + /**/; +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java index 255a7442e1e88..312082f0e9baa 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java @@ -51,6 +51,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.HiveType.toHiveType; +import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; +import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; @@ -133,6 +135,10 @@ public TableMetadata refresh() Table table = getTable(); + if (isPrestoView(table) && isHiveOrPrestoView(table)) { + // this is a Hive view, hence not a table + throw new TableNotFoundException(getSchemaTableName()); + } if (!isIcebergTable(table)) { throw new UnknownTableTypeException(getSchemaTableName()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index d2fb2b02d8407..c538fe6445781 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -22,6 +22,7 @@ import javax.validation.constraints.NotNull; import static io.trino.plugin.hive.HiveCompressionCodec.GZIP; +import static io.trino.plugin.iceberg.CatalogType.HIVE; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; public class IcebergConfig @@ -31,6 +32,19 @@ public class IcebergConfig private boolean useFileSizeFromMetadata = true; private int maxPartitionsPerWriter = 100; private boolean uniqueTableLocation; + private CatalogType catalogType = HIVE; + + public CatalogType getCatalogType() + { + return catalogType; + } + + @Config("iceberg.catalog.type") + public IcebergConfig setCatalogType(CatalogType catalogType) + { + this.catalogType = catalogType; + return this; + } @NotNull public FileFormat getFileFormat() 
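A minimal usage sketch for the new iceberg.catalog.type property introduced above, assuming only what the diff shows (the CatalogType enum and the IcebergConfig getter/setter); the wrapper class and main method are hypothetical and not part of this change:

    // Hypothetical example, not part of the PR: the property value is bound onto the
    // CatalogType enum, with HIVE as the default and UNKNOWN as a temporary placeholder.
    import io.trino.plugin.iceberg.CatalogType;
    import io.trino.plugin.iceberg.IcebergConfig;

    public class CatalogTypeConfigExample
    {
        public static void main(String[] args)
        {
            IcebergConfig config = new IcebergConfig();
            System.out.println(config.getCatalogType()); // HIVE (default)

            // equivalent to setting iceberg.catalog.type in the catalog properties file
            config.setCatalogType(CatalogType.UNKNOWN);
            System.out.println(config.getCatalogType()); // UNKNOWN
        }
    }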
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index df4767cb32ccf..8c1cdb2cefdd9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -20,27 +20,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.airlift.json.JsonCodec; -import io.airlift.log.Logger; import io.airlift.slice.Slice; -import io.trino.plugin.base.CatalogName; import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable; -import io.trino.plugin.hive.HdfsEnvironment; -import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; -import io.trino.plugin.hive.HiveMetadata; -import io.trino.plugin.hive.HiveSchemaProperties; -import io.trino.plugin.hive.HiveViewNotSupportedException; import io.trino.plugin.hive.HiveWrittenPartitions; -import io.trino.plugin.hive.TableAlreadyExistsException; -import io.trino.plugin.hive.ViewAlreadyExistsException; -import io.trino.plugin.hive.ViewReaderUtil; -import io.trino.plugin.hive.authentication.HiveIdentity; -import io.trino.plugin.hive.metastore.Column; -import io.trino.plugin.hive.metastore.Database; -import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.HivePrincipal; -import io.trino.plugin.hive.metastore.PrincipalPrivileges; -import io.trino.plugin.hive.metastore.Table; -import io.trino.plugin.hive.util.HiveUtil; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaName; import io.trino.spi.connector.CatalogSchemaTableName; @@ -61,13 +43,10 @@ import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.DiscretePredicates; import io.trino.spi.connector.MaterializedViewFreshness; -import io.trino.spi.connector.MaterializedViewNotFoundException; -import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SystemTable; import io.trino.spi.connector.TableNotFoundException; -import io.trino.spi.connector.ViewNotFoundException; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; @@ -75,218 +54,112 @@ import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.statistics.TableStatistics; import io.trino.spi.type.TypeManager; -import org.apache.hadoop.fs.Path; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionField; -import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.types.Types.NestedField; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; import 
java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Stream; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; -import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; -import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; -import static io.trino.plugin.hive.HiveType.HIVE_STRING; -import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG; -import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; -import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; -import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; -import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT; -import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; -import static io.trino.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.trino.plugin.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; -import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; -import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; -import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; -import static io.trino.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; -import static io.trino.plugin.iceberg.IcebergTableProperties.getFileFormat; -import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; -import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat; -import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; -import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable; -import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; -import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; +import static io.trino.plugin.iceberg.IcebergUtil.newCreateTableTransaction; import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; import static io.trino.plugin.iceberg.TableType.DATA; +import static io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES; import static 
io.trino.plugin.iceberg.TypeConverter.toIcebergType; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; -import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; -import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; -import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; -import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static io.trino.spi.type.BigintType.BIGINT; import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; -import static java.util.UUID.randomUUID; import static java.util.function.Function.identity; import static java.util.stream.Collectors.joining; -import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; -import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; -import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; -import static org.apache.iceberg.TableMetadata.newTableMetadata; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; -import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; -import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; -import static org.apache.iceberg.Transactions.createTableTransaction; public class IcebergMetadata implements ConnectorMetadata { - private static final Logger log = Logger.get(IcebergMetadata.class); - private static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View"; - public static final String DEPENDS_ON_TABLES = "dependsOnTables"; - - // Be compatible with views defined by the Hive connector, which can be useful under certain conditions. 
- private static final String TRINO_CREATED_BY = HiveMetadata.TRINO_CREATED_BY; - private static final String TRINO_CREATED_BY_VALUE = "Trino Iceberg connector"; - private static final String PRESTO_VIEW_COMMENT = HiveMetadata.PRESTO_VIEW_COMMENT; - private static final String PRESTO_VERSION_NAME = HiveMetadata.PRESTO_VERSION_NAME; - private static final String PRESTO_QUERY_ID_NAME = HiveMetadata.PRESTO_QUERY_ID_NAME; - private static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; - - private final CatalogName catalogName; - private final HiveMetastore metastore; - private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; private final JsonCodec commitTaskCodec; - private final HiveTableOperationsProvider tableOperationsProvider; - private final String trinoVersion; - private final boolean useUniqueTableLocation; + private final TrinoCatalog catalog; private final Map snapshotIds = new ConcurrentHashMap<>(); - private final Map tableMetadataCache = new ConcurrentHashMap<>(); - private final ViewReaderUtil.PrestoViewReader viewReader = new ViewReaderUtil.PrestoViewReader(); private Transaction transaction; public IcebergMetadata( - CatalogName catalogName, - HiveMetastore metastore, - HdfsEnvironment hdfsEnvironment, TypeManager typeManager, JsonCodec commitTaskCodec, - HiveTableOperationsProvider tableOperationsProvider, - String trinoVersion, - boolean useUniqueTableLocation) + TrinoCatalog catalog) { - this.catalogName = requireNonNull(catalogName, "catalogName is null"); - this.metastore = requireNonNull(metastore, "metastore is null"); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); - this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); - this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); - this.useUniqueTableLocation = useUniqueTableLocation; + this.catalog = requireNonNull(catalog, "catalog is null"); } @Override public List listSchemaNames(ConnectorSession session) { - return metastore.getAllDatabases().stream() - .filter(schemaName -> !HiveUtil.isHiveSystemSchema(schemaName)) - .collect(toImmutableList()); - } - - private List listSchemas(ConnectorSession session, Optional schemaName) - { - if (schemaName.isPresent()) { - if (isHiveSystemSchema(schemaName.get())) { - return ImmutableList.of(); - } - return ImmutableList.of(schemaName.get()); - } - return listSchemaNames(session); + return catalog.listNamespaces(session); } @Override public Map getSchemaProperties(ConnectorSession session, CatalogSchemaName schemaName) { - Optional db = metastore.getDatabase(schemaName.getSchemaName()); - if (db.isPresent()) { - return HiveSchemaProperties.fromDatabase(db.get()); - } - - throw new SchemaNotFoundException(schemaName.getSchemaName()); + return catalog.loadNamespaceMetadata(session, schemaName.getSchemaName()); } @Override public Optional getSchemaOwner(ConnectorSession session, CatalogSchemaName schemaName) { - Optional database = metastore.getDatabase(schemaName.getSchemaName()); - if (database.isPresent()) { - return database.flatMap(db -> Optional.of(new TrinoPrincipal(db.getOwnerType(), db.getOwnerName()))); - } - - throw new SchemaNotFoundException(schemaName.getSchemaName()); + return catalog.getNamespacePrincipal(session, 
schemaName.getSchemaName()); } @Override public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { IcebergTableName name = IcebergTableName.from(tableName.getTableName()); - verify(name.getTableType() == DATA, "Wrong table type: " + name.getTableType()); + verify(name.getTableType() == DATA, "Wrong table type: " + name.getTableNameWithType()); - Optional hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), name.getTableName()); - if (hiveTable.isEmpty()) { - return null; + Table table; + try { + table = catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName())); } - if (isMaterializedView(hiveTable.get())) { + catch (TableNotFoundException e) { return null; } - if (!isIcebergTable(hiveTable.get())) { - throw new UnknownTableTypeException(tableName); - } - - org.apache.iceberg.Table table = getIcebergTable(session, hiveTable.get().getSchemaTableName()); Optional snapshotId = getSnapshotId(table, name.getSnapshotId()); return new IcebergTableHandle( @@ -312,12 +185,12 @@ private Optional getRawSystemTable(ConnectorSession session, Schema return Optional.empty(); } - org.apache.iceberg.Table table; + // load the base table for the system table + Table table; try { - table = getIcebergTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName())); + table = catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName())); } - catch (TableNotFoundException | UnknownTableTypeException e) { - // not found or not an Iceberg table + catch (TableNotFoundException e) { return Optional.empty(); } @@ -357,7 +230,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con return new ConnectorTableProperties(TupleDomain.none(), Optional.empty(), Optional.empty(), Optional.empty(), ImmutableList.of()); } - org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); // Extract identity partition fields that are present in all partition specs, for creating the discrete predicates. Set partitionSourceIds = identityPartitionColumnsInAllSpecs(icebergTable); @@ -437,29 +310,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect @Override public List listTables(ConnectorSession session, Optional schemaName) { - ImmutableList.Builder tablesListBuilder = ImmutableList.builder(); - listSchemas(session, schemaName) - .stream() - .flatMap(schema -> Stream.concat( - // Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because - // Trino uses lowercase value whereas Spark and Flink use uppercase. 
- // TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710 - metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream() - .map(table -> new SchemaTableName(schema, table)), - metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream() - .map(table -> new SchemaTableName(schema, table))) - .distinct()) // distinct() to avoid duplicates for case-insensitive HMS backends - .forEach(tablesListBuilder::add); - - tablesListBuilder.addAll(listMaterializedViews(session, schemaName)); - return tablesListBuilder.build(); + return catalog.listTables(session, schemaName); } @Override public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { IcebergTableHandle table = (IcebergTableHandle) tableHandle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); return getColumns(icebergTable.schema(), typeManager).stream() .collect(toImmutableMap(IcebergColumnHandle::getName, identity())); } @@ -500,47 +358,25 @@ public Map> listTableColumns(ConnectorSess @Override public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner) { - Optional location = getSchemaLocation(properties).map(uri -> { - try { - hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(uri)); - } - catch (IOException | IllegalArgumentException e) { - throw new TrinoException(INVALID_SCHEMA_PROPERTY, "Invalid location URI: " + uri, e); - } - return uri; - }); - - Database database = Database.builder() - .setDatabaseName(schemaName) - .setLocation(location) - .setOwnerType(owner.getType()) - .setOwnerName(owner.getName()) - .build(); - - metastore.createDatabase(new HiveIdentity(session), database); + catalog.createNamespace(session, schemaName, properties, owner); } @Override public void dropSchema(ConnectorSession session, String schemaName) { - // basic sanity check to provide a better error message - if (!listTables(session, Optional.of(schemaName)).isEmpty() || - !listViews(session, Optional.of(schemaName)).isEmpty()) { - throw new TrinoException(SCHEMA_NOT_EMPTY, "Schema not empty: " + schemaName); - } - metastore.dropDatabase(new HiveIdentity(session), schemaName); + catalog.dropNamespace(session, schemaName); } @Override public void renameSchema(ConnectorSession session, String source, String target) { - metastore.renameDatabase(new HiveIdentity(session), source, target); + catalog.renameNamespace(session, source, target); } @Override public void setSchemaAuthorization(ConnectorSession session, String source, TrinoPrincipal principal) { - metastore.setDatabaseOwner(new HiveIdentity(session), source, HivePrincipal.from(principal)); + catalog.setNamespacePrincipal(session, source, principal); } @Override @@ -553,76 +389,22 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe @Override public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional comment) { - IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - metastore.commentTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), comment); - org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); - if (comment.isEmpty()) { - 
icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); - } - else { - icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); - } + catalog.updateTableComment(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), comment); } @Override public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) { - SchemaTableName schemaTableName = tableMetadata.getTable(); - String schemaName = schemaTableName.getSchemaName(); - String tableName = schemaTableName.getTableName(); - - Schema schema = toIcebergSchema(tableMetadata.getColumns()); - - PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties())); - - Database database = metastore.getDatabase(schemaName) - .orElseThrow(() -> new SchemaNotFoundException(schemaName)); - - HdfsContext hdfsContext = new HdfsContext(session); - HiveIdentity identity = new HiveIdentity(session); - String targetPath = getTableLocation(tableMetadata.getProperties()); - if (targetPath == null) { - String tableNameForLocation = tableName; - if (useUniqueTableLocation) { - tableNameForLocation += "-" + randomUUID().toString().replace("-", ""); - } - targetPath = getTableDefaultLocation(database, hdfsContext, hdfsEnvironment, schemaName, tableNameForLocation).toString(); - } - - TableOperations operations = tableOperationsProvider.createTableOperations( - hdfsContext, - session.getQueryId(), - identity, - schemaName, - tableName, - Optional.of(session.getUser()), - Optional.of(targetPath)); - - if (operations.current() != null) { - throw new TableAlreadyExistsException(schemaTableName); - } - - ImmutableMap.Builder propertiesBuilder = ImmutableMap.builderWithExpectedSize(2); - FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); - propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString()); - if (tableMetadata.getComment().isPresent()) { - propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); - } - - Map properties = propertiesBuilder.build(); - TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, properties); - - transaction = createTableTransaction(tableName, operations, metadata); - + transaction = newCreateTableTransaction(catalog, tableMetadata, session); return new IcebergWritableTableHandle( - schemaName, - tableName, - SchemaParser.toJson(metadata.schema()), - PartitionSpecParser.toJson(metadata.spec()), - getColumns(metadata.schema(), typeManager), - targetPath, - fileFormat, - properties); + tableMetadata.getTable().getSchemaName(), + tableMetadata.getTable().getTableName(), + SchemaParser.toJson(transaction.table().schema()), + PartitionSpecParser.toJson(transaction.table().spec()), + getColumns(transaction.table().schema(), typeManager), + transaction.table().location(), + getFileFormat(transaction.table()), + transaction.table().properties()); } @Override @@ -635,7 +417,7 @@ public Optional finishCreateTable(ConnectorSession sess public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) { IcebergTableHandle table = (IcebergTableHandle) tableHandle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); transaction = icebergTable.newTransaction(); @@ -654,7 +436,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto public Optional 
finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) { IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle; - org.apache.iceberg.Table icebergTable = transaction.table(); + Table icebergTable = transaction.table(); List commitTasks = fragments.stream() .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) @@ -706,47 +488,35 @@ public Optional getInfo(ConnectorTableHandle tableHandle) @Override public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { - IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 - org.apache.iceberg.Table table = getIcebergTable(session, handle.getSchemaTableName()); - if (table.properties().containsKey(OBJECT_STORE_PATH) || - table.properties().containsKey(WRITE_NEW_DATA_LOCATION) || - table.properties().containsKey(WRITE_METADATA_LOCATION)) { - throw new TrinoException(NOT_SUPPORTED, "Table " + handle.getSchemaTableName() + " contains Iceberg path override properties and cannot be dropped from Trino"); - } - metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), true); + catalog.dropTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), true); } @Override public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable) { - IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - metastore.renameTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), newTable.getSchemaName(), newTable.getTableName()); + catalog.renameTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), newTable); } @Override public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) { - IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName()); icebergTable.updateSchema().addColumn(column.getName(), toIcebergType(column.getType())).commit(); } @Override public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) { - IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle; IcebergColumnHandle handle = (IcebergColumnHandle) column; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName()); icebergTable.updateSchema().deleteColumn(handle.getName()).commit(); } @Override public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, String target) { - IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle; IcebergColumnHandle columnHandle = (IcebergColumnHandle) source; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName()); icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit(); } @@ -755,8 +525,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan */ private 
ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table) { - // getIcebergTable throws TableNotFoundException when table not found - org.apache.iceberg.Table icebergTable = getIcebergTable(session, table); + Table icebergTable = catalog.loadTable(session, table); List columns = getColumnMetadatas(icebergTable); @@ -769,7 +538,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema return new ConnectorTableMetadata(table, columns, properties.build(), getTableComment(icebergTable)); } - private List getColumnMetadatas(org.apache.iceberg.Table table) + private List getColumnMetadatas(Table table) { return table.schema().columns().stream() .map(column -> { @@ -783,25 +552,6 @@ private List getColumnMetadatas(org.apache.iceberg.Table table) .collect(toImmutableList()); } - private static Schema toIcebergSchema(List columns) - { - List icebergColumns = new ArrayList<>(); - for (ColumnMetadata column : columns) { - if (!column.isHidden()) { - int index = icebergColumns.size(); - Type type = toIcebergType(column.getType()); - NestedField field = column.isNullable() - ? NestedField.optional(index, column.getName(), type, column.getComment()) - : NestedField.required(index, column.getName(), type, column.getComment()); - icebergColumns.add(field); - } - } - Type icebergSchema = Types.StructType.of(icebergColumns); - AtomicInteger nextFieldId = new AtomicInteger(1); - icebergSchema = TypeUtil.assignFreshIds(icebergSchema, nextFieldId::getAndIncrement); - return new Schema(icebergSchema.asStructType().fields()); - } - @Override public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) { @@ -817,139 +567,43 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable @Override public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace) { - HiveIdentity identity = new HiveIdentity(session); - Map properties = ImmutableMap.builder() - .put(PRESTO_VIEW_FLAG, "true") - .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) - .put(PRESTO_VERSION_NAME, trinoVersion) - .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) - .put(TABLE_COMMENT, PRESTO_VIEW_COMMENT) - .build(); - - Table.Builder tableBuilder = Table.builder() - .setDatabaseName(viewName.getSchemaName()) - .setTableName(viewName.getTableName()) - .setOwner(session.getUser()) - .setTableType(org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW.name()) - .setDataColumns(ImmutableList.of(new Column("dummy", HIVE_STRING, Optional.empty()))) - .setPartitionColumns(ImmutableList.of()) - .setParameters(properties) - .setViewOriginalText(Optional.of(encodeViewData(definition))) - .setViewExpandedText(Optional.of(PRESTO_VIEW_EXPANDED_TEXT_MARKER)); - - tableBuilder.getStorageBuilder() - .setStorageFormat(VIEW_STORAGE_FORMAT) - .setLocation(""); - Table table = tableBuilder.build(); - PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser()); - - Optional
existing = metastore.getTable(identity, viewName.getSchemaName(), viewName.getTableName()); - if (existing.isPresent()) { - if (!replace || !isPrestoView(existing.get())) { - throw new ViewAlreadyExistsException(viewName); - } - - metastore.replaceTable(identity, viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges); - return; - } - - try { - metastore.createTable(identity, table, principalPrivileges); - } - catch (TableAlreadyExistsException e) { - throw new ViewAlreadyExistsException(e.getTableName()); - } + catalog.createView(session, viewName, definition, replace); } @Override public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) { - // Not checking if source view exists as this is already done in RenameViewTask - metastore.renameTable(new HiveIdentity(session), source.getSchemaName(), source.getTableName(), target.getSchemaName(), target.getTableName()); + catalog.renameView(session, source, target); } @Override public void setViewAuthorization(ConnectorSession session, SchemaTableName viewName, TrinoPrincipal principal) { - // Not checking if view exists as this is already done in SetViewAuthorizationTask - setTableAuthorization(session, viewName, principal); + catalog.setViewPrincipal(session, viewName, principal); } @Override public void dropView(ConnectorSession session, SchemaTableName viewName) { - if (getView(session, viewName).isEmpty()) { - throw new ViewNotFoundException(viewName); - } - - try { - metastore.dropTable(new HiveIdentity(session), viewName.getSchemaName(), viewName.getTableName(), true); - } - catch (TableNotFoundException e) { - throw new ViewNotFoundException(e.getTableName()); - } + catalog.dropView(session, viewName); } @Override public List listViews(ConnectorSession session, Optional schemaName) { - // Filter on PRESTO_VIEW_COMMENT to distinguish from materialized views - return listSchemas(session, schemaName).stream() - .flatMap(schema -> - metastore.getTablesWithParameter(schema, TABLE_COMMENT, PRESTO_VIEW_COMMENT).stream() - .map(table -> new SchemaTableName(schema, table))) - .collect(toImmutableList()); + return catalog.listViews(session, schemaName); } @Override public Map getViews(ConnectorSession session, Optional schemaName) { - ImmutableMap.Builder views = ImmutableMap.builder(); - for (SchemaTableName name : listViews(session, schemaName)) { - try { - getView(session, name).ifPresent(view -> views.put(name, view)); - } - catch (TrinoException e) { - if (e.getErrorCode().equals(TABLE_NOT_FOUND.toErrorCode())) { - // Ignore view that was dropped during query execution (race condition) - } - else { - throw e; - } - } - } - return views.build(); + return catalog.getViews(session, schemaName); } @Override public Optional getView(ConnectorSession session, SchemaTableName viewName) { - if (isHiveSystemSchema(viewName.getSchemaName())) { - return Optional.empty(); - } - return metastore.getTable(new HiveIdentity(session), viewName.getSchemaName(), viewName.getTableName()) - .filter(table -> HiveMetadata.PRESTO_VIEW_COMMENT.equals(table.getParameters().get(TABLE_COMMENT))) // filter out materialized views - .filter(ViewReaderUtil::canDecodeView) - .map(view -> { - if (!isPrestoView(view)) { - throw new HiveViewNotSupportedException(viewName); - } - - ConnectorViewDefinition definition = viewReader - .decodeViewData(view.getViewOriginalText().get(), view, catalogName); - // use owner from table metadata if it exists - if (view.getOwner() != null && !definition.isRunAsInvoker()) { - 
definition = new ConnectorViewDefinition( - definition.getOriginalSql(), - definition.getCatalog(), - definition.getSchema(), - definition.getColumns(), - definition.getComment(), - Optional.of(view.getOwner()), - false); - } - return definition; - }); + return catalog.getView(session, viewName); } @Override @@ -957,7 +611,7 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle { IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName()); icebergTable.newDelete() .deleteFromRowFilter(toIcebergExpression(handle.getEnforcedPredicate())) @@ -982,7 +636,7 @@ public void rollback() public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint) { IcebergTableHandle table = (IcebergTableHandle) handle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); Set partitionSourceIds = identityPartitionColumnsInAllSpecs(icebergTable); BiPredicate isIdentityPartition = (column, domain) -> partitionSourceIds.contains(column.getId()); @@ -1015,7 +669,7 @@ public Optional> applyFilter(C false)); } - private static Set identityPartitionColumnsInAllSpecs(org.apache.iceberg.Table table) + private static Set identityPartitionColumnsInAllSpecs(Table table) { // Extract identity partition column source ids common to ALL specs return table.spec().fields().stream() @@ -1029,11 +683,17 @@ private static Set identityPartitionColumnsInAllSpecs(org.apache.iceber public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint) { IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName()); return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable); } - private Optional getSnapshotId(org.apache.iceberg.Table table, Optional snapshotId) + @Override + public void setTableAuthorization(ConnectorSession session, SchemaTableName tableName, TrinoPrincipal principal) + { + catalog.setTablePrincipal(session, tableName, principal); + } + + private Optional getSnapshotId(Table table, Optional snapshotId) { // table.name() is an encoded version of SchemaTableName return snapshotId @@ -1044,103 +704,21 @@ private Optional getSnapshotId(org.apache.iceberg.Table table, Optional Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId)); } - org.apache.iceberg.Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName) + Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(tableOperationsProvider, session, schemaTableName)).operations().current()); - - return getIcebergTableWithMetadata(tableOperationsProvider, session, schemaTableName, metadata); + return catalog.loadTable(session, schemaTableName); } @Override public void createMaterializedView(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting) { - HiveIdentity identity = new 
HiveIdentity(session); - Optional
existing = metastore.getTable(identity, viewName.getSchemaName(), viewName.getTableName()); - - // It's a create command where the materialized view already exists and 'if not exists' clause is not specified - if (!replace && existing.isPresent()) { - if (ignoreExisting) { - return; - } - throw new TrinoException(ALREADY_EXISTS, "Materialized view already exists: " + viewName); - } - - // Generate a storage table name and create a storage table. The properties in the definition are table properties for the - // storage table as indicated in the materialized view definition. - String storageTableName = "st_" + randomUUID().toString().replace("-", ""); - Map storageTableProperties = new HashMap<>(definition.getProperties()); - storageTableProperties.putIfAbsent(FILE_FORMAT_PROPERTY, DEFAULT_FILE_FORMAT_DEFAULT); - - SchemaTableName storageTable = new SchemaTableName(viewName.getSchemaName(), storageTableName); - List columns = definition.getColumns().stream() - .map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType()))) - .collect(toImmutableList()); - - ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); - Optional layout = getNewTableLayout(session, tableMetadata); - finishCreateTable(session, beginCreateTable(session, tableMetadata, layout), ImmutableList.of(), ImmutableList.of()); - - // Create a view indicating the storage table - Map viewProperties = ImmutableMap.builder() - .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) - .put(STORAGE_TABLE, storageTableName) - .put(PRESTO_VIEW_FLAG, "true") - .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) - .put(TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT) - .build(); - - Column dummyColumn = new Column("dummy", HIVE_STRING, Optional.empty()); - - String schemaName = viewName.getSchemaName(); - String tableName = viewName.getTableName(); - Table.Builder tableBuilder = Table.builder() - .setDatabaseName(schemaName) - .setTableName(tableName) - .setOwner(session.getUser()) - .setTableType(VIRTUAL_VIEW.name()) - .setDataColumns(ImmutableList.of(dummyColumn)) - .setPartitionColumns(ImmutableList.of()) - .setParameters(viewProperties) - .withStorage(storage -> storage.setStorageFormat(VIEW_STORAGE_FORMAT)) - .withStorage(storage -> storage.setLocation("")) - .setViewOriginalText(Optional.of( - encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)))) - .setViewExpandedText(Optional.of("/* Presto Materialized View */")); - Table table = tableBuilder.build(); - PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser()); - if (existing.isPresent() && replace) { - // drop the current storage table - String oldStorageTable = existing.get().getParameters().get(STORAGE_TABLE); - if (oldStorageTable != null) { - metastore.dropTable(identity, viewName.getSchemaName(), oldStorageTable, true); - } - // Replace the existing view definition - metastore.replaceTable(identity, viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges); - return; - } - // create the view definition - metastore.createTable(identity, table, principalPrivileges); + catalog.createMaterializedView(session, viewName, definition, replace, ignoreExisting); } @Override public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName) { - final HiveIdentity identity = new HiveIdentity(session); - Table view = metastore.getTable(identity, viewName.getSchemaName(), viewName.getTableName()) - 
.orElseThrow(() -> new MaterializedViewNotFoundException(viewName)); - - String storageTableName = view.getParameters().get(STORAGE_TABLE); - if (storageTableName != null) { - try { - metastore.dropTable(identity, viewName.getSchemaName(), storageTableName, true); - } - catch (TrinoException e) { - log.warn(e, "Failed to drop storage table '%s' for materialized view '%s'", storageTableName, viewName); - } - } - metastore.dropTable(identity, viewName.getSchemaName(), viewName.getTableName(), true); + catalog.dropMaterializedView(session, viewName); } @Override @@ -1153,7 +731,7 @@ public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession sessi public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List sourceTableHandles) { IcebergTableHandle table = (IcebergTableHandle) tableHandle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); transaction = icebergTable.newTransaction(); return new IcebergWritableTableHandle( @@ -1181,7 +759,7 @@ public Optional finishRefreshMaterializedView( IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle; - org.apache.iceberg.Table icebergTable = transaction.table(); + Table icebergTable = transaction.table(); List commitTasks = fragments.stream() .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) .collect(toImmutableList()); @@ -1224,61 +802,22 @@ public Optional finishRefreshMaterializedView( .collect(toImmutableList()))); } - private boolean isMaterializedView(Table table) - { - return table.getTableType().equals(VIRTUAL_VIEW.name()) - && "true".equals(table.getParameters().get(PRESTO_VIEW_FLAG)) - && table.getParameters().containsKey(STORAGE_TABLE); - } - @Override public List listMaterializedViews(ConnectorSession session, Optional schemaName) { - // Filter on ICEBERG_MATERIALIZED_VIEW_COMMENT is used to avoid listing hive views in case of a shared HMS and to distinguish from standard views - return listSchemas(session, schemaName).stream() - .flatMap(schema -> metastore.getTablesWithParameter(schema, TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT).stream() - .map(table -> new SchemaTableName(schema, table))) - .collect(toImmutableList()); + return catalog.listMaterializedViews(session, schemaName); } @Override public Optional getMaterializedView(ConnectorSession session, SchemaTableName viewName) { - Optional
tableOptional = metastore.getTable(new HiveIdentity(session), viewName.getSchemaName(), viewName.getTableName()); - if (tableOptional.isEmpty()) { - return Optional.empty(); - } - - if (!isMaterializedView(tableOptional.get())) { - return Optional.empty(); - } - - Table materializedView = tableOptional.get(); - String storageTable = materializedView.getParameters().get(STORAGE_TABLE); - checkState(storageTable != null, "Storage table missing in definition of materialized view " + viewName); - - IcebergMaterializedViewDefinition definition = decodeMaterializedViewData(materializedView.getViewOriginalText() - .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + viewName))); - - SchemaTableName storageTableName = new SchemaTableName(viewName.getSchemaName(), storageTable); - ConnectorTableMetadata tableMetadata = getTableMetadata(session, storageTableName); - return Optional.of(new ConnectorMaterializedViewDefinition( - definition.getOriginalSql(), - Optional.of(new CatalogSchemaTableName(catalogName.toString(), storageTableName)), - definition.getCatalog(), - definition.getSchema(), - definition.getColumns().stream() - .map(column -> new ConnectorMaterializedViewDefinition.Column(column.getName(), column.getType())) - .collect(toImmutableList()), - definition.getComment(), - materializedView.getOwner(), - ImmutableMap.copyOf(tableMetadata.getProperties()))); + return catalog.getMaterializedView(session, viewName); } public Optional getTableToken(ConnectorSession session, ConnectorTableHandle tableHandle) { IcebergTableHandle table = (IcebergTableHandle) tableHandle; - org.apache.iceberg.Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); return Optional.ofNullable(icebergTable.currentSnapshot()) .map(snapshot -> new TableToken(snapshot.snapshotId())); } @@ -1333,7 +872,7 @@ private Map> getMaterializedViewToken(ConnectorSess .map(CatalogSchemaTableName::getSchemaTableName) .orElseThrow(() -> new IllegalStateException("Storage table missing in definition of materialized view " + name)); - org.apache.iceberg.Table icebergTable = getIcebergTable(session, storageTableName); + Table icebergTable = catalog.loadTable(session, storageTableName); String dependsOnTables = icebergTable.currentSnapshot().summary().getOrDefault(DEPENDS_ON_TABLES, ""); if (!dependsOnTables.isEmpty()) { Map tableToSnapshotIdMap = Splitter.on(',').withKeyValueSeparator('=').split(dependsOnTables); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index 9648193117cf4..47d99fc2ace51 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -14,10 +14,6 @@ package io.trino.plugin.iceberg; import io.airlift.json.JsonCodec; -import io.trino.plugin.base.CatalogName; -import io.trino.plugin.hive.HdfsEnvironment; -import io.trino.plugin.hive.NodeVersion; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.type.TypeManager; import javax.inject.Inject; @@ -26,51 +22,23 @@ public class IcebergMetadataFactory { - private final CatalogName catalogName; - private final HiveMetastore metastore; - private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; private final JsonCodec 
commitTaskCodec; - private final HiveTableOperationsProvider tableOperationsProvider; - private final String trinoVersion; - private final boolean useUniqueTableLocation; + private final TrinoCatalogFactory catalogFactory; @Inject public IcebergMetadataFactory( - CatalogName catalogName, - IcebergConfig config, - HiveMetastore metastore, - HdfsEnvironment hdfsEnvironment, - TypeManager typeManager, - JsonCodec commitTaskDataJsonCodec, - HiveTableOperationsProvider tableOperationsProvider, - NodeVersion nodeVersion) - { - this(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, tableOperationsProvider, nodeVersion, config.isUniqueTableLocation()); - } - - public IcebergMetadataFactory( - CatalogName catalogName, - HiveMetastore metastore, - HdfsEnvironment hdfsEnvironment, TypeManager typeManager, JsonCodec commitTaskCodec, - HiveTableOperationsProvider tableOperationsProvider, - NodeVersion nodeVersion, - boolean useUniqueTableLocation) + TrinoCatalogFactory catalogFactory) { - this.catalogName = requireNonNull(catalogName, "catalogName is null"); - this.metastore = requireNonNull(metastore, "metastore is null"); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); - this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); - this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); - this.useUniqueTableLocation = useUniqueTableLocation; + this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); } public IcebergMetadata create() { - return new IcebergMetadata(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskCodec, tableOperationsProvider, trinoVersion, useUniqueTableLocation); + return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index a6bf2c8c2c619..32a4d8ece40e4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -62,6 +62,7 @@ public void configure(Binder binder) configBinder(binder).bindConfig(ParquetReaderConfig.class); configBinder(binder).bindConfig(ParquetWriterConfig.class); + binder.bind(TrinoCatalogFactory.class).in(Scopes.SINGLETON); binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON); jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java index e30221b205f37..7fa3f68f9cc11 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java @@ -38,7 +38,7 @@ public class IcebergSplitManager private final IcebergTransactionManager transactionManager; @Inject - public IcebergSplitManager(IcebergTransactionManager transactionManager, HiveTableOperationsProvider tableOperationsProvider) + public IcebergSplitManager(IcebergTransactionManager transactionManager) { this.transactionManager = requireNonNull(transactionManager, 
"transactionManager is null"); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java index 2fbbf701d5917..f051d0383b50f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.spi.session.PropertyMetadata.enumProperty; @@ -84,8 +85,8 @@ public static List getPartitioning(Map tableProperties) return partitioning == null ? ImmutableList.of() : ImmutableList.copyOf(partitioning); } - public static String getTableLocation(Map tableProperties) + public static Optional getTableLocation(Map tableProperties) { - return (String) tableProperties.get(LOCATION_PROPERTY); + return Optional.ofNullable((String) tableProperties.get(LOCATION_PROPERTY)); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 51be25c5be4c6..ffa209f44c15d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -19,7 +19,9 @@ import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import io.trino.plugin.hive.authentication.HiveIdentity; import io.trino.spi.TrinoException; +import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; @@ -39,13 +41,17 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Type.PrimitiveType; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -53,6 +59,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -63,6 +70,10 @@ import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; +import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; +import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation; +import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; +import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static 
io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -174,13 +185,13 @@ public static Map primitiveFieldTypes(Schema schema) .collect(toImmutableMap(Entry::getKey, Entry::getValue)); } - private static Stream> primitiveFieldTypes(List nestedFields) + private static Stream> primitiveFieldTypes(List nestedFields) { return nestedFields.stream() .flatMap(IcebergUtil::primitiveFieldTypes); } - private static Stream> primitiveFieldTypes(Types.NestedField nestedField) + private static Stream> primitiveFieldTypes(NestedField nestedField) { org.apache.iceberg.types.Type fieldType = nestedField.type(); if (fieldType.isPrimitiveType()) { @@ -332,4 +343,39 @@ public static LocationProvider getLocationProvider(SchemaTableName schemaTableNa } return locationsFor(tableLocation, storageProperties); } + + public static Schema toIcebergSchema(List columns) + { + List icebergColumns = new ArrayList<>(); + for (ColumnMetadata column : columns) { + if (!column.isHidden()) { + int index = icebergColumns.size(); + org.apache.iceberg.types.Type type = toIcebergType(column.getType()); + NestedField field = NestedField.of(index, column.isNullable(), column.getName(), type, column.getComment()); + icebergColumns.add(field); + } + } + org.apache.iceberg.types.Type icebergSchema = StructType.of(icebergColumns); + AtomicInteger nextFieldId = new AtomicInteger(1); + icebergSchema = TypeUtil.assignFreshIds(icebergSchema, nextFieldId::getAndIncrement); + return new Schema(icebergSchema.asStructType().fields()); + } + + public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session) + { + SchemaTableName schemaTableName = tableMetadata.getTable(); + Schema schema = toIcebergSchema(tableMetadata.getColumns()); + PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties())); + String targetPath = getTableLocation(tableMetadata.getProperties()) + .orElse(catalog.defaultTableLocation(session, schemaTableName)); + + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builderWithExpectedSize(2); + FileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties()); + propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString()); + if (tableMetadata.getComment().isPresent()) { + propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); + } + + return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, targetPath, propertiesBuilder.build()); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java index ec82214de71e9..6bd8fe306428e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java @@ -24,7 +24,6 @@ import java.lang.invoke.MethodHandle; -import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; import static io.trino.spi.block.MethodHandleUtil.methodHandle; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -41,12 +40,12 @@ public class RollbackToSnapshotProcedure String.class, Long.class); - private final HiveTableOperationsProvider tableOperationsProvider; + private final TrinoCatalog catalog; @Inject - public RollbackToSnapshotProcedure(HiveTableOperationsProvider 
tableOperationsProvider) + public RollbackToSnapshotProcedure(TrinoCatalogFactory catalogFactory) { - this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.catalog = requireNonNull(catalogFactory, "catalogFactory is null").create(); } @Override @@ -65,7 +64,7 @@ public Procedure get() public void rollbackToSnapshot(ConnectorSession clientSession, String schema, String table, Long snapshotId) { SchemaTableName schemaTableName = new SchemaTableName(schema, table); - Table icebergTable = loadIcebergTable(tableOperationsProvider, clientSession, schemaTableName); + Table icebergTable = catalog.loadTable(clientSession, schemaTableName); icebergTable.rollback().toSnapshotId(snapshotId).commit(); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java new file mode 100644 index 0000000000000..9bfe6e769c3ff --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.security.TrinoPrincipal; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * An interface to allow different Iceberg catalog implementations in IcebergMetadata. + *
+ * It mimics the Iceberg catalog interface, with the following modifications:
+ * <ul>
+ *   <li>ConnectorSession is added at the front of each method signature</li>
+ *   <li>String is used to identify a namespace instead of the Iceberg Namespace class; Optional.empty() is used to represent Namespace.empty().
+ *   This delegates the handling of multi-level namespaces to each implementation</li>
+ *   <li>Similarly, SchemaTableName is used to identify a table instead of the Iceberg TableIdentifier</li>
+ *   <li>Metadata is a map of string to object instead of string to string</li>
+ *   <li>Additional methods related to authorization are added</li>
+ *   <li>View related methods are currently mostly the same as the ones in ConnectorMetadata.
+ *   These methods will likely be updated once an Iceberg view interface is added.</li>
+ * </ul>
+ */
+public interface TrinoCatalog
+{
+    List<String> listNamespaces(ConnectorSession session);
+
+    boolean dropNamespace(ConnectorSession session, String namespace);
+
+    Map<String, Object> loadNamespaceMetadata(ConnectorSession session, String namespace);
+
+    Optional<TrinoPrincipal> getNamespacePrincipal(ConnectorSession session, String namespace);
+
+    void createNamespace(ConnectorSession session, String namespace, Map<String, Object> properties, TrinoPrincipal owner);
+
+    void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal);
+
+    void renameNamespace(ConnectorSession session, String source, String target);
+
+    List<SchemaTableName> listTables(ConnectorSession session, Optional<String> namespace);
+
+    Transaction newCreateTableTransaction(
+            ConnectorSession session,
+            SchemaTableName schemaTableName,
+            Schema schema,
+            PartitionSpec partitionSpec,
+            String location,
+            Map<String, String> properties);
+
+    boolean dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData);
+
+    void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to);
+
+    /**
+     * Load an Iceberg table.
+     *
+     * @param session Trino session
+     * @param schemaTableName Trino schema and table name
+     * @return Iceberg table loaded
+     * @throws TableNotFoundException if table not found
+     * @throws UnknownTableTypeException if table is not of Iceberg type in the metastore
+     */
+    Table loadTable(ConnectorSession session, SchemaTableName schemaTableName);
+
+    void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional<String> comment);
+
+    String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName);
+
+    void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal);
+
+    void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace);
+
+    void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target);
+
+    void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal);
+
+    void dropView(ConnectorSession session, SchemaTableName schemaViewName);
+
+    List<SchemaTableName> listViews(ConnectorSession session, Optional<String> namespace);
+
+    Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, Optional<String> namespace);
+
+    Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewIdentifier);
+
+    List<SchemaTableName> listMaterializedViews(ConnectorSession session, Optional<String> namespace);
+
+    void createMaterializedView(
+            ConnectorSession session,
+            SchemaTableName schemaViewName,
+            ConnectorMaterializedViewDefinition definition,
+            boolean replace,
+            boolean ignoreExisting);
+
+    void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName);
+
+    Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName);
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java new file mode 100644 index 0000000000000..2c8042b135099 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.spi.TrinoException; +import io.trino.spi.type.TypeManager; + +import javax.inject.Inject; + +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.util.Objects.requireNonNull; + +public class TrinoCatalogFactory +{ + private final CatalogName catalogName; + private final HiveMetastore metastore; + private final HdfsEnvironment hdfsEnvironment; + private final TypeManager typeManager; + private final HiveTableOperationsProvider tableOperationsProvider; + private final String trinoVersion; + private final CatalogType catalogType; + private final boolean isUniqueTableLocation; + + @Inject + public TrinoCatalogFactory( + IcebergConfig config, + CatalogName catalogName, + HiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, + TypeManager typeManager, + HiveTableOperationsProvider tableOperationsProvider, + NodeVersion nodeVersion) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.metastore = requireNonNull(metastore, "metastore is null"); + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationProvider is null"); + this.trinoVersion = requireNonNull(nodeVersion, "trinoVersion is null").toString(); + requireNonNull(config, "config is null"); + this.catalogType = config.getCatalogType(); + this.isUniqueTableLocation = config.isUniqueTableLocation(); + } + + public TrinoCatalog create() + { + switch (catalogType) { + case HIVE: + return new TrinoHiveCatalog(catalogName, metastore, hdfsEnvironment, typeManager, tableOperationsProvider, trinoVersion, isUniqueTableLocation); + case UNKNOWN: + throw new TrinoException(NOT_SUPPORTED, "Unknown Trino Iceberg catalog type"); + } + throw new TrinoException(NOT_SUPPORTED, "Unsupported Trino Iceberg catalog type " + catalogType); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java new file mode 100644 index 0000000000000..1f1f2f0d2dada --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java @@ -0,0 +1,644 @@ +/* + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; +import io.trino.plugin.hive.HiveMetadata; +import io.trino.plugin.hive.HiveSchemaProperties; +import io.trino.plugin.hive.HiveViewNotSupportedException; +import io.trino.plugin.hive.TableAlreadyExistsException; +import io.trino.plugin.hive.ViewAlreadyExistsException; +import io.trino.plugin.hive.ViewReaderUtil; +import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.Column; +import io.trino.plugin.hive.metastore.Database; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.HivePrincipal; +import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.util.HiveUtil; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.MaterializedViewNotFoundException; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.connector.ViewNotFoundException; +import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.type.TypeManager; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import 
org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; +import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; +import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.hive.HiveType.HIVE_STRING; +import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG; +import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; +import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; +import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; +import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; +import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT; +import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; +import static io.trino.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; +import static io.trino.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation; +import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; +import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; +import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; +import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; +import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; +import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.apache.iceberg.TableMetadata.newTableMetadata; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; +import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; +import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; +import static org.apache.iceberg.Transactions.createTableTransaction; + +class TrinoHiveCatalog + implements TrinoCatalog +{ + private static final Logger log = Logger.get(TrinoHiveCatalog.class); + private static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View"; + 
public static final String DEPENDS_ON_TABLES = "dependsOnTables"; + + // Be compatible with views defined by the Hive connector, which can be useful under certain conditions. + private static final String TRINO_CREATED_BY = HiveMetadata.TRINO_CREATED_BY; + private static final String TRINO_CREATED_BY_VALUE = "Trino Iceberg connector"; + private static final String PRESTO_VIEW_COMMENT = HiveMetadata.PRESTO_VIEW_COMMENT; + private static final String PRESTO_VERSION_NAME = HiveMetadata.PRESTO_VERSION_NAME; + private static final String PRESTO_QUERY_ID_NAME = HiveMetadata.PRESTO_QUERY_ID_NAME; + private static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; + + private final CatalogName catalogName; + private final HiveMetastore metastore; + private final HdfsEnvironment hdfsEnvironment; + private final TypeManager typeManager; + private final HiveTableOperationsProvider tableOperationsProvider; + private final String trinoVersion; + private final boolean useUniqueTableLocation; + + private final Map tableMetadataCache = new ConcurrentHashMap<>(); + private final ViewReaderUtil.PrestoViewReader viewReader = new ViewReaderUtil.PrestoViewReader(); + + public TrinoHiveCatalog( + CatalogName catalogName, + HiveMetastore metastore, + HdfsEnvironment hdfsEnvironment, + TypeManager typeManager, + HiveTableOperationsProvider tableOperationsProvider, + String trinoVersion, + boolean useUniqueTableLocation) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.metastore = requireNonNull(metastore, "metastore is null"); + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); + this.useUniqueTableLocation = useUniqueTableLocation; + } + + @Override + public List listNamespaces(ConnectorSession session) + { + return metastore.getAllDatabases().stream() + .filter(schemaName -> !HiveUtil.isHiveSystemSchema(schemaName)) + .collect(Collectors.toList()); + } + + @Override + public Map loadNamespaceMetadata(ConnectorSession session, String namespace) + { + Optional db = metastore.getDatabase(namespace); + if (db.isPresent()) { + return HiveSchemaProperties.fromDatabase(db.get()); + } + + throw new SchemaNotFoundException(namespace); + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, String namespace) + { + Optional database = metastore.getDatabase(namespace); + if (database.isPresent()) { + return database.flatMap(db -> Optional.of(new TrinoPrincipal(db.getOwnerType(), db.getOwnerName()))); + } + + throw new SchemaNotFoundException(namespace); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, Map properties, TrinoPrincipal owner) + { + Optional location = getSchemaLocation(properties).map(uri -> { + try { + hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(uri)); + } + catch (IOException | IllegalArgumentException e) { + throw new TrinoException(INVALID_SCHEMA_PROPERTY, "Invalid location URI: " + uri, e); + } + return uri; + }); + + Database database = Database.builder() + .setDatabaseName(namespace) + .setLocation(location) + .setOwnerType(owner.getType()) + .setOwnerName(owner.getName()) + .build(); + + metastore.createDatabase(new HiveIdentity(session), database); 
+ } + + @Override + public boolean dropNamespace(ConnectorSession session, String namespace) + { + // basic sanity check to provide a better error message + if (!listTables(session, Optional.of(namespace)).isEmpty() || + !listViews(session, Optional.of(namespace)).isEmpty()) { + throw new TrinoException(SCHEMA_NOT_EMPTY, "Schema not empty: " + namespace); + } + metastore.dropDatabase(new HiveIdentity(session), namespace); + return true; + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + metastore.renameDatabase(new HiveIdentity(session), source, target); + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal) + { + metastore.setDatabaseOwner(new HiveIdentity(session), namespace, HivePrincipal.from(principal)); + } + + @Override + public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTableName schemaTableName, + Schema schema, PartitionSpec partitionSpec, String location, Map properties) + { + TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties); + TableOperations ops = tableOperationsProvider.createTableOperations( + new HdfsContext(session), + session.getQueryId(), + new HiveIdentity(session), + schemaTableName.getSchemaName(), + schemaTableName.getTableName(), + Optional.of(session.getUser()), + Optional.of(location)); + return createTableTransaction(schemaTableName.toString(), ops, metadata); + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + ImmutableList.Builder tablesListBuilder = ImmutableList.builder(); + listNamespaces(session, namespace) + .stream() + .flatMap(schema -> Stream.concat( + // Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because + // Trino uses lowercase value whereas Spark and Flink use uppercase. 
+ // TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710 + metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream() + .map(table -> new SchemaTableName(schema, table)), + metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream() + .map(table -> new SchemaTableName(schema, table))) + .distinct()) // distinct() to avoid duplicates for case-insensitive HMS backends + .forEach(tablesListBuilder::add); + + tablesListBuilder.addAll(listMaterializedViews(session, namespace)); + return tablesListBuilder.build(); + } + + @Override + public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData) + { + // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 + Table table = loadTable(session, schemaTableName); + if (table.properties().containsKey(OBJECT_STORE_PATH) || + table.properties().containsKey(WRITE_NEW_DATA_LOCATION) || + table.properties().containsKey(WRITE_METADATA_LOCATION)) { + throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino"); + } + metastore.dropTable(new HiveIdentity(session), schemaTableName.getSchemaName(), schemaTableName.getTableName(), purgeData); + return true; + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + metastore.renameTable(new HiveIdentity(session), from.getSchemaName(), from.getTableName(), to.getSchemaName(), to.getTableName()); + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) + { + TableMetadata metadata = tableMetadataCache.computeIfAbsent( + schemaTableName, + ignore -> ((BaseTable) loadIcebergTable(tableOperationsProvider, session, schemaTableName)).operations().current()); + + return getIcebergTableWithMetadata(tableOperationsProvider, session, schemaTableName, metadata); + } + + @Override + public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional comment) + { + metastore.commentTable(new HiveIdentity(session), schemaTableName.getSchemaName(), schemaTableName.getTableName(), comment); + Table icebergTable = loadTable(session, schemaTableName); + if (comment.isEmpty()) { + icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); + } + else { + icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); + } + } + + @Override + public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) + { + Database database = metastore.getDatabase(schemaTableName.getSchemaName()) + .orElseThrow(() -> new SchemaNotFoundException(schemaTableName.getSchemaName())); + String tableNameForLocation = schemaTableName.getTableName(); + if (useUniqueTableLocation) { + tableNameForLocation += "-" + randomUUID().toString().replace("-", ""); + } + return getTableDefaultLocation(database, new HdfsEnvironment.HdfsContext(session), hdfsEnvironment, + schemaTableName.getSchemaName(), tableNameForLocation).toString(); + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support setting an owner on a table"); + } + + @Override + public void createView(ConnectorSession session, 
SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) + { + HiveIdentity identity = new HiveIdentity(session); + Map properties = ImmutableMap.builder() + .put(PRESTO_VIEW_FLAG, "true") + .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) + .put(PRESTO_VERSION_NAME, trinoVersion) + .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) + .put(TABLE_COMMENT, PRESTO_VIEW_COMMENT) + .build(); + + io.trino.plugin.hive.metastore.Table.Builder tableBuilder = io.trino.plugin.hive.metastore.Table.builder() + .setDatabaseName(schemaViewName.getSchemaName()) + .setTableName(schemaViewName.getTableName()) + .setOwner(session.getUser()) + .setTableType(org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW.name()) + .setDataColumns(ImmutableList.of(new Column("dummy", HIVE_STRING, Optional.empty()))) + .setPartitionColumns(ImmutableList.of()) + .setParameters(properties) + .setViewOriginalText(Optional.of(encodeViewData(definition))) + .setViewExpandedText(Optional.of(PRESTO_VIEW_EXPANDED_TEXT_MARKER)); + + tableBuilder.getStorageBuilder() + .setStorageFormat(VIEW_STORAGE_FORMAT) + .setLocation(""); + io.trino.plugin.hive.metastore.Table table = tableBuilder.build(); + PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser()); + + Optional existing = metastore.getTable(identity, schemaViewName.getSchemaName(), schemaViewName.getTableName()); + if (existing.isPresent()) { + if (!replace || !isPrestoView(existing.get())) { + throw new ViewAlreadyExistsException(schemaViewName); + } + + metastore.replaceTable(identity, schemaViewName.getSchemaName(), schemaViewName.getTableName(), table, principalPrivileges); + return; + } + + try { + metastore.createTable(identity, table, principalPrivileges); + } + catch (TableAlreadyExistsException e) { + throw new ViewAlreadyExistsException(e.getTableName()); + } + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + // Not checking if source view exists as this is already done in RenameViewTask + metastore.renameTable(new HiveIdentity(session), source.getSchemaName(), source.getTableName(), target.getSchemaName(), target.getTableName()); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal) + { + // Not checking if view exists as this is already done in SetViewAuthorizationTask + setTablePrincipal(session, schemaViewName, principal); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + if (getView(session, schemaViewName).isEmpty()) { + throw new ViewNotFoundException(schemaViewName); + } + + try { + metastore.dropTable(new HiveIdentity(session), schemaViewName.getSchemaName(), schemaViewName.getTableName(), true); + } + catch (TableNotFoundException e) { + throw new ViewNotFoundException(e.getTableName()); + } + } + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + // Filter on PRESTO_VIEW_COMMENT to distinguish from materialized views + return listNamespaces(session, namespace).stream() + .flatMap(schema -> + metastore.getTablesWithParameter(schema, TABLE_COMMENT, PRESTO_VIEW_COMMENT).stream() + .map(table -> new SchemaTableName(schema, table))) + .collect(toImmutableList()); + } + + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + ImmutableMap.Builder views = ImmutableMap.builder(); + for (SchemaTableName name : listViews(session, namespace)) { + 
try { + getView(session, name).ifPresent(view -> views.put(name, view)); + } + catch (TrinoException e) { + if (e.getErrorCode().equals(TABLE_NOT_FOUND.toErrorCode())) { + // Ignore view that was dropped during query execution (race condition) + } + else { + throw e; + } + } + } + return views.build(); + } + + @Override + public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + { + if (isHiveSystemSchema(viewIdentifier.getSchemaName())) { + return Optional.empty(); + } + return metastore.getTable(new HiveIdentity(session), viewIdentifier.getSchemaName(), viewIdentifier.getTableName()) + .filter(table -> HiveMetadata.PRESTO_VIEW_COMMENT.equals(table.getParameters().get(TABLE_COMMENT))) // filter out materialized views + .filter(ViewReaderUtil::canDecodeView) + .map(view -> { + if (!isPrestoView(view)) { + throw new HiveViewNotSupportedException(viewIdentifier); + } + + ConnectorViewDefinition definition = viewReader + .decodeViewData(view.getViewOriginalText().get(), view, catalogName); + // use owner from table metadata if it exists + if (view.getOwner() != null && !definition.isRunAsInvoker()) { + definition = new ConnectorViewDefinition( + definition.getOriginalSql(), + definition.getCatalog(), + definition.getSchema(), + definition.getColumns(), + definition.getComment(), + Optional.of(view.getOwner()), + false); + } + return definition; + }); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional namespace) + { + // Filter on ICEBERG_MATERIALIZED_VIEW_COMMENT is used to avoid listing hive views in case of a shared HMS and to distinguish from standard views + return listNamespaces(session, namespace).stream() + .flatMap(schema -> metastore.getTablesWithParameter(schema, TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT).stream() + .map(table -> new SchemaTableName(schema, table))) + .collect(toImmutableList()); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, + boolean replace, boolean ignoreExisting) + { + HiveIdentity identity = new HiveIdentity(session); + Optional existing = metastore.getTable(identity, schemaViewName.getSchemaName(), schemaViewName.getTableName()); + + // It's a create command where the materialized view already exists and 'if not exists' clause is not specified + if (!replace && existing.isPresent()) { + if (ignoreExisting) { + return; + } + throw new TrinoException(ALREADY_EXISTS, "Materialized view already exists: " + schemaViewName); + } + + // Generate a storage table name and create a storage table. The properties in the definition are table properties for the + // storage table as indicated in the materialized view definition. 
+ String storageTableName = "st_" + randomUUID().toString().replace("-", ""); + Map storageTableProperties = new HashMap<>(definition.getProperties()); + storageTableProperties.putIfAbsent(FILE_FORMAT_PROPERTY, DEFAULT_FILE_FORMAT_DEFAULT); + + SchemaTableName storageTable = new SchemaTableName(schemaViewName.getSchemaName(), storageTableName); + List columns = definition.getColumns().stream() + .map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType()))) + .collect(toImmutableList()); + + ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); + Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session); + transaction.newAppend().commit(); + transaction.commitTransaction(); + + // Create a view indicating the storage table + Map viewProperties = ImmutableMap.builder() + .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) + .put(STORAGE_TABLE, storageTableName) + .put(PRESTO_VIEW_FLAG, "true") + .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) + .put(TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT) + .build(); + + Column dummyColumn = new Column("dummy", HIVE_STRING, Optional.empty()); + + io.trino.plugin.hive.metastore.Table.Builder tableBuilder = io.trino.plugin.hive.metastore.Table.builder() + .setDatabaseName(schemaViewName.getSchemaName()) + .setTableName(schemaViewName.getTableName()) + .setOwner(session.getUser()) + .setTableType(VIRTUAL_VIEW.name()) + .setDataColumns(ImmutableList.of(dummyColumn)) + .setPartitionColumns(ImmutableList.of()) + .setParameters(viewProperties) + .withStorage(storage -> storage.setStorageFormat(VIEW_STORAGE_FORMAT)) + .withStorage(storage -> storage.setLocation("")) + .setViewOriginalText(Optional.of( + encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)))) + .setViewExpandedText(Optional.of("/* Presto Materialized View */")); + io.trino.plugin.hive.metastore.Table table = tableBuilder.build(); + PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser()); + if (existing.isPresent() && replace) { + // drop the current storage table + String oldStorageTable = existing.get().getParameters().get(STORAGE_TABLE); + if (oldStorageTable != null) { + metastore.dropTable(identity, schemaViewName.getSchemaName(), oldStorageTable, true); + } + // Replace the existing view definition + metastore.replaceTable(identity, schemaViewName.getSchemaName(), schemaViewName.getTableName(), table, principalPrivileges); + return; + } + // create the view definition + metastore.createTable(identity, table, principalPrivileges); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + final HiveIdentity identity = new HiveIdentity(session); + io.trino.plugin.hive.metastore.Table view = metastore.getTable(identity, schemaViewName.getSchemaName(), schemaViewName.getTableName()) + .orElseThrow(() -> new MaterializedViewNotFoundException(schemaViewName)); + + String storageTableName = view.getParameters().get(STORAGE_TABLE); + if (storageTableName != null) { + try { + metastore.dropTable(identity, schemaViewName.getSchemaName(), storageTableName, true); + } + catch (TrinoException e) { + log.warn(e, "Failed to drop storage table '%s' for materialized view '%s'", storageTableName, schemaViewName); + } + } + metastore.dropTable(identity, schemaViewName.getSchemaName(), schemaViewName.getTableName(), true); + } + + @Override + public 
Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + Optional tableOptional = metastore.getTable(new HiveIdentity(session), schemaViewName.getSchemaName(), schemaViewName.getTableName()); + if (tableOptional.isEmpty()) { + return Optional.empty(); + } + + io.trino.plugin.hive.metastore.Table table = tableOptional.get(); + if (!isPrestoView(table) || !isHiveOrPrestoView(table) || !table.getParameters().containsKey(STORAGE_TABLE)) { + return Optional.empty(); + } + + io.trino.plugin.hive.metastore.Table materializedView = tableOptional.get(); + String storageTable = materializedView.getParameters().get(STORAGE_TABLE); + checkState(storageTable != null, "Storage table missing in definition of materialized view " + schemaViewName); + + IcebergMaterializedViewDefinition definition = decodeMaterializedViewData(materializedView.getViewOriginalText() + .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + schemaViewName))); + + Table icebergTable = loadTable(session, new SchemaTableName(schemaViewName.getSchemaName(), storageTable)); + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(FILE_FORMAT_PROPERTY, IcebergUtil.getFileFormat(icebergTable)); + if (!icebergTable.spec().fields().isEmpty()) { + properties.put(PARTITIONING_PROPERTY, toPartitionFields(icebergTable.spec())); + } + + return Optional.of(new ConnectorMaterializedViewDefinition( + definition.getOriginalSql(), + Optional.of(new CatalogSchemaTableName(catalogName.toString(), new SchemaTableName(schemaViewName.getSchemaName(), storageTable))), + definition.getCatalog(), + definition.getSchema(), + definition.getColumns().stream() + .map(column -> new ConnectorMaterializedViewDefinition.Column(column.getName(), column.getType())) + .collect(toImmutableList()), + definition.getComment(), + materializedView.getOwner(), + properties.build())); + } + + private List listNamespaces(ConnectorSession session, Optional namespace) + { + if (namespace.isPresent()) { + if (isHiveSystemSchema(namespace.get())) { + return ImmutableList.of(); + } + return ImmutableList.of(namespace.get()); + } + return listNamespaces(session); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index fc10ea4230a54..b496364d9df30 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -23,6 +23,8 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.trino.plugin.hive.HiveCompressionCodec.GZIP; +import static io.trino.plugin.iceberg.CatalogType.HIVE; +import static io.trino.plugin.iceberg.CatalogType.UNKNOWN; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET; @@ -36,7 +38,8 @@ public void testDefaults() .setCompressionCodec(GZIP) .setUseFileSizeFromMetadata(true) .setMaxPartitionsPerWriter(100) - .setUniqueTableLocation(false)); + .setUniqueTableLocation(false) + .setCatalogType(HIVE)); } @Test @@ -48,6 +51,7 @@ public void testExplicitPropertyMappings() .put("iceberg.use-file-size-from-metadata", "false") .put("iceberg.max-partitions-per-writer", "222") .put("iceberg.unique-table-location", "true") + 
.put("iceberg.catalog.type", "UNKNOWN") .build(); IcebergConfig expected = new IcebergConfig() @@ -55,7 +59,8 @@ public void testExplicitPropertyMappings() .setCompressionCodec(HiveCompressionCodec.NONE) .setUseFileSizeFromMetadata(false) .setMaxPartitionsPerWriter(222) - .setUniqueTableLocation(true); + .setUniqueTableLocation(true) + .setCatalogType(UNKNOWN); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index 111ae48c0dc6f..9b3066f5cf62a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -111,7 +111,7 @@ public void testSelect() assertMetastoreInvocations("SELECT * FROM test_select_from", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 3) .build()); } @@ -122,7 +122,7 @@ public void testSelectWithFilter() assertMetastoreInvocations("SELECT * FROM test_select_from_where WHERE age = 2", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 3) .build()); } @@ -134,7 +134,7 @@ public void testJoin() assertMetastoreInvocations("SELECT name, age FROM test_join_t1 JOIN test_join_t2 ON test_join_t2.id = test_join_t1.id", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 8) + .addCopies(GET_TABLE, 6) .build()); } @@ -145,7 +145,7 @@ public void testExplainSelect() assertMetastoreInvocations("EXPLAIN SELECT * FROM test_explain", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 3) .build()); } @@ -156,7 +156,7 @@ public void testShowStatsForTable() assertMetastoreInvocations("SHOW STATS FOR test_show_stats", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 3) .build()); } @@ -167,7 +167,7 @@ public void testShowStatsForTableWithFilter() assertMetastoreInvocations("SHOW STATS FOR (SELECT * FROM test_show_stats_with_filter where age >= 2)", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 3) .build()); } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveTablesCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveTablesCompatibility.java index cda4b42b268f9..7bc085ec89024 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveTablesCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveTablesCompatibility.java @@ -36,8 +36,7 @@ public void testIcebergSelectFromHiveTable() .hasMessageMatching("Query failed \\(#\\w+\\):\\Q Not an Iceberg table: default." + tableName); assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM iceberg.default.\"" + tableName + "$files\"")) - // TODO (https://github.com/trinodb/trino/issues/8690) return adequate error message - .hasMessageMatching("Query failed \\(#\\w+\\):\\Q Wrong table type: FILES"); + .hasMessageMatching("Query failed \\(#\\w+\\):\\Q Not an Iceberg table: default." + tableName); onTrino().executeQuery("DROP TABLE hive.default." + tableName); }