From b027ee29241c6be310ee92c29ae312686817e416 Mon Sep 17 00:00:00 2001 From: cai can <94670132+caican00@users.noreply.github.com> Date: Mon, 1 Jul 2024 12:26:16 +0800 Subject: [PATCH] [#3892] feat(catalog-lakehouse-paimon): Support table operations for Paimon Catalog (#3939) ### What changes were proposed in this pull request? Support table operations for Paimon Catalog. ### Why are the changes needed? Fix: https://github.com/datastrato/gravitino/issues/3892 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UTs and ITs. --------- Co-authored-by: caican --- LICENSE | 3 + .../catalog-lakehouse-paimon/build.gradle.kts | 19 + .../paimon/GravitinoPaimonColumn.java | 98 +++++ .../paimon/GravitinoPaimonTable.java | 94 +++++ .../lakehouse/paimon/PaimonCatalog.java | 6 +- .../paimon/PaimonCatalogCapability.java | 11 +- .../paimon/PaimonCatalogOperations.java | 137 +++++- .../lakehouse/paimon/PaimonSchema.java | 5 +- .../catalog/lakehouse/paimon/PaimonTable.java | 24 -- .../paimon/PaimonTablePropertiesMetadata.java | 39 ++ .../paimon/ops/PaimonCatalogOps.java | 30 +- .../lakehouse/paimon/utils/TableOpsUtils.java | 35 ++ .../lakehouse/paimon/utils/TypeUtils.java | 255 +++++++++++ .../paimon/TestGravitinoPaimonTable.java | 344 +++++++++++++++ .../lakehouse/paimon/TestPaimonCatalog.java | 3 +- .../lakehouse/paimon/TestPaimonSchema.java | 2 +- .../integration/test/CatalogPaimonBaseIT.java | 395 +++++++++++++++++- .../test/CatalogPaimonFileSystemIT.java | 10 +- .../paimon/ops/TestPaimonCatalogOps.java | 172 ++++++++ .../lakehouse/paimon/utils/TestTypeUtils.java | 196 +++++++++ 20 files changed, 1824 insertions(+), 54 deletions(-) create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonColumn.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java delete mode 100644 
catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTable.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TypeUtils.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestTypeUtils.java diff --git a/LICENSE b/LICENSE index d80ec880eca..9d2c5a805eb 100644 --- a/LICENSE +++ b/LICENSE @@ -247,6 +247,9 @@ ./clients/client-java/src/main/java/com/datastrato/gravitino/client/OAuth2ClientUtil.java ./gradlew + Apache Paimon + ./catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TypeUtils.java + Apache Hive ./catalogs/catalog-hive/src/test/resources/hive-schema-3.1.0.derby.sql diff --git a/catalogs/catalog-lakehouse-paimon/build.gradle.kts b/catalogs/catalog-lakehouse-paimon/build.gradle.kts index 2e46a008513..ff595cb11ac 100644 --- a/catalogs/catalog-lakehouse-paimon/build.gradle.kts +++ b/catalogs/catalog-lakehouse-paimon/build.gradle.kts @@ -10,6 +10,11 @@ plugins { id("idea") } +val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() +val sparkVersion: String = libs.versions.spark34.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val paimonVersion: String = libs.versions.paimon.get() + dependencies { implementation(project(":api")) implementation(project(":common")) @@ -18,6 +23,7 @@ dependencies { exclude("com.sun.jersey") exclude("javax.servlet") } + implementation(libs.bundles.log4j) implementation(libs.commons.lang3) implementation(libs.guava) implementation(libs.hadoop2.common) { @@ -41,6 +47,19 @@ dependencies { testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":server")) testImplementation(project(":server-common")) + testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { + exclude("org.apache.hadoop") + } + testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") { + exclude("org.apache.avro") + exclude("org.apache.hadoop") + exclude("org.apache.zookeeper") + exclude("io.dropwizard.metrics") + exclude("org.rocksdb") + } + testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") { + exclude("org.apache.hadoop") + } testImplementation(libs.slf4j.api) testImplementation(libs.junit.jupiter.api) testImplementation(libs.mysql.driver) diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonColumn.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonColumn.java new file mode 100644 index 00000000000..378f4e53912 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonColumn.java @@ -0,0 +1,98 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.fromPaimonType; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType; + +import com.datastrato.gravitino.connector.BaseColumn; +import com.datastrato.gravitino.rel.Column; +import java.util.List; +import java.util.stream.Collectors; +import lombok.EqualsAndHashCode; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; + +/** Implementation of {@link Column} that represents a column in the Paimon column. */ +@EqualsAndHashCode(callSuper = true) +public class GravitinoPaimonColumn extends BaseColumn { + + private GravitinoPaimonColumn() {} + + /** + * Converts {@link GravitinoPaimonColumn} instance to inner column. + * + * @param id The id of inner column. + * @return The converted inner column. + */ + public static DataField toPaimonColumn(int id, Column gravitinoColumn) { + DataType paimonType = toPaimonType(gravitinoColumn.dataType()); + DataType paimonTypeWithNullable = + gravitinoColumn.nullable() ? paimonType.nullable() : paimonType.notNull(); + return new DataField( + id, gravitinoColumn.name(), paimonTypeWithNullable, gravitinoColumn.comment()); + } + + /** + * Creates new {@link GravitinoPaimonColumn} instance from Paimon columns. + * + * @param rowType The {@link RowType} instance of Paimon column. + * @return New {@link GravitinoPaimonColumn} instances. + */ + public static List fromPaimonRowType(RowType rowType) { + return rowType.getFields().stream() + .map(GravitinoPaimonColumn::fromPaimonColumn) + .collect(Collectors.toList()); + } + + /** + * Creates a new {@link GravitinoPaimonColumn} instance from inner column. + * + * @param dataField The {@link DataField} instance of inner column. + * @return A new {@link GravitinoPaimonColumn} instance. 
+ */ + public static GravitinoPaimonColumn fromPaimonColumn(DataField dataField) { + return builder() + .withName(dataField.name()) + .withType(fromPaimonType(dataField.type())) + .withComment(dataField.description()) + .withNullable(dataField.type().isNullable()) + .build(); + } + + /** A builder class for constructing {@link GravitinoPaimonColumn} instance. */ + public static class Builder extends BaseColumnBuilder { + + /** Creates a new instance of {@link Builder}. */ + private Builder() {} + + /** + * Internal method to build a {@link GravitinoPaimonColumn} instance using the provided values. + * + * @return A new {@link GravitinoPaimonColumn} instance with the configured values. + */ + @Override + protected GravitinoPaimonColumn internalBuild() { + GravitinoPaimonColumn paimonColumn = new GravitinoPaimonColumn(); + paimonColumn.name = name; + paimonColumn.comment = comment; + paimonColumn.dataType = dataType; + paimonColumn.nullable = nullable; + paimonColumn.autoIncrement = autoIncrement; + paimonColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : defaultValue; + return paimonColumn; + } + } + + /** + * Creates a new instance of {@link Builder}. + * + * @return The new instance. + */ + public static Builder builder() { + return new Builder(); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java new file mode 100644 index 00000000000..27155638629 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java @@ -0,0 +1,94 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import static com.datastrato.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.fromPaimonRowType; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.toPaimonColumn; +import static com.datastrato.gravitino.meta.AuditInfo.EMPTY; + +import com.datastrato.gravitino.connector.BaseTable; +import com.datastrato.gravitino.connector.TableOperations; +import com.google.common.collect.Maps; +import lombok.Getter; +import lombok.ToString; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; + +/** Implementation of {@link Table} that represents a Paimon Table entity in the Paimon table. */ +@ToString +@Getter +public class GravitinoPaimonTable extends BaseTable { + + private GravitinoPaimonTable() {} + + @Override + protected TableOperations newOps() { + // TODO: Implement this interface when we have the Paimon table operations. + throw new UnsupportedOperationException("PaimonTable does not support TableOperations."); + } + + /** + * Converts {@link GravitinoPaimonTable} instance to Paimon table. + * + * @return The converted Paimon table. + */ + public Schema toPaimonTableSchema() { + Schema.Builder builder = Schema.newBuilder().comment(comment).options(properties); + for (int index = 0; index < columns.length; index++) { + DataField dataField = toPaimonColumn(index, columns[index]); + builder.column(dataField.name(), dataField.type(), dataField.description()); + } + return builder.build(); + } + + /** + * Creates a new {@link GravitinoPaimonTable} instance from Paimon table. + * + * @param table The {@link Table} instance of Paimon table. + * @return A new {@link GravitinoPaimonTable} instance. 
+ */ + public static GravitinoPaimonTable fromPaimonTable(Table table) { + return builder() + .withName(table.name()) + .withColumns(fromPaimonRowType(table.rowType()).toArray(new GravitinoPaimonColumn[0])) + .withComment(table.comment().orElse(null)) + .withProperties(table.options()) + .withAuditInfo(EMPTY) + .build(); + } + + /** A builder class for constructing {@link GravitinoPaimonTable} instance. */ + public static class Builder extends BaseTableBuilder { + + /** Creates a new instance of {@link Builder}. */ + private Builder() {} + + /** + * Internal method to build a {@link GravitinoPaimonTable} instance using the provided values. + * + * @return A new {@link GravitinoPaimonTable} instance with the configured values. + */ + @Override + protected GravitinoPaimonTable internalBuild() { + GravitinoPaimonTable paimonTable = new GravitinoPaimonTable(); + paimonTable.name = name; + paimonTable.comment = comment; + paimonTable.columns = columns; + paimonTable.properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties); + paimonTable.auditInfo = auditInfo; + return paimonTable; + } + } + + /** + * Creates a new instance of {@link Builder}. + * + * @return The new instance. 
+ */ + public static Builder builder() { + return new Builder(); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java index 6c74076df13..f3a92ccda2b 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java @@ -20,6 +20,9 @@ public class PaimonCatalog extends BaseCatalog { static final PaimonSchemaPropertiesMetadata SCHEMA_PROPERTIES_META = new PaimonSchemaPropertiesMetadata(); + static final PaimonTablePropertiesMetadata TABLE_PROPERTIES_META = + new PaimonTablePropertiesMetadata(); + /** @return The short name of the catalog. */ @Override public String shortName() { @@ -44,8 +47,7 @@ public Capability newCapability() { @Override public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { - throw new UnsupportedOperationException( - "The catalog does not support table properties metadata"); + return TABLE_PROPERTIES_META; } @Override diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogCapability.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogCapability.java index 8421f233034..b20603f9cd3 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogCapability.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogCapability.java @@ -5,5 +5,14 @@ package com.datastrato.gravitino.catalog.lakehouse.paimon; import com.datastrato.gravitino.connector.capability.Capability; +import 
com.datastrato.gravitino.connector.capability.CapabilityResult; -public class PaimonCatalogCapability implements Capability {} +public class PaimonCatalogCapability implements Capability { + + @Override + public CapabilityResult columnDefaultValue() { + // See https://github.com/apache/paimon/pull/1425/files + return CapabilityResult.unsupported( + "Paimon set column default value through table properties instead of column info."); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java index c3fed72a088..4a598404bd7 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java @@ -4,7 +4,9 @@ */ package com.datastrato.gravitino.catalog.lakehouse.paimon; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable.fromPaimonTable; import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonSchema.fromPaimonProperties; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.checkColumnCapability; import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX; import com.datastrato.gravitino.NameIdentifier; @@ -26,16 +28,23 @@ import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; 
import com.datastrato.gravitino.utils.MapUtils; import com.datastrato.gravitino.utils.PrincipalUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.time.Instant; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import org.apache.commons.lang3.ArrayUtils; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +64,8 @@ public class PaimonCatalogOperations implements CatalogOperations, SupportsSchem "Paimon schema (database) %s is not empty. One or more tables exist."; private static final String SCHEMA_ALREADY_EXISTS_EXCEPTION = "Paimon schema (database) %s already exists."; + private static final String NO_SUCH_TABLE_EXCEPTION = "Paimon table %s does not exist."; + private static final String TABLE_ALREADY_EXISTS_EXCEPTION = "Paimon table %s already exists."; /** * Initializes the Paimon catalog operations with the provided configuration. @@ -119,7 +130,8 @@ public PaimonSchema createSchema( AuditInfo.builder().withCreator(currentUser).withCreateTime(Instant.now()).build()) .build(); try { - paimonCatalogOps.createDatabase(createdSchema.toPaimonProperties()); + Map paimonSchemaProperties = createdSchema.toPaimonProperties(); + paimonCatalogOps.createDatabase(identifier.name(), paimonSchemaProperties); } catch (Catalog.DatabaseAlreadyExistException e) { throw new SchemaAlreadyExistsException(e, SCHEMA_ALREADY_EXISTS_EXCEPTION, identifier); } catch (Exception e) { @@ -165,7 +177,7 @@ public PaimonSchema loadSchema(NameIdentifier identifier) throws NoSuchSchemaExc @Override public PaimonSchema alterSchema(NameIdentifier identifier, SchemaChange... 
changes) throws NoSuchSchemaException { - throw new UnsupportedOperationException("Alter schema is not supported in Paimon Catalog."); + throw new UnsupportedOperationException("AlterSchema is unsupported now for Paimon Catalog."); } /** @@ -202,19 +214,42 @@ public boolean dropSchema(NameIdentifier identifier, boolean cascade) */ @Override public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + String[] levels = namespace.levels(); + NameIdentifier schemaIdentifier = NameIdentifier.of(levels[levels.length - 1]); + if (!schemaExists(schemaIdentifier)) { + throw new NoSuchSchemaException(NO_SUCH_SCHEMA_EXCEPTION, namespace.toString()); + } + List tables; + try { + tables = paimonCatalogOps.listTables(schemaIdentifier.name()); + } catch (Catalog.DatabaseNotExistException e) { + throw new NoSuchSchemaException(NO_SUCH_SCHEMA_EXCEPTION, namespace.toString()); + } + return tables.stream() + .map( + tableIdentifier -> + NameIdentifier.of(ArrayUtils.add(namespace.levels(), tableIdentifier))) + .toArray(NameIdentifier[]::new); } /** * Loads the table with the provided identifier. * * @param identifier The identifier of the table to load. - * @return The loaded {@link PaimonTable} instance representing the table. + * @return The loaded {@link GravitinoPaimonTable} instance representing the table. * @throws NoSuchTableException If the table with the provided identifier does not exist. 
*/ @Override - public PaimonTable loadTable(NameIdentifier identifier) throws NoSuchTableException { - throw new UnsupportedOperationException(); + public GravitinoPaimonTable loadTable(NameIdentifier identifier) throws NoSuchTableException { + Table table; + try { + NameIdentifier tableIdentifier = buildPaimonNameIdentifier(identifier); + table = paimonCatalogOps.loadTable(tableIdentifier.toString()); + } catch (Catalog.TableNotExistException e) { + throw new NoSuchTableException(e, NO_SUCH_TABLE_EXCEPTION, identifier); + } + LOG.info("Loaded Paimon table {}.", identifier); + return fromPaimonTable(table); } /** @@ -226,12 +261,12 @@ public PaimonTable loadTable(NameIdentifier identifier) throws NoSuchTableExcept * @param properties The properties for the new table. * @param partitioning The partitioning for the new table. * @param indexes The indexes for the new table. - * @return The newly created {@link PaimonTable} instance. + * @return The newly created {@link GravitinoPaimonTable} instance. * @throws NoSuchSchemaException If the schema with the provided namespace does not exist. * @throws TableAlreadyExistsException If the table with the same identifier already exists. 
*/ @Override - public PaimonTable createTable( + public GravitinoPaimonTable createTable( NameIdentifier identifier, Column[] columns, String comment, @@ -241,7 +276,63 @@ public PaimonTable createTable( SortOrder[] sortOrders, Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { - throw new UnsupportedOperationException(); + NameIdentifier nameIdentifier = buildPaimonNameIdentifier(identifier); + NameIdentifier schemaIdentifier = NameIdentifier.of(nameIdentifier.namespace().levels()); + if (!schemaExists(schemaIdentifier)) { + throw new NoSuchSchemaException(NO_SUCH_SCHEMA_EXCEPTION, schemaIdentifier); + } + Preconditions.checkArgument( + partitioning == null || partitioning.length == 0, + "Table Partitions are not supported when creating a Paimon table in Gravitino now."); + Preconditions.checkArgument( + sortOrders == null || sortOrders.length == 0, + "Sort orders are not supported for Paimon in Gravitino."); + Preconditions.checkArgument( + indexes == null || indexes.length == 0, + "Indexes are not supported for Paimon in Gravitino."); + Preconditions.checkArgument( + distribution == null || distribution.strategy() == Distributions.NONE.strategy(), + "Distribution is not supported for Paimon in Gravitino now."); + String currentUser = currentUser(); + GravitinoPaimonTable createdTable = + GravitinoPaimonTable.builder() + .withName(identifier.name()) + .withColumns( + Arrays.stream(columns) + .map( + column -> { + checkColumnCapability( + column.name(), column.defaultValue(), column.autoIncrement()); + return GravitinoPaimonColumn.builder() + .withName(column.name()) + .withType(column.dataType()) + .withComment(column.comment()) + .withNullable(column.nullable()) + .withAutoIncrement(column.autoIncrement()) + .withDefaultValue(column.defaultValue()) + .build(); + }) + .toArray(GravitinoPaimonColumn[]::new)) + .withComment(comment) + .withProperties(properties) + .withAuditInfo( + 
AuditInfo.builder().withCreator(currentUser).withCreateTime(Instant.now()).build()) + .build(); + try { + Schema paimonTableSchema = createdTable.toPaimonTableSchema(); + paimonCatalogOps.createTable(nameIdentifier.toString(), paimonTableSchema); + } catch (Catalog.DatabaseNotExistException e) { + throw new NoSuchSchemaException(e, NO_SUCH_SCHEMA_EXCEPTION, identifier); + } catch (Catalog.TableAlreadyExistException e) { + throw new TableAlreadyExistsException(e, TABLE_ALREADY_EXISTS_EXCEPTION, identifier); + } + LOG.info( + "Created Paimon table: {}. Current user: {}. Comment: {}. Metadata: {}.", + identifier, + currentUser, + comment, + properties); + return createdTable; } /** @@ -250,14 +341,14 @@ public PaimonTable createTable( * * @param identifier The identifier of the table to alter. * @param changes The changes to apply to the table. - * @return The altered {@link PaimonTable} instance. + * @return The altered {@link GravitinoPaimonTable} instance. * @throws NoSuchTableException If the table with the provided identifier does not exist. * @throws IllegalArgumentException This exception will not be thrown in this method. */ @Override - public PaimonTable alterTable(NameIdentifier identifier, TableChange... changes) + public GravitinoPaimonTable alterTable(NameIdentifier identifier, TableChange... changes) throws NoSuchTableException, IllegalArgumentException { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("alterTable is unsupported now for Paimon Catalog."); } /** @@ -268,7 +359,15 @@ public PaimonTable alterTable(NameIdentifier identifier, TableChange... 
changes) */ @Override public boolean dropTable(NameIdentifier identifier) { - throw new UnsupportedOperationException(); + try { + NameIdentifier tableIdentifier = buildPaimonNameIdentifier(identifier); + paimonCatalogOps.dropTable(tableIdentifier.toString()); + } catch (Catalog.TableNotExistException e) { + LOG.warn("Paimon table {} does not exist.", identifier); + return false; + } + LOG.info("Dropped Paimon table {}.", identifier); + return true; } /** @@ -280,7 +379,7 @@ public boolean dropTable(NameIdentifier identifier) { */ @Override public boolean purgeTable(NameIdentifier identifier) throws UnsupportedOperationException { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("purgeTable is unsupported now for Paimon Catalog."); } @Override @@ -297,4 +396,14 @@ public void close() { private static String currentUser() { return PrincipalUtils.getCurrentUserName(); } + + private NameIdentifier buildPaimonNameIdentifier(NameIdentifier identifier) { + Preconditions.checkArgument( + identifier != null + && identifier.namespace() != null + && identifier.namespace().levels().length > 0, + "Namespace can not be null or empty."); + String[] levels = identifier.namespace().levels(); + return NameIdentifier.of(levels[levels.length - 1], identifier.name()); + } } diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java index 4b3bae36bce..564f7625a55 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java @@ -11,7 +11,6 @@ import java.util.Map; import java.util.Optional; import lombok.ToString; -import org.apache.commons.lang3.tuple.Pair; /** * Implementation 
of {@link Schema} that represents a Paimon Schema (Database) entity in the Paimon @@ -27,8 +26,8 @@ private PaimonSchema() {} * * @return The converted inner schema. */ - public Pair> toPaimonProperties() { - return Pair.of(name, properties); + public Map toPaimonProperties() { + return properties; } /** diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTable.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTable.java deleted file mode 100644 index c313fc2da87..00000000000 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTable.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ -package com.datastrato.gravitino.catalog.lakehouse.paimon; - -import com.datastrato.gravitino.connector.BaseTable; -import com.datastrato.gravitino.connector.TableOperations; -import lombok.Getter; -import lombok.ToString; -import org.apache.paimon.table.Table; - -/** Implementation of {@link Table} that represents a Paimon Table entity in the Paimon table. 
*/ -@ToString -@Getter -public class PaimonTable extends BaseTable { - - private PaimonTable() {} - - @Override - protected TableOperations newOps() throws UnsupportedOperationException { - throw new UnsupportedOperationException(); - } -} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java new file mode 100644 index 00000000000..5f58adb04ff --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import static com.datastrato.gravitino.connector.PropertyEntry.stringReservedPropertyEntry; + +import com.datastrato.gravitino.connector.BasePropertiesMetadata; +import com.datastrato.gravitino.connector.PropertiesMetadata; +import com.datastrato.gravitino.connector.PropertyEntry; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link PropertiesMetadata} that represents Paimon table properties metadata. 
+ */ +public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata { + + public static final String COMMENT = "comment"; + public static final String CREATOR = "creator"; + + private static final Map> PROPERTIES_METADATA; + + static { + List> propertyEntries = + ImmutableList.of( + stringReservedPropertyEntry(COMMENT, "The table comment", true), + stringReservedPropertyEntry(CREATOR, "The table creator", false)); + PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); + } + + @Override + protected Map> specificPropertyEntries() { + return PROPERTIES_METADATA; + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java index 4c12a6e7da7..4add261198d 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java @@ -9,11 +9,14 @@ import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException; import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException; import org.apache.paimon.catalog.Catalog.DatabaseNotExistException; +import org.apache.paimon.catalog.Catalog.TableNotExistException; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; /** Table operation proxy that handles table operations of an underlying Paimon catalog. 
*/ public class PaimonCatalogOps implements AutoCloseable { @@ -39,13 +42,34 @@ public Map loadDatabase(String databaseName) throws DatabaseNotE return catalog.loadDatabaseProperties(databaseName); } - public void createDatabase(Pair> database) + public void createDatabase(String databaseName, Map properties) throws DatabaseAlreadyExistException { - catalog.createDatabase(database.getKey(), false, database.getRight()); + catalog.createDatabase(databaseName, false, properties); } public void dropDatabase(String databaseName, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException { catalog.dropDatabase(databaseName, false, cascade); } + + public List listTables(String databaseName) throws DatabaseNotExistException { + return catalog.listTables(databaseName); + } + + public Table loadTable(String tableName) throws TableNotExistException { + return catalog.getTable(tableIdentifier(tableName)); + } + + public void createTable(String tablename, Schema schema) + throws Catalog.TableAlreadyExistException, DatabaseNotExistException { + catalog.createTable(tableIdentifier(tablename), schema, false); + } + + public void dropTable(String tableName) throws TableNotExistException { + catalog.dropTable(tableIdentifier(tableName), false); + } + + private Identifier tableIdentifier(String tableName) { + return Identifier.fromString(tableName); + } } diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java new file mode 100644 index 00000000000..f7cb925d01a --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.utils; + +import com.datastrato.gravitino.catalog.lakehouse.paimon.ops.PaimonCatalogOps; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.expressions.Expression; +import com.google.common.base.Preconditions; + +/** Utilities of {@link PaimonCatalogOps} to support table operation. */ +public class TableOpsUtils { + + public static void checkColumnCapability( + String fieldName, Expression defaultValue, boolean autoIncrement) { + checkColumnDefaultValue(fieldName, defaultValue); + checkColumnAutoIncrement(fieldName, autoIncrement); + } + + private static void checkColumnDefaultValue(String fieldName, Expression defaultValue) { + Preconditions.checkArgument( + defaultValue.equals(Column.DEFAULT_VALUE_NOT_SET), + String.format( + "Paimon set column default value through table properties instead of column info. Illegal column: %s.", + fieldName)); + } + + private static void checkColumnAutoIncrement(String fieldName, boolean autoIncrement) { + Preconditions.checkArgument( + !autoIncrement, + String.format( + "Paimon does not support auto increment column. Illegal column: %s.", fieldName)); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TypeUtils.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TypeUtils.java new file mode 100644 index 00000000000..9022135c778 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TypeUtils.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.datastrato.gravitino.catalog.lakehouse.paimon.utils;

import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import java.util.Arrays;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeDefaultVisitor;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;

// Referred to org/apache/paimon/spark/SparkTypeUtils.java
/** Utilities of {@link Type} to support type conversion between Paimon and Gravitino. */
public class TypeUtils {

  private TypeUtils() {}

  /**
   * Convert Paimon {@link DataType} data type to Gravitino {@link Type} data type.
   *
   * @param dataType Paimon {@link DataType} data type.
   * @return Gravitino {@link Type} data type.
   */
  public static Type fromPaimonType(DataType dataType) {
    return dataType.accept(PaimonToGravitinoTypeVisitor.INSTANCE);
  }

  /**
   * Convert Gravitino {@link Type} data type to Paimon {@link DataType} data type.
   *
   * @param type Gravitino {@link Type} data type.
   * @return Paimon {@link DataType} data type.
   */
  public static DataType toPaimonType(Type type) {
    return GravitinoToPaimonTypeVisitor.visit(type);
  }

  // NOTE(review): the generic parameter on DataTypeDefaultVisitor was lost during extraction;
  // reconstructed as DataTypeDefaultVisitor<Type> since every visit method returns a Gravitino
  // Type — confirm against the original commit.
  private static class PaimonToGravitinoTypeVisitor extends DataTypeDefaultVisitor<Type> {

    private static final PaimonToGravitinoTypeVisitor INSTANCE = new PaimonToGravitinoTypeVisitor();

    @Override
    public Type visit(VarCharType varCharType) {
      // Paimon models STRING as VARCHAR(Integer.MAX_VALUE); map that back to Gravitino STRING.
      if (varCharType.getLength() == Integer.MAX_VALUE) {
        return Types.StringType.get();
      } else {
        return Types.VarCharType.of(varCharType.getLength());
      }
    }

    @Override
    public Type visit(BooleanType booleanType) {
      return Types.BooleanType.get();
    }

    @Override
    public Type visit(BinaryType binaryType) {
      return Types.BinaryType.get();
    }

    @Override
    public Type visit(DecimalType decimalType) {
      return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
    }

    @Override
    public Type visit(TinyIntType tinyIntType) {
      return Types.ByteType.get();
    }

    @Override
    public Type visit(SmallIntType smallIntType) {
      return Types.ShortType.get();
    }

    @Override
    public Type visit(IntType intType) {
      return Types.IntegerType.get();
    }

    @Override
    public Type visit(BigIntType bigIntType) {
      return Types.LongType.get();
    }

    @Override
    public Type visit(FloatType floatType) {
      return Types.FloatType.get();
    }

    @Override
    public Type visit(DoubleType doubleType) {
      return Types.DoubleType.get();
    }

    @Override
    public Type visit(DateType dateType) {
      return Types.DateType.get();
    }

    @Override
    public Type visit(TimeType timeType) {
      return Types.TimeType.get();
    }

    @Override
    public Type visit(TimestampType timestampType) {
      return Types.TimestampType.withoutTimeZone();
    }

    @Override
    public Type visit(LocalZonedTimestampType localZonedTimestampType) {
      return Types.TimestampType.withTimeZone();
    }

    @Override
    public Type visit(ArrayType arrayType) {
      return Types.ListType.of(
          arrayType.getElementType().accept(this), arrayType.getElementType().isNullable());
    }

    @Override
    public Type visit(MultisetType multisetType) {
      // Unlike a Java Set, MultisetType allows for multiple instances for each of its
      // elements with a common subtype. And a conversion is possible through a map
      // that assigns each value to an integer to represent the multiplicity of the values.
      // For example, a MULTISET<T> is converted to a MAP<T, INT>: the key of the
      // map represents the elements of the Multiset and the value represents the multiplicity of
      // the elements in the Multiset.
      return Types.MapType.of(
          multisetType.getElementType().accept(this), Types.IntegerType.get(), false);
    }

    @Override
    public Type visit(MapType mapType) {
      return Types.MapType.of(
          mapType.getKeyType().accept(this),
          mapType.getValueType().accept(this),
          mapType.getValueType().isNullable());
    }

    @Override
    public Type visit(RowType rowType) {
      return Types.StructType.of(
          rowType.getFields().stream()
              .map(
                  field ->
                      Types.StructType.Field.of(
                          field.name(),
                          field.type().accept(this),
                          field.type().isNullable(),
                          field.description()))
              .toArray(Types.StructType.Field[]::new));
    }

    @Override
    protected Type defaultMethod(DataType dataType) {
      // Types Gravitino cannot represent are carried through as unparsed SQL strings.
      return Types.UnparsedType.of(dataType.asSQLString());
    }
  }

  private static class GravitinoToPaimonTypeVisitor {

    public static DataType visit(Type type) {
      switch (type.name()) {
        case BOOLEAN:
          return DataTypes.BOOLEAN();
        case BYTE:
          return DataTypes.TINYINT();
        case SHORT:
          return DataTypes.SMALLINT();
        case INTEGER:
          return DataTypes.INT();
        case LONG:
          return DataTypes.BIGINT();
        case FLOAT:
          return DataTypes.FLOAT();
        case DOUBLE:
          return DataTypes.DOUBLE();
        case DECIMAL:
          Types.DecimalType decimalType = (Types.DecimalType) type;
          return DataTypes.DECIMAL(decimalType.precision(), decimalType.scale());
        case BINARY:
          return DataTypes.BINARY(BinaryType.MAX_LENGTH);
        case STRING:
          return DataTypes.STRING();
        case VARCHAR:
          return DataTypes.VARCHAR(((Types.VarCharType) type).length());
        case DATE:
          return DataTypes.DATE();
        case TIME:
          return DataTypes.TIME();
        case TIMESTAMP:
          return ((Types.TimestampType) type).hasTimeZone()
              ? DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
              : DataTypes.TIMESTAMP();
        case LIST:
          Types.ListType listType = (Types.ListType) type;
          return DataTypes.ARRAY(visit(listType.elementType()));
        case MAP:
          Types.MapType mapType = (Types.MapType) type;
          return DataTypes.MAP(visit(mapType.keyType()), visit(mapType.valueType()));
        case STRUCT:
          RowType.Builder builder = RowType.builder();
          Arrays.stream(((Types.StructType) type).fields())
              .forEach(
                  field -> {
                    DataType dataType = GravitinoToPaimonTypeVisitor.visit(field.type());
                    DataType dataTypeWithNullable =
                        field.nullable() ? dataType.nullable() : dataType.notNull();
                    builder.field(field.name(), dataTypeWithNullable, field.comment());
                  });
          return builder.build();
        default:
          throw new UnsupportedOperationException(
              String.format(
                  "Paimon does not support Gravitino %s data type.", type.simpleString()));
      }
    }
  }
}
com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.types.Types; +import com.google.common.collect.Maps; +import java.time.Instant; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class TestGravitinoPaimonTable { + + private static final String META_LAKE_NAME = "metalake"; + + private static final String PAIMON_CATALOG_NAME = "test_catalog"; + private static final String PAIMON_SCHEMA_NAME = "test_schema"; + private static final String PAIMON_COMMENT = "test_comment"; + private static PaimonCatalog paimonCatalog; + private static PaimonCatalogOperations paimonCatalogOperations; + private static PaimonSchema paimonSchema; + private static final NameIdentifier schemaIdent = + NameIdentifier.of(META_LAKE_NAME, PAIMON_CATALOG_NAME, PAIMON_SCHEMA_NAME); + + @BeforeAll + static void setup() { + initPaimonCatalog(); + initPaimonSchema(); + } + + @AfterEach + void resetSchema() { + NameIdentifier[] nameIdentifiers = + paimonCatalogOperations.listTables( + Namespace.of(ArrayUtils.add(schemaIdent.namespace().levels(), schemaIdent.name()))); + if (ArrayUtils.isNotEmpty(nameIdentifiers)) { + Arrays.stream(nameIdentifiers) + .map( + nameIdentifier -> { + String[] levels = nameIdentifier.namespace().levels(); + return NameIdentifier.of( + Namespace.of(levels[levels.length - 
1]), nameIdentifier.name()); + }) + .forEach(nameIdentifier -> paimonCatalogOperations.dropTable(nameIdentifier)); + } + paimonCatalogOperations.dropSchema(schemaIdent, false); + initPaimonSchema(); + } + + @AfterAll + static void cleanUp() { + paimonCatalogOperations.dropSchema(schemaIdent, true); + } + + private static CatalogEntity createDefaultCatalogEntity() { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("testPaimonUser").withCreateTime(Instant.now()).build(); + + CatalogEntity entity = + CatalogEntity.builder() + .withId(1L) + .withName(PAIMON_CATALOG_NAME) + .withNamespace(Namespace.of(META_LAKE_NAME)) + .withType(PaimonCatalog.Type.RELATIONAL) + .withProvider("lakehouse-paimon") + .withAuditInfo(auditInfo) + .build(); + return entity; + } + + @Test + void testCreatePaimonTable() { + String paimonTableName = "test_paimon_table"; + NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), paimonTableName); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + + GravitinoPaimonColumn col1 = + fromPaimonColumn(new DataField(0, "col_1", DataTypes.INT().nullable(), PAIMON_COMMENT)); + GravitinoPaimonColumn col2 = + fromPaimonColumn(new DataField(1, "col_2", DataTypes.DATE().notNull(), PAIMON_COMMENT)); + RowType rowTypeInside = + RowType.builder() + .field("integer_field_inside", DataTypes.INT().notNull()) + .field("string_field_inside", DataTypes.STRING().notNull()) + .build(); + RowType rowType = + RowType.builder() + .field("integer_field", DataTypes.INT().notNull()) + .field("string_field", DataTypes.STRING().notNull(), "string field") + .field("struct_field", rowTypeInside.nullable(), "struct field") + .build(); + GravitinoPaimonColumn col3 = + fromPaimonColumn(new DataField(2, "col_3", rowType.notNull(), PAIMON_COMMENT)); + + Column[] columns = new Column[] {col1, col2, col3}; + Table table = + paimonCatalogOperations.createTable( + tableIdentifier, + columns, + PAIMON_COMMENT, 
+ properties, + new Transform[0], + Distributions.NONE, + new SortOrder[0]); + + Assertions.assertEquals(tableIdentifier.name(), table.name()); + Assertions.assertEquals(PAIMON_COMMENT, table.comment()); + Assertions.assertEquals("val1", table.properties().get("key1")); + Assertions.assertEquals("val2", table.properties().get("key2")); + + Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier); + + Assertions.assertEquals("val1", loadedTable.properties().get("key1")); + Assertions.assertEquals("val2", loadedTable.properties().get("key2")); + Assertions.assertTrue(loadedTable.columns()[0].nullable()); + Assertions.assertFalse(loadedTable.columns()[1].nullable()); + Assertions.assertFalse(loadedTable.columns()[2].nullable()); + + Assertions.assertTrue(paimonCatalogOperations.tableExists(tableIdentifier)); + NameIdentifier[] tableIdents = paimonCatalogOperations.listTables(tableIdentifier.namespace()); + Assertions.assertTrue(Arrays.asList(tableIdents).contains(tableIdentifier)); + + // Test exception + TableCatalog tableCatalog = paimonCatalogOperations; + Throwable exception = + Assertions.assertThrows( + TableAlreadyExistsException.class, + () -> + tableCatalog.createTable( + tableIdentifier, + columns, + PAIMON_COMMENT, + properties, + new Transform[0], + Distributions.NONE, + new SortOrder[0])); + Assertions.assertTrue( + exception + .getMessage() + .contains(String.format("Paimon table %s already exists", tableIdentifier))); + } + + @Test + void testDropPaimonTable() { + NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), genRandomName()); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + + GravitinoPaimonColumn col1 = + fromPaimonColumn(new DataField(0, "col_1", DataTypes.INT().nullable(), PAIMON_COMMENT)); + GravitinoPaimonColumn col2 = + fromPaimonColumn(new DataField(1, "col_2", DataTypes.DATE().nullable(), PAIMON_COMMENT)); + Column[] columns = new Column[] {col1, 
col2}; + + paimonCatalogOperations.createTable( + tableIdentifier, + columns, + PAIMON_COMMENT, + properties, + new Transform[0], + Distributions.NONE, + new SortOrder[0]); + + Assertions.assertTrue(paimonCatalogOperations.tableExists(tableIdentifier)); + paimonCatalogOperations.dropTable(tableIdentifier); + Assertions.assertFalse(paimonCatalogOperations.tableExists(tableIdentifier)); + } + + @Test + void testListTableException() { + Namespace tableNs = Namespace.of("metalake", paimonCatalog.name(), "not_exist_db"); + TableCatalog tableCatalog = paimonCatalogOperations; + Throwable exception = + Assertions.assertThrows( + NoSuchSchemaException.class, () -> tableCatalog.listTables(tableNs)); + Assertions.assertTrue( + exception + .getMessage() + .contains( + String.format("Paimon schema (database) %s does not exist", tableNs.toString()))); + } + + @Test + void testTableProperty() { + CatalogEntity entity = createDefaultCatalogEntity(); + try (PaimonCatalogOperations ops = new PaimonCatalogOperations()) { + ops.initialize( + initBackendCatalogProperties(), entity.toCatalogInfo(), PAIMON_PROPERTIES_METADATA); + Map map = Maps.newHashMap(); + map.put(PaimonTablePropertiesMetadata.COMMENT, "test"); + map.put(PaimonTablePropertiesMetadata.CREATOR, "test"); + for (Map.Entry entry : map.entrySet()) { + HashMap properties = + new HashMap() { + { + put(entry.getKey(), entry.getValue()); + } + }; + PropertiesMetadata metadata = paimonCatalog.tablePropertiesMetadata(); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> PropertiesMetadataHelpers.validatePropertyForCreate(metadata, properties)); + } + + map = Maps.newHashMap(); + map.put("key1", "val1"); + map.put("key2", "val2"); + for (Map.Entry entry : map.entrySet()) { + HashMap properties = + new HashMap() { + { + put(entry.getKey(), entry.getValue()); + } + }; + PropertiesMetadata metadata = paimonCatalog.tablePropertiesMetadata(); + Assertions.assertDoesNotThrow( + () -> { + 
PropertiesMetadataHelpers.validatePropertyForCreate(metadata, properties); + }); + } + } + } + + @Test + void testGravitinoToPaimonTable() { + Column[] columns = createColumns(); + NameIdentifier identifier = NameIdentifier.of("test_schema", "test_table"); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + + GravitinoPaimonTable gravitinoPaimonTable = + GravitinoPaimonTable.builder() + .withName(identifier.name()) + .withColumns( + Arrays.stream(columns) + .map( + column -> { + checkColumnCapability( + column.name(), column.defaultValue(), column.autoIncrement()); + return GravitinoPaimonColumn.builder() + .withName(column.name()) + .withType(column.dataType()) + .withComment(column.comment()) + .withNullable(column.nullable()) + .withAutoIncrement(column.autoIncrement()) + .withDefaultValue(column.defaultValue()) + .build(); + }) + .toArray(GravitinoPaimonColumn[]::new)) + .withComment("test_table_comment") + .withProperties(properties) + .build(); + Schema paimonTableSchema = gravitinoPaimonTable.toPaimonTableSchema(); + Assertions.assertEquals(gravitinoPaimonTable.comment(), gravitinoPaimonTable.comment()); + Assertions.assertEquals(gravitinoPaimonTable.properties(), paimonTableSchema.options()); + Assertions.assertEquals( + gravitinoPaimonTable.columns().length, paimonTableSchema.fields().size()); + Assertions.assertEquals(3, paimonTableSchema.fields().size()); + for (int i = 0; i < gravitinoPaimonTable.columns().length; i++) { + Column column = gravitinoPaimonTable.columns()[i]; + DataField dataField = paimonTableSchema.fields().get(i); + Assertions.assertEquals(column.name(), dataField.name()); + Assertions.assertEquals(column.comment(), dataField.description()); + } + Assertions.assertEquals(new IntType().nullable(), paimonTableSchema.fields().get(0).type()); + Assertions.assertEquals(new DateType().nullable(), paimonTableSchema.fields().get(1).type()); + Assertions.assertEquals( + new VarCharType(Integer.MAX_VALUE).nullable(), 
paimonTableSchema.fields().get(2).type()); + } + + private static String genRandomName() { + return UUID.randomUUID().toString().replace("-", ""); + } + + private static Map initBackendCatalogProperties() { + Map conf = Maps.newHashMap(); + conf.put(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, "filesystem"); + conf.put(PaimonCatalogPropertiesMetadata.WAREHOUSE, "/tmp/paimon_catalog_warehouse"); + return conf; + } + + private static void initPaimonCatalog() { + CatalogEntity entity = createDefaultCatalogEntity(); + + Map conf = initBackendCatalogProperties(); + paimonCatalog = new PaimonCatalog().withCatalogConf(conf).withCatalogEntity(entity); + paimonCatalogOperations = (PaimonCatalogOperations) paimonCatalog.ops(); + } + + private static void initPaimonSchema() { + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + + if (paimonCatalogOperations.schemaExists(schemaIdent)) { + paimonCatalogOperations.dropSchema(schemaIdent, true); + } + paimonSchema = paimonCatalogOperations.createSchema(schemaIdent, PAIMON_COMMENT, properties); + } + + private static Column[] createColumns() { + Column col1 = Column.of("col1", Types.IntegerType.get(), "col_1_comment"); + Column col2 = Column.of("col2", Types.DateType.get(), "col_2_comment"); + Column col3 = Column.of("col3", Types.StringType.get(), "col_3_comment"); + return new Column[] {col1, col2, col3}; + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonCatalog.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonCatalog.java index e04225f6b25..4303645f914 100644 --- a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonCatalog.java +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonCatalog.java @@ -6,6 
+6,7 @@ import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalog.CATALOG_PROPERTIES_META; import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalog.SCHEMA_PROPERTIES_META; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalog.TABLE_PROPERTIES_META; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.catalog.PropertiesMetadataHelpers; @@ -30,7 +31,7 @@ public class TestPaimonCatalog { @Override public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { - throw new UnsupportedOperationException("Table properties are not supported"); + return TABLE_PROPERTIES_META; } @Override diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java index 98ae834f1d5..e4e1471d544 100644 --- a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java @@ -111,7 +111,7 @@ public void testAlterSchema() { paimonCatalogOperations.createSchema(ident, COMMENT_VALUE, properties); Assertions.assertTrue(paimonCatalogOperations.schemaExists(ident)); properties.forEach( - (k, v) -> Assertions.assertEquals(v, paimonSchema.toPaimonProperties().getValue().get(k))); + (k, v) -> Assertions.assertEquals(v, paimonSchema.toPaimonProperties().get(k))); // schema properties of FilesystemCatalog is empty when loadDatabase. 
Map properties1 = paimonCatalogOperations.loadSchema(ident).properties(); diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java index 01770c0a358..dfc2d43fbcb 100644 --- a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java @@ -6,6 +6,7 @@ import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SchemaChange; import com.datastrato.gravitino.SupportsSchemas; @@ -13,13 +14,28 @@ import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig; import com.datastrato.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils; import com.datastrato.gravitino.client.GravitinoMetalake; +import com.datastrato.gravitino.dto.util.DTOConverters; import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; import com.datastrato.gravitino.integration.test.container.ContainerSuite; import com.datastrato.gravitino.integration.test.util.AbstractIT; import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.TableCatalog; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import 
com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrders; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import com.datastrato.gravitino.rel.types.Types; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -28,6 +44,15 @@ import java.util.Map; import java.util.Set; import org.apache.paimon.catalog.Catalog.DatabaseNotExistException; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.TimestampType; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -38,23 +63,37 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT { protected static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + protected String WAREHOUSE; + protected String TYPE; private static final String provider = "lakehouse-paimon"; private static final String catalog_comment = "catalog_comment"; private static final String schema_comment = "schema_comment"; + private static final String table_comment = "table_comment"; + private static final String PAIMON_COL_NAME1 = "paimon_col_name1"; + private static final String PAIMON_COL_NAME2 = "paimon_col_name2"; + private static final String PAIMON_COL_NAME3 = "paimon_col_name3"; + private static final String PAIMON_COL_NAME4 = "paimon_col_name4"; private 
String metalakeName = GravitinoITUtils.genRandomName("paimon_it_metalake"); private String catalogName = GravitinoITUtils.genRandomName("paimon_it_catalog"); private String schemaName = GravitinoITUtils.genRandomName("paimon_it_schema"); + private String tableName = GravitinoITUtils.genRandomName("paimon_it_table"); + private static String INSERT_BATCH_WITHOUT_PARTITION_TEMPLATE = "INSERT INTO paimon.%s VALUES %s"; + private static final String SELECT_ALL_TEMPLATE = "SELECT * FROM paimon.%s"; private GravitinoMetalake metalake; private Catalog catalog; private org.apache.paimon.catalog.Catalog paimonCatalog; + private SparkSession spark; + private Map catalogProperties; @BeforeAll public void startup() { containerSuite.startHiveContainer(); + catalogProperties = initPaimonCatalogProperties(); createMetalake(); createCatalog(); createSchema(); + initSparkEnv(); } @AfterAll @@ -62,6 +101,9 @@ public void stop() { clearTableAndSchema(); metalake.dropCatalog(catalogName); client.dropMetalake(metalakeName); + if (spark != null) { + spark.close(); + } } @AfterEach @@ -128,6 +170,317 @@ void testPaimonSchemaOperations() throws DatabaseNotExistException { Assertions.assertFalse(paimonDatabaseNames.contains(testSchemaName)); } + @Test + void testCreateTableWithNullComment() { + Column[] columns = createColumns(); + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); + + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable(tableIdentifier, columns, null, null, null, null, null); + Assertions.assertNull(createdTable.comment()); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertNull(loadTable.comment()); + } + + @Test + void testCreateAndLoadPaimonTable() + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + // Create table from Gravitino API + Column[] columns = createColumns(); + + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); 
+ Distribution distribution = Distributions.NONE; + + Transform[] partitioning = Transforms.EMPTY_TRANSFORM; + SortOrder[] sortOrders = SortOrders.NONE; + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertEquals(createdTable.name(), tableName); + Map resultProp = createdTable.properties(); + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertTrue(resultProp.containsKey(entry.getKey())); + Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey())); + } + Assertions.assertEquals(createdTable.columns().length, columns.length); + + for (int i = 0; i < columns.length; i++) { + Assertions.assertEquals(DTOConverters.toDTO(columns[i]), createdTable.columns()[i]); + } + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(table_comment, loadTable.comment()); + resultProp = loadTable.properties(); + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertTrue(resultProp.containsKey(entry.getKey())); + Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey())); + } + Assertions.assertEquals(loadTable.columns().length, columns.length); + for (int i = 0; i < columns.length; i++) { + Assertions.assertEquals(DTOConverters.toDTO(columns[i]), loadTable.columns()[i]); + } + + // catalog load check + org.apache.paimon.table.Table table = + paimonCatalog.getTable(Identifier.create(schemaName, tableName)); + Assertions.assertEquals(tableName, table.name()); + Assertions.assertTrue(table.comment().isPresent()); + Assertions.assertEquals(table_comment, table.comment().get()); + resultProp = table.options(); + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertTrue(resultProp.containsKey(entry.getKey())); + 
Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey())); + } + + Assertions.assertInstanceOf(FileStoreTable.class, table); + FileStoreTable fileStoreTable = (FileStoreTable) table; + + TableSchema schema = fileStoreTable.schema(); + Assertions.assertEquals(schema.fields().size(), columns.length); + for (int i = 0; i < columns.length; i++) { + Assertions.assertEquals(columns[i].name(), schema.fieldNames().get(i)); + } + Assertions.assertEquals(partitioning.length, fileStoreTable.partitionKeys().size()); + + Assertions.assertThrows( + TableAlreadyExistsException.class, + () -> + catalog + .asTableCatalog() + .createTable( + tableIdentifier, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + distribution, + sortOrders)); + } + + @Test + void testCreateTableWithTimestampColumn() + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + Column col1 = Column.of("paimon_column_1", Types.TimestampType.withTimeZone(), "col_1_comment"); + Column col2 = + Column.of("paimon_column_2", Types.TimestampType.withoutTimeZone(), "col_2_comment"); + + Column[] columns = new Column[] {col1, col2}; + + String timestampTableName = "timestamp_table"; + + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, timestampTableName); + + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable(tableIdentifier, columns, table_comment, properties); + Assertions.assertEquals("paimon_column_1", createdTable.columns()[0].name()); + Assertions.assertEquals( + Types.TimestampType.withTimeZone(), createdTable.columns()[0].dataType()); + Assertions.assertEquals("col_1_comment", createdTable.columns()[0].comment()); + Assertions.assertTrue(createdTable.columns()[0].nullable()); + + Assertions.assertEquals("paimon_column_2", createdTable.columns()[1].name()); + Assertions.assertEquals( + Types.TimestampType.withoutTimeZone(), 
createdTable.columns()[1].dataType()); + Assertions.assertEquals("col_2_comment", createdTable.columns()[1].comment()); + Assertions.assertTrue(createdTable.columns()[1].nullable()); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals("paimon_column_1", loadTable.columns()[0].name()); + Assertions.assertEquals(Types.TimestampType.withTimeZone(), loadTable.columns()[0].dataType()); + Assertions.assertEquals("col_1_comment", loadTable.columns()[0].comment()); + Assertions.assertTrue(loadTable.columns()[0].nullable()); + + Assertions.assertEquals("paimon_column_2", loadTable.columns()[1].name()); + Assertions.assertEquals( + Types.TimestampType.withoutTimeZone(), loadTable.columns()[1].dataType()); + Assertions.assertEquals("col_2_comment", loadTable.columns()[1].comment()); + Assertions.assertTrue(loadTable.columns()[1].nullable()); + + org.apache.paimon.table.Table table = + paimonCatalog.getTable(Identifier.create(schemaName, timestampTableName)); + Assertions.assertInstanceOf(FileStoreTable.class, table); + FileStoreTable fileStoreTable = (FileStoreTable) table; + TableSchema tableSchema = fileStoreTable.schema(); + Assertions.assertEquals("paimon_column_1", tableSchema.fields().get(0).name()); + Assertions.assertEquals( + new LocalZonedTimestampType().nullable(), tableSchema.fields().get(0).type()); + Assertions.assertEquals("col_1_comment", tableSchema.fields().get(0).description()); + + Assertions.assertEquals("paimon_column_2", tableSchema.fields().get(1).name()); + Assertions.assertEquals(new TimestampType().nullable(), tableSchema.fields().get(1).type()); + Assertions.assertEquals("col_2_comment", tableSchema.fields().get(1).description()); + } + + @Test + void testListAndDropPaimonTable() throws DatabaseNotExistException { + Column[] columns = createColumns(); + + String tableName1 = "table_1"; + + NameIdentifier table1 = NameIdentifier.of(schemaName, tableName1); + + Map properties = createProperties(); + TableCatalog 
tableCatalog = catalog.asTableCatalog(); + tableCatalog.createTable( + table1, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + NameIdentifier[] nameIdentifiers = tableCatalog.listTables(Namespace.of(schemaName)); + Assertions.assertEquals(1, nameIdentifiers.length); + Assertions.assertEquals("table_1", nameIdentifiers[0].name()); + + List tableIdentifiers = paimonCatalog.listTables(schemaName); + Assertions.assertEquals(1, tableIdentifiers.size()); + Assertions.assertEquals("table_1", tableIdentifiers.get(0)); + + String tableName2 = "table_2"; + + NameIdentifier table2 = NameIdentifier.of(schemaName, tableName2); + tableCatalog.createTable( + table2, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + new SortOrder[0]); + nameIdentifiers = tableCatalog.listTables(Namespace.of(schemaName)); + Assertions.assertEquals(2, nameIdentifiers.length); + Assertions.assertEquals("table_1", nameIdentifiers[0].name()); + Assertions.assertEquals("table_2", nameIdentifiers[1].name()); + + tableIdentifiers = paimonCatalog.listTables(schemaName); + Assertions.assertEquals(2, tableIdentifiers.size()); + Assertions.assertEquals("table_1", tableIdentifiers.get(0)); + Assertions.assertEquals("table_2", tableIdentifiers.get(1)); + + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table1)); + + nameIdentifiers = tableCatalog.listTables(Namespace.of(schemaName)); + Assertions.assertEquals(1, nameIdentifiers.length); + Assertions.assertEquals("table_2", nameIdentifiers[0].name()); + + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(table2)); + Namespace schemaNamespace = Namespace.of(schemaName); + nameIdentifiers = tableCatalog.listTables(schemaNamespace); + Assertions.assertEquals(0, nameIdentifiers.length); + + Assertions.assertEquals(0, paimonCatalog.listTables(schemaName).size()); + } + + @Test + void testOperationDataOfPaimonTable() { + Column[] 
columns = createColumns(); + String testTableName = GravitinoITUtils.genRandomName("test_table"); + SortOrder[] sortOrders = SortOrders.NONE; + Transform[] transforms = Transforms.EMPTY_TRANSFORM; + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of(schemaName, testTableName), + columns, + table_comment, + createProperties(), + transforms, + Distributions.NONE, + sortOrders); + List values = getValues(); + String dbTable = String.join(".", schemaName, testTableName); + // insert data + String insertSQL = + String.format(INSERT_BATCH_WITHOUT_PARTITION_TEMPLATE, dbTable, String.join(", ", values)); + spark.sql(insertSQL); + + // select data + Dataset sql = spark.sql(String.format(SELECT_ALL_TEMPLATE, dbTable)); + Assertions.assertEquals(4, sql.count()); + Row[] result = (Row[]) sql.sort(PAIMON_COL_NAME1).collect(); + LocalDate currentDate = LocalDate.now(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + for (int i = 0; i < result.length; i++) { + LocalDate previousDay = currentDate.minusDays(i + 1); + Assertions.assertEquals( + String.format( + "[%s,%s,data%s,[%s,string%s,[%s,inner%s]]]", + i + 1, previousDay.format(formatter), i + 1, (i + 1) * 10, i + 1, i + 1, i + 1), + result[i].toString()); + } + + // update data + spark.sql( + String.format( + "UPDATE paimon.%s SET %s = 100 WHERE %s = 1", + dbTable, PAIMON_COL_NAME1, PAIMON_COL_NAME1)); + sql = spark.sql(String.format(SELECT_ALL_TEMPLATE, dbTable)); + Assertions.assertEquals(4, sql.count()); + result = (Row[]) sql.sort(PAIMON_COL_NAME1).collect(); + for (int i = 0; i < result.length; i++) { + if (i == result.length - 1) { + LocalDate previousDay = currentDate.minusDays(1); + Assertions.assertEquals( + String.format( + "[100,%s,data%s,[%s,string%s,[%s,inner%s]]]", + previousDay.format(formatter), 1, 10, 1, 1, 1), + result[i].toString()); + } else { + LocalDate previousDay = currentDate.minusDays(i + 2); + Assertions.assertEquals( + String.format( + 
"[%s,%s,data%s,[%s,string%s,[%s,inner%s]]]", + i + 2, previousDay.format(formatter), i + 2, (i + 2) * 10, i + 2, i + 2, i + 2), + result[i].toString()); + } + } + // delete data + spark.sql(String.format("DELETE FROM paimon.%s WHERE %s = 100", dbTable, PAIMON_COL_NAME1)); + sql = spark.sql(String.format(SELECT_ALL_TEMPLATE, dbTable)); + Assertions.assertEquals(3, sql.count()); + result = (Row[]) sql.sort(PAIMON_COL_NAME1).collect(); + for (int i = 0; i < result.length; i++) { + LocalDate previousDay = currentDate.minusDays(i + 2); + Assertions.assertEquals( + String.format( + "[%s,%s,data%s,[%s,string%s,[%s,inner%s]]]", + i + 2, previousDay.format(formatter), i + 2, (i + 2) * 10, i + 2, i + 2, i + 2), + result[i].toString()); + } + } + + private static @NotNull List getValues() { + List values = new ArrayList<>(); + for (int i = 1; i < 5; i++) { + String structValue = + String.format( + "STRUCT(%d, 'string%d', %s)", + i * 10, // integer_field + i, // string_field + String.format( + "STRUCT(%d, 'inner%d')", + i, i) // struct_field, alternating NULL and non-NULL values + ); + values.add( + String.format("(%d, date_sub(current_date(), %d), 'data%d', %s)", i, i, i, structValue)); + } + return values; + } + private void clearTableAndSchema() { if (catalog.asSchemas().schemaExists(schemaName)) { catalog.asSchemas().dropSchema(schemaName, true); @@ -144,7 +497,6 @@ private void createMetalake() { } private void createCatalog() { - Map catalogProperties = initPaimonCatalogProperties(); Catalog createdCatalog = metalake.createCatalog( catalogName, Catalog.Type.RELATIONAL, provider, catalog_comment, catalogProperties); @@ -174,4 +526,45 @@ private void createSchema() { Assertions.assertEquals(createdSchema.name(), loadSchema.name()); Assertions.assertTrue(loadSchema.properties().isEmpty()); } + + private Column[] createColumns() { + Column col1 = Column.of(PAIMON_COL_NAME1, Types.IntegerType.get(), "col_1_comment"); + Column col2 = Column.of(PAIMON_COL_NAME2, 
Types.DateType.get(), "col_2_comment"); + Column col3 = Column.of(PAIMON_COL_NAME3, Types.StringType.get(), "col_3_comment"); + Types.StructType structTypeInside = + Types.StructType.of( + Types.StructType.Field.notNullField("integer_field_inside", Types.IntegerType.get()), + Types.StructType.Field.notNullField( + "string_field_inside", Types.StringType.get(), "string field inside")); + Types.StructType structType = + Types.StructType.of( + Types.StructType.Field.notNullField("integer_field", Types.IntegerType.get()), + Types.StructType.Field.notNullField( + "string_field", Types.StringType.get(), "string field"), + Types.StructType.Field.nullableField("struct_field", structTypeInside, "struct field")); + Column col4 = Column.of(PAIMON_COL_NAME4, structType, "col_4_comment"); + return new Column[] {col1, col2, col3, col4}; + } + + private Map createProperties() { + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + return properties; + } + + private void initSparkEnv() { + spark = + SparkSession.builder() + .master("local[1]") + .appName("Paimon Catalog integration test") + .config("spark.sql.warehouse.dir", WAREHOUSE) + .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog") + .config("spark.sql.catalog.paimon.warehouse", WAREHOUSE) + .config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .enableHiveSupport() + .getOrCreate(); + } } diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java index 84a5860420f..b48862583e7 100644 --- a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java +++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java @@ -22,13 +22,15 @@ protected Map initPaimonCatalogProperties() { catalogProperties.put("key1", "val1"); catalogProperties.put("key2", "val2"); - catalogProperties.put(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, "filesystem"); - catalogProperties.put( - PaimonCatalogPropertiesMetadata.WAREHOUSE, + TYPE = "filesystem"; + WAREHOUSE = String.format( "hdfs://%s:%d/user/hive/warehouse-catalog-paimon/", containerSuite.getHiveContainer().getContainerIpAddress(), - HiveContainer.HDFS_DEFAULTFS_PORT)); + HiveContainer.HDFS_DEFAULTFS_PORT); + + catalogProperties.put(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, TYPE); + catalogProperties.put(PaimonCatalogPropertiesMetadata.WAREHOUSE, WAREHOUSE); return catalogProperties; } diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java new file mode 100644 index 00000000000..7279e91a05a --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java @@ -0,0 +1,172 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.ops; + +import static org.apache.paimon.CoreOptions.BUCKET; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.File; +import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VarCharType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** Tests for {@link PaimonCatalogOps}. 
*/ +public class TestPaimonCatalogOps { + + private PaimonCatalogOps paimonCatalogOps; + @TempDir private File warehouse; + + private static final String DATABASE = "test_table_ops_database"; + private static final String TABLE = "test_table_ops_table"; + private static final String COMMENT = "table_ops_table_comment"; + private static final NameIdentifier IDENTIFIER = NameIdentifier.of(Namespace.of(DATABASE), TABLE); + private static final Map OPTIONS = ImmutableMap.of(BUCKET.key(), "10"); + + @BeforeEach + public void setUp() throws Exception { + paimonCatalogOps = + new PaimonCatalogOps( + new PaimonConfig( + ImmutableMap.of(PaimonCatalogPropertiesMetadata.WAREHOUSE, warehouse.getPath()))); + createDatabase(); + } + + @AfterEach + public void tearDown() throws Exception { + dropDatabase(); + if (paimonCatalogOps != null) { + paimonCatalogOps.close(); + } + } + + @Test + void testTableOperations() throws Exception { + // list tables + Assertions.assertEquals( + 0, paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size()); + + // create table + Pair tableInfo = + Pair.of( + IDENTIFIER.toString(), + Schema.newBuilder() + .column("col_1", DataTypes.INT().notNull(), IntType.class.getSimpleName()) + .column("col_2", DataTypes.STRING(), VarCharType.class.getSimpleName()) + .column("col_3", DataTypes.STRING().notNull(), VarCharType.class.getSimpleName()) + .column( + "col_4", + DataTypes.ARRAY( + RowType.builder() + .field( + "sub_col_1", + DataTypes.DATE(), + RowType.class.getSimpleName() + DateType.class.getSimpleName()) + .field( + "sub_col_2", + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), + RowType.class.getSimpleName() + MapType.class.getSimpleName()) + .field( + "sub_col_3", + DataTypes.TIMESTAMP().notNull(), + RowType.class.getSimpleName() + TimestampType.class.getSimpleName()) + .build()), + ArrayType.class.getSimpleName()) + .comment(COMMENT) + .options(OPTIONS) + .build()); + paimonCatalogOps.createTable(tableInfo.getKey(), 
tableInfo.getValue()); + + // load table + Table table = paimonCatalogOps.loadTable(IDENTIFIER.toString()); + + assertEquals(TABLE, table.name()); + assertTrue(table.comment().isPresent()); + assertEquals( + RowType.builder() + .field("col_1", DataTypes.INT().notNull(), IntType.class.getSimpleName()) + .field("col_2", DataTypes.STRING(), VarCharType.class.getSimpleName()) + .field("col_3", DataTypes.STRING().notNull(), VarCharType.class.getSimpleName()) + .field( + "col_4", + DataTypes.ARRAY( + RowType.builder() + .field( + "sub_col_1", + DataTypes.DATE(), + RowType.class.getSimpleName() + DateType.class.getSimpleName()) + .field( + "sub_col_2", + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), + RowType.class.getSimpleName() + MapType.class.getSimpleName()) + .field( + "sub_col_3", + DataTypes.TIMESTAMP().notNull(), + RowType.class.getSimpleName() + TimestampType.class.getSimpleName()) + .build()), + ArrayType.class.getSimpleName()) + .build() + .toString(), + table.rowType().toString()); + assertEquals(COMMENT, table.comment().get()); + assertEquals(OPTIONS.get(BUCKET.key()), table.options().get(BUCKET.key())); + + // TODO: alter table is unsupported now. 
+ + // drop table + Assertions.assertDoesNotThrow(() -> paimonCatalogOps.dropTable(IDENTIFIER.toString())); + Assertions.assertThrowsExactly( + Catalog.TableNotExistException.class, + () -> paimonCatalogOps.dropTable(IDENTIFIER.toString())); + + // list table again + Assertions.assertEquals( + 0, paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size()); + + // create a new table to make database not empty to test drop database cascade + paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue()); + Assertions.assertNotNull(paimonCatalogOps.loadTable(IDENTIFIER.toString())); + } + + private void createDatabase() throws Exception { + // list databases + assertEquals(0, paimonCatalogOps.listDatabases().size()); + + // create database + paimonCatalogOps.createDatabase(DATABASE, Maps.newHashMap()); + assertEquals(1, paimonCatalogOps.listDatabases().size()); + // load database + assertNotNull(paimonCatalogOps.loadDatabase(DATABASE)); + } + + private void dropDatabase() throws Exception { + Assertions.assertEquals(1, paimonCatalogOps.listDatabases().size()); + Assertions.assertEquals(1, paimonCatalogOps.listTables(DATABASE).size()); + paimonCatalogOps.dropDatabase(DATABASE, true); + Assertions.assertTrue(paimonCatalogOps.listDatabases().isEmpty()); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestTypeUtils.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestTypeUtils.java new file mode 100644 index 00000000000..f5be7178995 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestTypeUtils.java @@ -0,0 +1,196 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.utils; + +import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.fromPaimonType; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.datastrato.gravitino.rel.types.Type; +import com.datastrato.gravitino.rel.types.Type.Name; +import com.datastrato.gravitino.rel.types.Types; +import java.util.Arrays; +import java.util.function.Consumer; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.junit.jupiter.api.Test; + +/** Tests for {@link TypeUtils}. */ +public class TestTypeUtils { + + @Test + void testFromAndToPaimonType() { + // Test supported data types. 
+ RowType rowType = + RowType.builder() + .fields( + DataTypes.VARCHAR(10), + DataTypes.STRING(), + DataTypes.BOOLEAN(), + DataTypes.BINARY(BinaryType.MAX_LENGTH), + DataTypes.DECIMAL(8, 3), + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.DATE(), + DataTypes.TIME(), + DataTypes.TIMESTAMP(), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), + DataTypes.ARRAY(DataTypes.INT())) + .build(); + + Type gravitinoDataType = toGravitinoDataType(rowType); + + DataType paimonDataType = toPaimonType(gravitinoDataType); + + assertEquals(rowType, paimonDataType); + } + + @Test + void testUnparsedType() { + Arrays.asList( + DataTypes.CHAR(CharType.MAX_LENGTH), DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH)) + .forEach(this::toGravitinoDataType); + } + + @Test + void testUnsupportedType() { + // Test UnsupportedOperationException with IntervalYearType, IntervalDayType, FixedCharType, + // UUIDType, FixedType, UnionType, NullType for toPaimonType. 
+ Arrays.asList( + Types.IntervalYearType.get(), + Types.IntervalDayType.get(), + Types.FixedCharType.of(10), + Types.UUIDType.get(), + Types.FixedType.of(20), + Types.UnionType.of(Types.IntegerType.get()), + Types.NullType.get(), + Types.UnparsedType.of("unparsed")) + .forEach(this::checkUnsupportedType); + } + + private Type toGravitinoDataType(DataType dataType) { + switch (dataType.getTypeRoot()) { + case VARCHAR: + if (((VarCharType) dataType).getLength() == Integer.MAX_VALUE) { + return checkDataType(dataType, Name.STRING); + } else { + return checkDataType( + dataType, + Name.VARCHAR, + type -> + assertEquals( + ((VarCharType) dataType).getLength(), ((Types.VarCharType) type).length())); + } + case BOOLEAN: + return checkDataType(dataType, Name.BOOLEAN); + case BINARY: + return checkDataType(dataType, Name.BINARY); + case DECIMAL: + return checkDataType( + dataType, + Name.DECIMAL, + type -> { + assertEquals( + ((DecimalType) dataType).getPrecision(), ((Types.DecimalType) type).precision()); + assertEquals(((DecimalType) dataType).getScale(), ((Types.DecimalType) type).scale()); + }); + case TINYINT: + return checkDataType(dataType, Name.BYTE); + case SMALLINT: + return checkDataType(dataType, Name.SHORT); + case INTEGER: + return checkDataType(dataType, Name.INTEGER); + case BIGINT: + return checkDataType(dataType, Name.LONG); + case FLOAT: + return checkDataType(dataType, Name.FLOAT); + case DOUBLE: + return checkDataType(dataType, Name.DOUBLE); + case DATE: + return checkDataType(dataType, Name.DATE); + case TIME_WITHOUT_TIME_ZONE: + return checkDataType(dataType, Name.TIME); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return checkDataType( + dataType, + Name.TIMESTAMP, + type -> assertFalse(((Types.TimestampType) type).hasTimeZone())); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return checkDataType( + dataType, + Name.TIMESTAMP, + type -> assertTrue(((Types.TimestampType) type).hasTimeZone())); + case ARRAY: + return checkDataType( + dataType, + Name.LIST, + 
type -> assertTrue(((Types.ListType) type).elementType() instanceof Types.IntegerType)); + case MULTISET: + return checkDataType( + dataType, + Name.MAP, + type -> { + assertEquals( + fromPaimonType(((MultisetType) dataType).getElementType()), + ((Types.MapType) type).keyType()); + assertTrue(((Types.MapType) type).valueType() instanceof Types.IntegerType); + }); + case MAP: + return checkDataType( + dataType, + Name.MAP, + type -> { + assertEquals( + fromPaimonType(((MapType) dataType).getKeyType()), + ((Types.MapType) type).keyType()); + assertEquals( + fromPaimonType(((MapType) dataType).getValueType()), + ((Types.MapType) type).valueType()); + }); + case ROW: + return checkDataType( + dataType, + Name.STRUCT, + type -> ((RowType) dataType).getFieldTypes().forEach(this::toGravitinoDataType)); + default: + return checkDataType(dataType, Name.UNPARSED); + } + } + + private Type checkDataType(DataType dataType, Name expected) { + return checkDataType(dataType, expected, type -> {}); + } + + private Type checkDataType(DataType dataType, Name expected, Consumer consumer) { + Type actual = fromPaimonType(dataType); + assertEquals(expected, actual.name()); + consumer.accept(actual); + return actual; + } + + private void checkUnsupportedType(Type type) { + UnsupportedOperationException exception = + assertThrowsExactly(UnsupportedOperationException.class, () -> toPaimonType(type)); + assertEquals( + String.format("Paimon does not support Gravitino %s data type.", type.simpleString()), + exception.getMessage()); + } +}