Skip to content

Commit

Permalink
[#1510] feat(trino-connector): Support partition, distribution and so…
Browse files Browse the repository at this point in the history
…rt order of Hive table created by Trino (#1539)

### What changes were proposed in this pull request?

We can now create a Hive table with partitioning, distribution, and sort
order through Trino.

### Why are the changes needed?

It's a crucial feature of the Trino connector. 

Fix: #1510 

### Does this PR introduce _any_ user-facing change?

User can create a hive table in Trino by:
```
create table t10 (id int, name varchar) with (partitioned_by = ARRAY['name'], bucketed_by = ARRAY['id'], bucket_count = 50);
```
Or create a Hive table in Gravitino and load it through Trino, for example:
```
trino:db2> show create table t11;
                                   Create Table
----------------------------------------------------------------------------------
 CREATE TABLE "test.hive_catalog".db2.t11 (
    id integer,
    name varchar(65535)
 )
 COMMENT ''
 WITH (
    bucket_count = 50,
    bucketed_by = ARRAY['id'],
    input_format = 'org.apache.hadoop.mapred.TextInputFormat',
    location = 'hdfs://localhost:9000/user/hive/warehouse/db2.db/t11',
    output_format = 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
    partitioned_by = ARRAY['name'],
    serde_lib = 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
    serde_name = 't11',
    table_type = 'MANAGED_TABLE'
 )
(1 row)
```

### How was this patch tested?

Added some integration tests.

---------

Co-authored-by: Jerry Shao <[email protected]>
  • Loading branch information
yuqi1129 and jerryshao authored Jan 22, 2024
1 parent f145099 commit 7cd80b0
Show file tree
Hide file tree
Showing 10 changed files with 464 additions and 12 deletions.
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,5 @@
./bin/common.sh

Trino
./integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/CloseableGroup.java
./integration-test/src/main/java/com/datastrato/gravitino/integration/test/util/CloseableGroup.java
./trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/hive/SortingColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public static void validateFieldExistence(ColumnDTO[] columns, String[] fieldNam

List<ColumnDTO> partitionColumn =
Arrays.stream(columns)
.filter(c -> c.name().equals(fieldName[0]))
// (TODO) Need to consider the case sensitivity issues.
// To be optimized.
.filter(c -> c.name().equalsIgnoreCase(fieldName[0]))
.collect(Collectors.toList());
Preconditions.checkArgument(
partitionColumn.size() == 1, "partition field %s not found in table", fieldName[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import com.datastrato.gravitino.catalog.hive.HiveClientPool;
import com.datastrato.gravitino.client.GravitinoMetaLake;
import com.datastrato.gravitino.dto.rel.ColumnDTO;
import com.datastrato.gravitino.dto.rel.DistributionDTO;
import com.datastrato.gravitino.dto.rel.SortOrderDTO;
import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO;
import com.datastrato.gravitino.dto.rel.partitions.IdentityPartitioningDTO;
import com.datastrato.gravitino.integration.test.catalog.jdbc.utils.JdbcDriverDownloader;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
Expand All @@ -19,6 +23,16 @@
import com.datastrato.gravitino.integration.test.util.ITUtils;
import com.datastrato.gravitino.rel.Schema;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.distributions.Strategy;
import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -376,7 +390,9 @@ void testHiveTableCreatedByTrino() {
String createTableSql =
String.format(
"CREATE TABLE \"%s.%s\".%s.%s (id int, name varchar)"
+ " with ( serde_name = '123455', location = 'hdfs://localhost:9000/user/hive/warehouse/hive_schema.db/hive_table')",
+ " with ( serde_name = '123455', location = 'hdfs://localhost:9000/user/hive/warehouse/hive_schema.db/hive_table'"
+ ", partitioned_by = ARRAY['name'], bucketed_by = ARRAY['id'], bucket_count = 50, sorted_by = ARRAY['name']"
+ ")",
metalakeName, catalogName, schemaName, tableName);
containerSuite.getTrinoContainer().executeUpdateSQL(createTableSql);

Expand All @@ -396,6 +412,20 @@ void testHiveTableCreatedByTrino() {
Assertions.assertEquals(
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
table.properties().get("output-format"));

Distribution distribution = table.distribution();
Assertions.assertEquals(Strategy.HASH, distribution.strategy());
Assertions.assertEquals(50, distribution.number());
Assertions.assertEquals(
"id", ((FieldReferenceDTO) ((DistributionDTO) distribution).args()[0]).fieldName()[0]);

Assertions.assertEquals(1, table.partitioning().length);
Transform partitioning = table.partitioning()[0];
Assertions.assertEquals("name", ((IdentityPartitioningDTO) partitioning).fieldName()[0]);

Assertions.assertEquals(1, table.sortOrder().length);
SortOrderDTO sortOrder = (SortOrderDTO) table.sortOrder()[0];
Assertions.assertEquals("name", ((FieldReferenceDTO) sortOrder.sortTerm()).fieldName()[0]);
}

@Test
Expand Down Expand Up @@ -711,7 +741,15 @@ void testHiveTableCreatedByGravitino() throws InterruptedException {
"hdfs://localhost:9000/user/hive/warehouse/hive_schema.db/hive_table")
.put("serde-name", "mock11")
.put("table-type", "EXTERNAL_TABLE")
.build());
.build(),
new Transform[] {Transforms.identity("BinaryType")},
Distributions.of(Strategy.HASH, 4, NamedReference.field("BooleanType")),
new SortOrder[] {
SortOrders.of(
NamedReference.field("LongType"),
SortDirection.ASCENDING,
NullOrdering.NULLS_FIRST)
});
LOG.info("create table \"{}.{}\".{}.{}", metalakeName, catalogName, schemaName, tableName);

Table table =
Expand Down Expand Up @@ -739,6 +777,10 @@ void testHiveTableCreatedByGravitino() throws InterruptedException {
Assertions.assertTrue(
data.contains("input_format = 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'"));
Assertions.assertTrue(data.contains("serde_lib = 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'"));
Assertions.assertTrue(data.contains("bucket_count = 4"));
Assertions.assertTrue(data.contains("bucketed_by = ARRAY['booleantype']"));
Assertions.assertTrue(data.contains("partitioned_by = ARRAY['binarytype']"));
Assertions.assertTrue(data.contains("sorted_by = ARRAY['longtype']"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.client.GravitinoMetaLake;
import com.datastrato.gravitino.dto.rel.ColumnDTO;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -122,11 +121,15 @@ public void createTable(GravitinoTable table) {
NameIdentifier identifier =
NameIdentifier.ofTable(
metalake.name(), catalogName, table.getSchemaName(), table.getName());
ColumnDTO[] gravitinoColumns = table.getColumnDTOs();
String comment = table.getComment();
Map<String, String> properties = table.getProperties();
try {
tableCatalog.createTable(identifier, gravitinoColumns, comment, properties);
tableCatalog.createTable(
identifier,
table.getColumnDTOs(),
table.getComment(),
table.getProperties(),
table.getPartitioning(),
table.getDistribution(),
table.getSortOrders());
} catch (NoSuchSchemaException e) {
throw new TrinoException(GRAVITINO_SCHEMA_NOT_EXISTS, "Schema does not exist", e);
} catch (TableAlreadyExistsException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
Expand All @@ -34,7 +35,7 @@ public class CatalogConnectorMetadataAdapter {
protected final List<PropertyMetadata<?>> tableProperties;
protected final List<PropertyMetadata<?>> columnProperties;

private final GeneralDataTypeTransformer dataTypeTransformer;
protected final GeneralDataTypeTransformer dataTypeTransformer;

protected CatalogConnectorMetadataAdapter(
List<PropertyMetadata<?>> schemaProperties,
Expand Down Expand Up @@ -93,6 +94,13 @@ public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) {
return new GravitinoTable(schemaName, tableName, columns, comment, properties);
}

protected Map<String, Object> removeKeys(
Map<String, Object> properties, Set<String> keyToDelete) {
return properties.entrySet().stream()
.filter(entry -> !keyToDelete.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/** Transform trino schema metadata to gravitino schema metadata */
public GravitinoSchema createSchema(String schemaName, Map<String, Object> properties) {
return new GravitinoSchema(schemaName, toGravitinoSchemaProperties(properties), "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,56 @@
*/
package com.datastrato.gravitino.trino.connector.catalog.hive;

import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_BUCKET_COUNT_KEY;
import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_BUCKET_KEY;
import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_PARTITION_KEY;
import static com.datastrato.gravitino.trino.connector.catalog.hive.HivePropertyMeta.HIVE_SORT_ORDER_KEY;

import com.datastrato.catalog.property.PropertyConverter;
import com.datastrato.gravitino.dto.rel.DistributionDTO;
import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO;
import com.datastrato.gravitino.dto.rel.partitions.Partitioning.SingleFieldPartitioning;
import com.datastrato.gravitino.rel.expressions.Expression;
import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.distributions.Strategy;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.trino.connector.GravitinoErrorCode;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import com.datastrato.gravitino.trino.connector.catalog.hive.SortingColumn.Order;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoColumn;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoTable;
import com.google.common.collect.ImmutableSet;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.session.PropertyMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;

/** Transforming gravitino hive metadata to trino. */
public class HiveMetadataAdapter extends CatalogConnectorMetadataAdapter {

private final PropertyConverter tableConverter;
private final PropertyConverter schemaConverter;

private static final Set<String> HIVE_PROPERTIES_TO_REMOVE =
ImmutableSet.of(
HIVE_PARTITION_KEY, HIVE_BUCKET_KEY, HIVE_BUCKET_COUNT_KEY, HIVE_SORT_ORDER_KEY);

public HiveMetadataAdapter(
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
Expand Down Expand Up @@ -48,4 +86,149 @@ public Map<String, String> toGravitinoSchemaProperties(Map<String, Object> prope
Map<String, Object> stringMap = schemaConverter.engineToGravitinoProperties(properties);
return super.toGravitinoSchemaProperties(stringMap);
}

@Override
public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) {
String tableName = tableMetadata.getTableSchema().getTable().getTableName();
String schemaName = tableMetadata.getTableSchema().getTable().getSchemaName();
String comment = tableMetadata.getComment().orElse("");

Map<String, Object> propertyMap = tableMetadata.getProperties();
List<String> partitionColumns =
propertyMap.containsKey(HIVE_PARTITION_KEY)
? (List<String>) propertyMap.get(HIVE_PARTITION_KEY)
: Collections.EMPTY_LIST;
List<String> bucketColumns =
propertyMap.containsKey(HIVE_BUCKET_KEY)
? (List<String>) propertyMap.get(HIVE_BUCKET_KEY)
: Collections.EMPTY_LIST;
int bucketCount =
propertyMap.containsKey(HIVE_BUCKET_COUNT_KEY)
? (int) propertyMap.get(HIVE_BUCKET_COUNT_KEY)
: 0;
List<SortingColumn> sortColumns =
propertyMap.containsKey(HIVE_SORT_ORDER_KEY)
? (List<SortingColumn>) propertyMap.get(HIVE_SORT_ORDER_KEY)
: Collections.EMPTY_LIST;

if (!sortColumns.isEmpty() && (bucketColumns.isEmpty() || bucketCount == 0)) {
throw new TrinoException(
GravitinoErrorCode.GRAVITINO_ILLEGAL_ARGUMENT,
"Sort columns can only be set when bucket columns and bucket count are set");
}

Map<String, String> properties =
toGravitinoTableProperties(
removeKeys(tableMetadata.getProperties(), HIVE_PROPERTIES_TO_REMOVE));

List<GravitinoColumn> columns = new ArrayList<>();
for (int i = 0; i < tableMetadata.getColumns().size(); i++) {
ColumnMetadata column = tableMetadata.getColumns().get(i);
columns.add(
new GravitinoColumn(
column.getName(),
dataTypeTransformer.getGravitinoType(column.getType()),
i,
column.getComment(),
column.isNullable()));
}
GravitinoTable gravitinoTable =
new GravitinoTable(schemaName, tableName, columns, comment, properties);

if (!partitionColumns.isEmpty()) {
Transform[] partitioning =
partitionColumns.stream().map(Transforms::identity).toArray(Transform[]::new);
gravitinoTable.setPartitioning(partitioning);
}

if (!bucketColumns.isEmpty()) {
Expression[] bucketing =
bucketColumns.stream().map(NamedReference::field).toArray(Expression[]::new);
gravitinoTable.setDistribution(Distributions.of(Strategy.HASH, bucketCount, bucketing));
}

if (!sortColumns.isEmpty()) {
SortOrder[] sorting =
sortColumns.stream()
.map(
sortingColumn -> {
Expression expression = NamedReference.field(sortingColumn.getColumnName());
SortDirection sortDirection =
sortingColumn.getOrder() == Order.ASCENDING
? SortDirection.ASCENDING
: SortDirection.DESCENDING;
return SortOrders.of(expression, sortDirection);
})
.toArray(SortOrder[]::new);
gravitinoTable.setSortOrders(sorting);
}

return gravitinoTable;
}

@Override
public ConnectorTableMetadata getTableMetadata(GravitinoTable gravitinoTable) {
SchemaTableName schemaTableName =
new SchemaTableName(gravitinoTable.getSchemaName(), gravitinoTable.getName());
ArrayList<ColumnMetadata> columnMetadataList = new ArrayList<>();
for (GravitinoColumn column : gravitinoTable.getColumns()) {
columnMetadataList.add(getColumnMetadata(column));
}

Map<String, Object> properties = toTrinoTableProperties(gravitinoTable.getProperties());

if (ArrayUtils.isNotEmpty(gravitinoTable.getPartitioning())) {
// Only support simple partition now like partition by a, b, c.
// Format like partition like partition by year(a), b, c is NOT supported now.
properties.put(
HIVE_PARTITION_KEY,
gravitinoTable.getPartitioning().length > 0
? Arrays.stream(gravitinoTable.getPartitioning())
.map(
ts ->
((SingleFieldPartitioning) ts).fieldName()[0].toLowerCase(Locale.ENGLISH))
.collect(Collectors.toList())
: Collections.EMPTY_LIST);
}

if (gravitinoTable.getDistribution() != null
&& !DistributionDTO.NONE.equals(gravitinoTable.getDistribution())) {
properties.put(
HIVE_BUCKET_KEY,
Arrays.stream(gravitinoTable.getDistribution().expressions())
.map(ts -> ((FieldReferenceDTO) ts).fieldName()[0].toLowerCase(Locale.ENGLISH))
.collect(Collectors.toList()));

properties.put(HIVE_BUCKET_COUNT_KEY, gravitinoTable.getDistribution().number());
}

if (ArrayUtils.isNotEmpty(gravitinoTable.getSortOrders())) {
// Only support the simple format
properties.put(
HIVE_SORT_ORDER_KEY,
Arrays.stream(gravitinoTable.getSortOrders())
.map(
sortOrder -> {
Expression expression = sortOrder.expression();
SortDirection sortDirection =
sortOrder.direction() == SortDirection.ASCENDING
? SortDirection.ASCENDING
: SortDirection.DESCENDING;
Order order =
sortDirection == SortDirection.ASCENDING
? Order.ASCENDING
: Order.DESCENDING;
return new SortingColumn(
((FieldReferenceDTO) expression).fieldName()[0].toLowerCase(Locale.ENGLISH),
order);
})
.collect(Collectors.toList()));
}

return new ConnectorTableMetadata(
schemaTableName,
columnMetadataList,
properties,
Optional.ofNullable(gravitinoTable.getComment()));
}
}
Loading

0 comments on commit 7cd80b0

Please sign in to comment.