From 7cd80b04029f5fd5f5825a798832788bb06a1f85 Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Mon, 22 Jan 2024 19:14:26 +0800 Subject: [PATCH] [#1510] feat(trino-connector): Support partition, distribution and sort order of Hive table created by Trino (#1539) ### What changes were proposed in this pull request? Hive tables created through Trino can now specify partitioning, distribution (bucketing), and sort order. ### Why are the changes needed? It's a crucial feature of the Trino connector. Fix: #1510 ### Does this PR introduce _any_ user-facing change? Users can create a Hive table in Trino with: ``` create table t10 (id int, name varchar) with (partitioned_by = ARRAY['name'], bucketed_by = ARRAY['id'], bucket_count = 50); ``` Or create a Hive table and load it through Trino, for example: ``` trino:db2> show create table t11; Create Table ---------------------------------------------------------------------------------- CREATE TABLE "test.hive_catalog".db2.t11 ( id integer, name varchar(65535) ) COMMENT '' WITH ( bucket_count = 50, bucketed_by = ARRAY['id'], input_format = 'org.apache.hadoop.mapred.TextInputFormat', location = 'hdfs://localhost:9000/user/hive/warehouse/db2.db/t11', output_format = 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', partitioned_by = ARRAY['name'], serde_lib = 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', serde_name = 't11', table_type = 'MANAGED_TABLE' ) (1 row) ``` ### How was this patch tested? 
Add some IT --------- Co-authored-by: Jerry Shao --- LICENSE | 3 +- .../gravitino/dto/rel/PartitionUtils.java | 4 +- .../test/trino/TrinoConnectorIT.java | 46 ++++- .../catalog/CatalogConnectorMetadata.java | 13 +- .../CatalogConnectorMetadataAdapter.java | 10 +- .../catalog/hive/HiveMetadataAdapter.java | 183 ++++++++++++++++++ .../catalog/hive/HivePropertyMeta.java | 60 +++++- .../connector/catalog/hive/SortingColumn.java | 113 +++++++++++ .../connector/metadata/GravitinoTable.java | 36 ++++ .../trino/connector/GravitinoMockServer.java | 8 +- 10 files changed, 464 insertions(+), 12 deletions(-) create mode 100644 trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/SortingColumn.java diff --git a/LICENSE b/LICENSE index ec360e66e30..a0e27b20680 100644 --- a/LICENSE +++ b/LICENSE @@ -259,4 +259,5 @@ ./bin/common.sh Trino - ./integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/CloseableGroup.java \ No newline at end of file + ./integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/CloseableGroup.java + ./trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/SortingColumn.java \ No newline at end of file diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/PartitionUtils.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/PartitionUtils.java index 0fd0bbd6de0..51d7601c0d3 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/PartitionUtils.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/PartitionUtils.java @@ -18,7 +18,9 @@ public static void validateFieldExistence(ColumnDTO[] columns, String[] fieldNam List partitionColumn = Arrays.stream(columns) - .filter(c -> c.name().equals(fieldName[0])) + // (TODO) Need to consider the case sensitivity issues. + // To be optimized. 
+ .filter(c -> c.name().equalsIgnoreCase(fieldName[0])) .collect(Collectors.toList()); Preconditions.checkArgument( partitionColumn.size() == 1, "partition field %s not found in table", fieldName[0]); diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java index e3e040a999c..c0ae17b6dc2 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java @@ -11,6 +11,10 @@ import com.datastrato.gravitino.catalog.hive.HiveClientPool; import com.datastrato.gravitino.client.GravitinoMetaLake; import com.datastrato.gravitino.dto.rel.ColumnDTO; +import com.datastrato.gravitino.dto.rel.DistributionDTO; +import com.datastrato.gravitino.dto.rel.SortOrderDTO; +import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO; +import com.datastrato.gravitino.dto.rel.partitions.IdentityPartitioningDTO; import com.datastrato.gravitino.integration.test.catalog.jdbc.utils.JdbcDriverDownloader; import com.datastrato.gravitino.integration.test.container.ContainerSuite; import com.datastrato.gravitino.integration.test.container.HiveContainer; @@ -19,6 +23,16 @@ import com.datastrato.gravitino.integration.test.util.ITUtils; import com.datastrato.gravitino.rel.Schema; import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.expressions.NamedReference; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.distributions.Strategy; +import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering; +import com.datastrato.gravitino.rel.expressions.sorts.SortDirection; +import 
com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrders; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; import com.datastrato.gravitino.rel.types.Types; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -376,7 +390,9 @@ void testHiveTableCreatedByTrino() { String createTableSql = String.format( "CREATE TABLE \"%s.%s\".%s.%s (id int, name varchar)" - + " with ( serde_name = '123455', location = 'hdfs://localhost:9000/user/hive/warehouse/hive_schema.db/hive_table')", + + " with ( serde_name = '123455', location = 'hdfs://localhost:9000/user/hive/warehouse/hive_schema.db/hive_table'" + + ", partitioned_by = ARRAY['name'], bucketed_by = ARRAY['id'], bucket_count = 50, sorted_by = ARRAY['name']" + + ")", metalakeName, catalogName, schemaName, tableName); containerSuite.getTrinoContainer().executeUpdateSQL(createTableSql); @@ -396,6 +412,20 @@ void testHiveTableCreatedByTrino() { Assertions.assertEquals( "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", table.properties().get("output-format")); + + Distribution distribution = table.distribution(); + Assertions.assertEquals(Strategy.HASH, distribution.strategy()); + Assertions.assertEquals(50, distribution.number()); + Assertions.assertEquals( + "id", ((FieldReferenceDTO) ((DistributionDTO) distribution).args()[0]).fieldName()[0]); + + Assertions.assertEquals(1, table.partitioning().length); + Transform partitioning = table.partitioning()[0]; + Assertions.assertEquals("name", ((IdentityPartitioningDTO) partitioning).fieldName()[0]); + + Assertions.assertEquals(1, table.sortOrder().length); + SortOrderDTO sortOrder = (SortOrderDTO) table.sortOrder()[0]; + Assertions.assertEquals("name", ((FieldReferenceDTO) sortOrder.sortTerm()).fieldName()[0]); } @Test @@ -711,7 +741,15 @@ void testHiveTableCreatedByGravitino() 
throws InterruptedException { "hdfs://localhost:9000/user/hive/warehouse/hive_schema.db/hive_table") .put("serde-name", "mock11") .put("table-type", "EXTERNAL_TABLE") - .build()); + .build(), + new Transform[] {Transforms.identity("BinaryType")}, + Distributions.of(Strategy.HASH, 4, NamedReference.field("BooleanType")), + new SortOrder[] { + SortOrders.of( + NamedReference.field("LongType"), + SortDirection.ASCENDING, + NullOrdering.NULLS_FIRST) + }); LOG.info("create table \"{}.{}\".{}.{}", metalakeName, catalogName, schemaName, tableName); Table table = @@ -739,6 +777,10 @@ void testHiveTableCreatedByGravitino() throws InterruptedException { Assertions.assertTrue( data.contains("input_format = 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'")); Assertions.assertTrue(data.contains("serde_lib = 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'")); + Assertions.assertTrue(data.contains("bucket_count = 4")); + Assertions.assertTrue(data.contains("bucketed_by = ARRAY['booleantype']")); + Assertions.assertTrue(data.contains("partitioned_by = ARRAY['binarytype']")); + Assertions.assertTrue(data.contains("sorted_by = ARRAY['longtype']")); } @Test diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java index b5ed6045188..fd997c08a0b 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java @@ -17,7 +17,6 @@ import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.client.GravitinoMetaLake; -import com.datastrato.gravitino.dto.rel.ColumnDTO; import com.datastrato.gravitino.exceptions.NoSuchCatalogException; import 
com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NoSuchTableException; @@ -122,11 +121,15 @@ public void createTable(GravitinoTable table) { NameIdentifier identifier = NameIdentifier.ofTable( metalake.name(), catalogName, table.getSchemaName(), table.getName()); - ColumnDTO[] gravitinoColumns = table.getColumnDTOs(); - String comment = table.getComment(); - Map properties = table.getProperties(); try { - tableCatalog.createTable(identifier, gravitinoColumns, comment, properties); + tableCatalog.createTable( + identifier, + table.getColumnDTOs(), + table.getComment(), + table.getProperties(), + table.getPartitioning(), + table.getDistribution(), + table.getSortOrders()); } catch (NoSuchSchemaException e) { throw new TrinoException(GRAVITINO_SCHEMA_NOT_EXISTS, "Schema does not exist", e); } catch (TableAlreadyExistsException e) { diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadataAdapter.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadataAdapter.java index 1627f783f81..b77a1072489 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadataAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorMetadataAdapter.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang3.NotImplementedException; import org.slf4j.Logger; @@ -34,7 +35,7 @@ public class CatalogConnectorMetadataAdapter { protected final List> tableProperties; protected final List> columnProperties; - private final GeneralDataTypeTransformer dataTypeTransformer; + protected final GeneralDataTypeTransformer dataTypeTransformer; protected CatalogConnectorMetadataAdapter( List> schemaProperties, @@ 
-93,6 +94,13 @@ public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) { return new GravitinoTable(schemaName, tableName, columns, comment, properties); } + protected Map removeKeys( + Map properties, Set keyToDelete) { + return properties.entrySet().stream() + .filter(entry -> !keyToDelete.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + /** Transform trino schema metadata to gravitino schema metadata */ public GravitinoSchema createSchema(String schemaName, Map properties) { return new GravitinoSchema(schemaName, toGravitinoSchemaProperties(properties), ""); diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveMetadataAdapter.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveMetadataAdapter.java index a5be430f364..ac60fa8850d 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveMetadataAdapter.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HiveMetadataAdapter.java @@ -4,11 +4,45 @@ */ package com.datastrato.gravitino.trino.connector.catalog.hive; +import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_BUCKET_COUNT_KEY; +import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_BUCKET_KEY; +import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_PARTITION_KEY; +import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_SORT_ORDER_KEY; + import com.datastrato.catalog.property.PropertyConverter; +import com.datastrato.gravitino.dto.rel.DistributionDTO; +import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO; +import com.datastrato.gravitino.dto.rel.partitions.Partitioning.SingleFieldPartitioning; +import com.datastrato.gravitino.rel.expressions.Expression; +import 
com.datastrato.gravitino.rel.expressions.NamedReference; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.distributions.Strategy; +import com.datastrato.gravitino.rel.expressions.sorts.SortDirection; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrders; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import com.datastrato.gravitino.trino.connector.GravitinoErrorCode; import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; +import com.datastrato.gravitino.trino.connector.catalog.hive.SortingColumn.Order; +import com.datastrato.gravitino.trino.connector.metadata.GravitinoColumn; +import com.datastrato.gravitino.trino.connector.metadata.GravitinoTable; +import com.google.common.collect.ImmutableSet; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; import io.trino.spi.session.PropertyMetadata; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ArrayUtils; /** Transforming gravitino hive metadata to trino. 
*/ public class HiveMetadataAdapter extends CatalogConnectorMetadataAdapter { @@ -16,6 +50,10 @@ public class HiveMetadataAdapter extends CatalogConnectorMetadataAdapter { private final PropertyConverter tableConverter; private final PropertyConverter schemaConverter; + private static final Set HIVE_PROPERTIES_TO_REMOVE = + ImmutableSet.of( + HIVE_PARTITION_KEY, HIVE_BUCKET_KEY, HIVE_BUCKET_COUNT_KEY, HIVE_SORT_ORDER_KEY); + public HiveMetadataAdapter( List> schemaProperties, List> tableProperties, @@ -48,4 +86,149 @@ public Map toGravitinoSchemaProperties(Map prope Map stringMap = schemaConverter.engineToGravitinoProperties(properties); return super.toGravitinoSchemaProperties(stringMap); } + + @Override + public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) { + String tableName = tableMetadata.getTableSchema().getTable().getTableName(); + String schemaName = tableMetadata.getTableSchema().getTable().getSchemaName(); + String comment = tableMetadata.getComment().orElse(""); + + Map propertyMap = tableMetadata.getProperties(); + List partitionColumns = + propertyMap.containsKey(HIVE_PARTITION_KEY) + ? (List) propertyMap.get(HIVE_PARTITION_KEY) + : Collections.EMPTY_LIST; + List bucketColumns = + propertyMap.containsKey(HIVE_BUCKET_KEY) + ? (List) propertyMap.get(HIVE_BUCKET_KEY) + : Collections.EMPTY_LIST; + int bucketCount = + propertyMap.containsKey(HIVE_BUCKET_COUNT_KEY) + ? (int) propertyMap.get(HIVE_BUCKET_COUNT_KEY) + : 0; + List sortColumns = + propertyMap.containsKey(HIVE_SORT_ORDER_KEY) + ? 
(List) propertyMap.get(HIVE_SORT_ORDER_KEY) + : Collections.EMPTY_LIST; + + if (!sortColumns.isEmpty() && (bucketColumns.isEmpty() || bucketCount == 0)) { + throw new TrinoException( + GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT, + "Sort columns can only be set when bucket columns and bucket count are set"); + } + + Map properties = + toGravitinoTableProperties( + removeKeys(tableMetadata.getProperties(), HIVE_PROPERTIES_TO_REMOVE)); + + List columns = new ArrayList<>(); + for (int i = 0; i < tableMetadata.getColumns().size(); i++) { + ColumnMetadata column = tableMetadata.getColumns().get(i); + columns.add( + new GravitinoColumn( + column.getName(), + dataTypeTransformer.getGravitinoType(column.getType()), + i, + column.getComment(), + column.isNullable())); + } + GravitinoTable gravitinoTable = + new GravitinoTable(schemaName, tableName, columns, comment, properties); + + if (!partitionColumns.isEmpty()) { + Transform[] partitioning = + partitionColumns.stream().map(Transforms::identity).toArray(Transform[]::new); + gravitinoTable.setPartitioning(partitioning); + } + + if (!bucketColumns.isEmpty()) { + Expression[] bucketing = + bucketColumns.stream().map(NamedReference::field).toArray(Expression[]::new); + gravitinoTable.setDistribution(Distributions.of(Strategy.HASH, bucketCount, bucketing)); + } + + if (!sortColumns.isEmpty()) { + SortOrder[] sorting = + sortColumns.stream() + .map( + sortingColumn -> { + Expression expression = NamedReference.field(sortingColumn.getColumnName()); + SortDirection sortDirection = + sortingColumn.getOrder() == Order.ASCENDING + ? 
SortDirection.ASCENDING + : SortDirection.DESCENDING; + return SortOrders.of(expression, sortDirection); + }) + .toArray(SortOrder[]::new); + gravitinoTable.setSortOrders(sorting); + } + + return gravitinoTable; + } + + @Override + public ConnectorTableMetadata getTableMetadata(GravitinoTable gravitinoTable) { + SchemaTableName schemaTableName = + new SchemaTableName(gravitinoTable.getSchemaName(), gravitinoTable.getName()); + ArrayList columnMetadataList = new ArrayList<>(); + for (GravitinoColumn column : gravitinoTable.getColumns()) { + columnMetadataList.add(getColumnMetadata(column)); + } + + Map properties = toTrinoTableProperties(gravitinoTable.getProperties()); + + if (ArrayUtils.isNotEmpty(gravitinoTable.getPartitioning())) { + // Only support simple partition now like partition by a, b, c. + // Format like partition like partition by year(a), b, c is NOT supported now. + properties.put( + HIVE_PARTITION_KEY, + gravitinoTable.getPartitioning().length > 0 + ? Arrays.stream(gravitinoTable.getPartitioning()) + .map( + ts -> + ((SingleFieldPartitioning) ts).fieldName()[0].toLowerCase(Locale.ENGLISH)) + .collect(Collectors.toList()) + : Collections.EMPTY_LIST); + } + + if (gravitinoTable.getDistribution() != null + && !DistributionDTO.NONE.equals(gravitinoTable.getDistribution())) { + properties.put( + HIVE_BUCKET_KEY, + Arrays.stream(gravitinoTable.getDistribution().expressions()) + .map(ts -> ((FieldReferenceDTO) ts).fieldName()[0].toLowerCase(Locale.ENGLISH)) + .collect(Collectors.toList())); + + properties.put(HIVE_BUCKET_COUNT_KEY, gravitinoTable.getDistribution().number()); + } + + if (ArrayUtils.isNotEmpty(gravitinoTable.getSortOrders())) { + // Only support the simple format + properties.put( + HIVE_SORT_ORDER_KEY, + Arrays.stream(gravitinoTable.getSortOrders()) + .map( + sortOrder -> { + Expression expression = sortOrder.expression(); + SortDirection sortDirection = + sortOrder.direction() == SortDirection.ASCENDING + ? 
SortDirection.ASCENDING + : SortDirection.DESCENDING; + Order order = + sortDirection == SortDirection.ASCENDING + ? Order.ASCENDING + : Order.DESCENDING; + return new SortingColumn( + ((FieldReferenceDTO) expression).fieldName()[0].toLowerCase(Locale.ENGLISH), + order); + }) + .collect(Collectors.toList())); + } + + return new ConnectorTableMetadata( + schemaTableName, + columnMetadataList, + properties, + Optional.ofNullable(gravitinoTable.getComment())); + } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java index 4c6230e5641..a220d9460d2 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/HivePropertyMeta.java @@ -5,13 +5,18 @@ package com.datastrato.gravitino.trino.connector.catalog.hive; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.spi.session.PropertyMetadata.booleanProperty; import static io.trino.spi.session.PropertyMetadata.enumProperty; +import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Locale.ENGLISH; import com.datastrato.gravitino.trino.connector.catalog.HasPropertyMeta; import com.google.common.collect.ImmutableList; import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.type.ArrayType; import java.util.List; /** Implementation of {@link HasPropertyMeta} for Hive catalog. 
*/ @@ -21,6 +26,11 @@ public class HivePropertyMeta implements HasPropertyMeta { ImmutableList.of( stringProperty("location", "Hive storage location for the schema", null, false)); + public static final String HIVE_PARTITION_KEY = "partitioned_by"; + public static final String HIVE_BUCKET_KEY = "bucketed_by"; + public static final String HIVE_BUCKET_COUNT_KEY = "bucket_count"; + public static final String HIVE_SORT_ORDER_KEY = "sorted_by"; + private static final List> TABLE_PROPERTY_META = ImmutableList.of( stringProperty("format", "Hive storage format for the table", "TEXTFILE", false), @@ -44,7 +54,55 @@ public class HivePropertyMeta implements HasPropertyMeta { "The serde library class for the table", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", false), - stringProperty("serde_name", "Name of the serde, table name by default", null, false)); + stringProperty("serde_name", "Name of the serde, table name by default", null, false), + new PropertyMetadata<>( + HIVE_PARTITION_KEY, + "Partition columns", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> + ((List) value) + .stream() + .map(name -> ((String) name).toLowerCase(ENGLISH)) + .collect(toImmutableList()), + value -> value), + new PropertyMetadata<>( + HIVE_BUCKET_KEY, + "Bucketing columns", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> + ((List) value) + .stream() + .map(name -> ((String) name).toLowerCase(ENGLISH)) + .collect(toImmutableList()), + value -> value), + integerProperty( + HIVE_BUCKET_COUNT_KEY, "The number of buckets for the table", null, false), + new PropertyMetadata<>( + HIVE_SORT_ORDER_KEY, + "Bucket sorting columns", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> + ((List) value) + .stream() + .map(String.class::cast) + .map(String::toLowerCase) + .map(SortingColumn::sortingColumnFromString) + .collect(toImmutableList()), + value -> + ((List) value) + .stream() + 
.map(SortingColumn.class::cast) + .map(SortingColumn::sortingColumnToString) + .collect(toImmutableList()))); enum CatalogStorageFormat { AVRO, diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/SortingColumn.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/SortingColumn.java new file mode 100644 index 00000000000..933bc5ef6f9 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/SortingColumn.java @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datastrato.gravitino.trino.connector.catalog.hive; + +import static com.datastrato.gravitino.trino.connector.catalog.hive.SortingColumn.Order.ASCENDING; +import static com.datastrato.gravitino.trino.connector.catalog.hive.SortingColumn.Order.DESCENDING; +import static com.google.common.base.MoreObjects.toStringHelper; +import static io.trino.spi.connector.SortOrder.ASC_NULLS_FIRST; +import static io.trino.spi.connector.SortOrder.DESC_NULLS_LAST; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.errorprone.annotations.Immutable; +import io.trino.spi.connector.SortOrder; +import java.util.Objects; + +// This class is referred from Trino: +// plugin/trino-gravitino/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/SortingColumn.java +@Immutable +public class SortingColumn { + public enum Order { + ASCENDING(ASC_NULLS_FIRST, 1), + DESCENDING(DESC_NULLS_LAST, 0); + + private final SortOrder sortOrder; + private final int hiveOrder; + + Order(SortOrder sortOrder, int hiveOrder) { + this.sortOrder = requireNonNull(sortOrder, "sortOrder is null"); + this.hiveOrder = hiveOrder; + } + + public SortOrder getSortOrder() { + return sortOrder; + } + + public int getHiveOrder() { + return hiveOrder; + } + } + + private final String columnName; + private final Order order; + + @JsonCreator + public SortingColumn( + @JsonProperty("columnName") String columnName, @JsonProperty("order") Order order) { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.order = requireNonNull(order, "order is null"); + } + + @JsonProperty + public String getColumnName() { + return columnName; + } + + @JsonProperty + public Order getOrder() { + return order; + } + + @Override + public String toString() { + return toStringHelper(this).add("columnName", columnName).add("order", 
order).toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SortingColumn that = (SortingColumn) o; + return Objects.equals(columnName, that.columnName) && order == that.order; + } + + @Override + public int hashCode() { + return Objects.hash(columnName, order); + } + + public static String sortingColumnToString(SortingColumn column) { + return column.getColumnName() + ((column.getOrder() == DESCENDING) ? " DESC" : ""); + } + + public static SortingColumn sortingColumnFromString(String name) { + SortingColumn.Order order = ASCENDING; + String lower = name.toUpperCase(ENGLISH); + if (lower.endsWith(" ASC")) { + name = name.substring(0, name.length() - 4).trim(); + } else if (lower.endsWith(" DESC")) { + name = name.substring(0, name.length() - 5).trim(); + order = DESCENDING; + } + return new SortingColumn(name, order); + } +} diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/metadata/GravitinoTable.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/metadata/GravitinoTable.java index 329449508d8..50bf85f3f3e 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/metadata/GravitinoTable.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/metadata/GravitinoTable.java @@ -11,6 +11,10 @@ import com.datastrato.gravitino.dto.rel.TableDTO; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.collect.ImmutableList; import 
io.trino.spi.TrinoException; @@ -28,6 +32,10 @@ public class GravitinoTable { private final String comment; private final Map properties; + private SortOrder[] sortOrders = new SortOrder[0]; + private Transform[] partitioning = new Transform[0]; + private Distribution distribution = Distributions.NONE; + @JsonCreator public GravitinoTable(String schemaName, String tableName, Table tableMetadata) { this.schemaName = schemaName; @@ -40,6 +48,10 @@ public GravitinoTable(String schemaName, String tableName, Table tableMetadata) this.columns = tableColumns.build(); this.comment = tableMetadata.comment(); properties = tableMetadata.properties(); + + sortOrders = tableMetadata.sortOrder(); + partitioning = tableMetadata.partitioning(); + distribution = tableMetadata.distribution(); } public GravitinoTable( @@ -125,4 +137,28 @@ public TableDTO getTableDTO() { .withAudit(new AuditDTO.Builder().build()) .build(); } + + public void setSortOrders(SortOrder[] sortOrders) { + this.sortOrders = sortOrders; + } + + public void setPartitioning(Transform[] partitioning) { + this.partitioning = partitioning; + } + + public void setDistribution(Distribution distribution) { + this.distribution = distribution; + } + + public SortOrder[] getSortOrders() { + return sortOrders; + } + + public Transform[] getPartitioning() { + return partitioning; + } + + public Distribution getDistribution() { + return distribution; + } } diff --git a/trino-connector/src/test/java/com/datastrato/gravitino/trino/connector/GravitinoMockServer.java b/trino-connector/src/test/java/com/datastrato/gravitino/trino/connector/GravitinoMockServer.java index 18f1fa81897..eee0d1f674d 100644 --- a/trino-connector/src/test/java/com/datastrato/gravitino/trino/connector/GravitinoMockServer.java +++ b/trino-connector/src/test/java/com/datastrato/gravitino/trino/connector/GravitinoMockServer.java @@ -311,7 +311,13 @@ public Schema answer(InvocationOnMock invocation) throws Throwable { private TableCatalog 
createTableCatalog(NameIdentifier catalogName) { TableCatalog tableCatalog = mock(TableCatalog.class); when(tableCatalog.createTable( - any(NameIdentifier.class), any(Column[].class), anyString(), anyMap())) + any(NameIdentifier.class), + any(Column[].class), + anyString(), + anyMap(), + any(), + any(), + any())) .thenAnswer( new Answer() { @Override