diff --git a/api/src/main/java/com/datastrato/gravitino/rel/Table.java b/api/src/main/java/com/datastrato/gravitino/rel/Table.java index 0f3f7c22a44..7e6bbfbc2a9 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/Table.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/Table.java @@ -12,6 +12,8 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; import java.util.Collections; import java.util.Map; import javax.annotation.Nullable; @@ -48,6 +50,14 @@ default Distribution distribution() { return Distributions.NONE; } + /** + * @return The indexes of the table. If no indexes are specified, Indexes.EMPTY_INDEXES is + * returned. + */ + default Index[] index() { + return Indexes.EMPTY_INDEXES; + } + /** @return The comment of the table. Null is returned if no comment is set. */ @Nullable default String comment() { diff --git a/api/src/main/java/com/datastrato/gravitino/rel/TableCatalog.java b/api/src/main/java/com/datastrato/gravitino/rel/TableCatalog.java index b4231cb93ad..fdaece66326 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/TableCatalog.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/TableCatalog.java @@ -31,6 +31,8 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; import java.util.Map; /** @@ -197,7 +199,7 @@ default Table createTable( * @throws NoSuchSchemaException If the schema does not exist. * @throws TableAlreadyExistsException If the table already exists. 
*/ - Table createTable( + default Table createTable( NameIdentifier ident, Column[] columns, String comment, @@ -205,6 +207,42 @@ Table createTable( Transform[] partitions, Distribution distribution, SortOrder[] sortOrders) + throws NoSuchSchemaException, TableAlreadyExistsException { + return createTable( + ident, + columns, + comment, + properties, + partitions, + distribution, + sortOrders, + Indexes.EMPTY_INDEXES); + } + + /** + * Create a table in the catalog. + * + * @param ident A table identifier. + * @param columns The columns of the new table. + * @param comment The table comment. + * @param properties The table properties. + * @param distribution The distribution of the table + * @param sortOrders The sort orders of the table + * @param partitions The table partitioning. + * @param indexes The table indexes. + * @return The created table metadata. + * @throws NoSuchSchemaException If the schema does not exist. + * @throws TableAlreadyExistsException If the table already exists. + */ + Table createTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException; /** diff --git a/api/src/main/java/com/datastrato/gravitino/rel/indexes/Index.java b/api/src/main/java/com/datastrato/gravitino/rel/indexes/Index.java new file mode 100644 index 00000000000..95f81066101 --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/rel/indexes/Index.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.rel.indexes; + +/** + * The Index interface defines methods for implementing table index columns. Currently, settings for + * PRIMARY_KEY and UNIQUE_KEY are provided. + */ +public interface Index { + + /** @return The type of the index. eg: PRIMARY_KEY and UNIQUE_KEY. 
*/ + IndexType type(); + + /** @return The name of the index. */ + String name(); + + /** @return The field name under the table contained in the index. eg: table.id */ + String[][] fieldNames(); + + enum IndexType { + /** Primary key index. */ + PRIMARY_KEY, + /** Unique key index. */ + UNIQUE_KEY, + } +} diff --git a/api/src/main/java/com/datastrato/gravitino/rel/indexes/Indexes.java b/api/src/main/java/com/datastrato/gravitino/rel/indexes/Indexes.java new file mode 100644 index 00000000000..f91e7e5ceb9 --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/rel/indexes/Indexes.java @@ -0,0 +1,104 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.rel.indexes; + +/** Helper methods to create index to pass into Gravitino. */ +public class Indexes { + + public static final Index[] EMPTY_INDEXES = new Index[0]; + private static final String UNIQUE_KEY_FORMAT = "%s_uk"; + private static final String PRIMARY_KEY_FORMAT = "%s_pk"; + + public static Index unique(String fieldName) { + String[] fieldNames = {fieldName}; + return unique(new String[][] {fieldNames}); + } + + public static Index unique(String[][] fieldNames) { + return unique(String.format(UNIQUE_KEY_FORMAT, fieldNames[0][0]), fieldNames); + } + + public static Index unique(String name, String[][] fieldNames) { + return of(Index.IndexType.UNIQUE_KEY, name, fieldNames); + } + + public static Index primary(String fieldName) { + String[] fieldNames = {fieldName}; + return primary(new String[][] {fieldNames}); + } + + public static Index primary(String[][] fieldNames) { + return primary(String.format(PRIMARY_KEY_FORMAT, fieldNames[0][0]), fieldNames); + } + + public static Index primary(String name, String[][] fieldNames) { + return of(Index.IndexType.PRIMARY_KEY, name, fieldNames); + } + + public static Index of(Index.IndexType indexType, String name, String[][] fieldNames) { + return IndexImpl.builder() + 
.withIndexType(indexType) + .withName(name) + .withFieldNames(fieldNames) + .build(); + } + + public static final class IndexImpl implements Index { + private final IndexType indexType; + private final String name; + private final String[][] fieldNames; + + public IndexImpl(IndexType indexType, String name, String[][] fieldNames) { + this.indexType = indexType; + this.name = name; + this.fieldNames = fieldNames; + } + + @Override + public IndexType type() { + return indexType; + } + + @Override + public String name() { + return name; + } + + @Override + public String[][] fieldNames() { + return fieldNames; + } + + public static Builder builder() { + return new Builder(); + } + + /** Builder to create a index. */ + public static class Builder { + protected IndexType indexType; + protected String name; + protected String[][] fieldNames; + + public Indexes.IndexImpl.Builder withIndexType(IndexType indexType) { + this.indexType = indexType; + return this; + } + + public Indexes.IndexImpl.Builder withName(String name) { + this.name = name; + return this; + } + + public Indexes.IndexImpl.Builder withFieldNames(String[][] fieldNames) { + this.fieldNames = fieldNames; + return this; + } + + public Index build() { + return new IndexImpl(indexType, name, fieldNames); + } + } + } +} diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java index 01e2dbe91d6..2be0d56b336 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java @@ -39,6 +39,7 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import 
com.datastrato.gravitino.rel.indexes.Index; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -87,7 +88,7 @@ public class HiveCatalogOperations implements CatalogOperations, SupportsSchemas // will only need to set the configuration 'METASTORE_URL' in Gravitino and Gravitino will change // it to `METASTOREURIS` automatically and pass it to Hive. public static final Map GRAVITINO_CONFIG_TO_HIVE = - ImmutableMap.of(METASTORE_URIS, ConfVars.METASTOREURIS.varname); + ImmutableMap.of(METASTORE_URIS, ConfVars.METASTOREURIS.varname); /** * Constructs a new instance of HiveCatalogOperations. @@ -116,14 +117,14 @@ public void initialize(Map conf) throws RuntimeException { Map gravitinoConfig = Maps.newHashMap(); conf.forEach( - (key, value) -> { - if (key.startsWith(CATALOG_BYPASS_PREFIX)) { - // Trim bypass prefix and pass it to hive conf - byPassConfig.put(key.substring(CATALOG_BYPASS_PREFIX.length()), value); - } else if (GRAVITINO_CONFIG_TO_HIVE.containsKey(key)) { - gravitinoConfig.put(GRAVITINO_CONFIG_TO_HIVE.get(key), value); - } - }); + (key, value) -> { + if (key.startsWith(CATALOG_BYPASS_PREFIX)) { + // Trim bypass prefix and pass it to hive conf + byPassConfig.put(key.substring(CATALOG_BYPASS_PREFIX.length()), value); + } else if (GRAVITINO_CONFIG_TO_HIVE.containsKey(key)) { + gravitinoConfig.put(GRAVITINO_CONFIG_TO_HIVE.get(key), value); + } + }); Map mergeConfig = Maps.newHashMap(byPassConfig); // `gravitinoConfig` overwrite byPassConfig if possible @@ -163,26 +164,26 @@ public void close() { public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { try { NameIdentifier[] schemas = - clientPool.run( - c -> - c.getAllDatabases().stream() - .map(db -> NameIdentifier.of(namespace, db)) - .toArray(NameIdentifier[]::new)); + clientPool.run( + c -> + c.getAllDatabases().stream() + .map(db -> NameIdentifier.of(namespace, db)) + 
.toArray(NameIdentifier[]::new)); return schemas; } catch (TException e) { throw new RuntimeException( - "Failed to list all schemas (database) under namespace : " - + namespace - + " in Hive Metastore", - e); + "Failed to list all schemas (database) under namespace : " + + namespace + + " in Hive Metastore", + e); } catch (InterruptedException e) { throw new RuntimeException(e); } } - /** + /**cre * Creates a new schema with the provided identifier, comment, and metadata. * * @param ident The identifier of the schema to create. @@ -194,40 +195,40 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc */ @Override public HiveSchema createSchema( - NameIdentifier ident, String comment, Map properties) - throws NoSuchCatalogException, SchemaAlreadyExistsException { + NameIdentifier ident, String comment, Map properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { try { HiveSchema hiveSchema = - new HiveSchema.Builder() - .withName(ident.name()) - .withComment(comment) - .withProperties(properties) - .withConf(hiveConf) - .withAuditInfo( - new AuditInfo.Builder() - .withCreator(currentUser()) - .withCreateTime(Instant.now()) - .build()) - .build(); + new HiveSchema.Builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(properties) + .withConf(hiveConf) + .withAuditInfo( + new AuditInfo.Builder() + .withCreator(currentUser()) + .withCreateTime(Instant.now()) + .build()) + .build(); clientPool.run( - client -> { - client.createDatabase(hiveSchema.toHiveDB()); - return null; - }); + client -> { + client.createDatabase(hiveSchema.toHiveDB()); + return null; + }); LOG.info("Created Hive schema (database) {} in Hive Metastore", ident.name()); return hiveSchema; } catch (AlreadyExistsException e) { throw new SchemaAlreadyExistsException( - String.format( - "Hive schema (database) '%s' already exists in Hive Metastore", ident.name()), - e); + String.format( + "Hive schema (database) '%s' already exists in 
Hive Metastore", ident.name()), + e); } catch (TException e) { throw new RuntimeException( - "Failed to create Hive schema (database) " + ident.name() + " in Hive Metastore", e); + "Failed to create Hive schema (database) " + ident.name() + " in Hive Metastore", e); } catch (Exception e) { throw new RuntimeException(e); @@ -252,13 +253,13 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException } catch (NoSuchObjectException | UnknownDBException e) { throw new NoSuchSchemaException( - String.format( - "Hive schema (database) does not exist: %s in Hive Metastore", ident.name()), - e); + String.format( + "Hive schema (database) does not exist: %s in Hive Metastore", ident.name()), + e); } catch (TException e) { throw new RuntimeException( - "Failed to load Hive schema (database) " + ident.name() + " from Hive Metastore", e); + "Failed to load Hive schema (database) " + ident.name() + " from Hive Metastore", e); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -275,26 +276,26 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException */ @Override public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... 
changes) - throws NoSuchSchemaException { + throws NoSuchSchemaException { try { // load the database parameters Database database = clientPool.run(client -> client.getDatabase(ident.name())); Map properties = HiveSchema.buildSchemaProperties(database); LOG.debug( - "Loaded properties for Hive schema (database) {} found {}", - ident.name(), - properties.keySet()); + "Loaded properties for Hive schema (database) {} found {}", + ident.name(), + properties.keySet()); for (SchemaChange change : changes) { if (change instanceof SchemaChange.SetProperty) { properties.put( - ((SchemaChange.SetProperty) change).getProperty(), - ((SchemaChange.SetProperty) change).getValue()); + ((SchemaChange.SetProperty) change).getProperty(), + ((SchemaChange.SetProperty) change).getValue()); } else if (change instanceof SchemaChange.RemoveProperty) { properties.remove(((SchemaChange.RemoveProperty) change).getProperty()); } else { throw new IllegalArgumentException( - "Unsupported schema change type: " + change.getClass().getSimpleName()); + "Unsupported schema change type: " + change.getClass().getSimpleName()); } } @@ -303,22 +304,22 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... 
changes) alteredDatabase.setParameters(properties); clientPool.run( - client -> { - client.alterDatabase(ident.name(), alteredDatabase); - return null; - }); + client -> { + client.alterDatabase(ident.name(), alteredDatabase); + return null; + }); LOG.info("Altered Hive schema (database) {} in Hive Metastore", ident.name()); return HiveSchema.fromHiveDB(alteredDatabase, hiveConf); } catch (NoSuchObjectException e) { throw new NoSuchSchemaException( - String.format("Hive schema (database) %s does not exist in Hive Metastore", ident.name()), - e); + String.format("Hive schema (database) %s does not exist in Hive Metastore", ident.name()), + e); } catch (TException | InterruptedException e) { throw new RuntimeException( - "Failed to alter Hive schema (database) " + ident.name() + " in Hive metastore", e); + "Failed to alter Hive schema (database) " + ident.name() + " in Hive metastore", e); } catch (Exception e) { throw new RuntimeException(e); @@ -337,18 +338,18 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes) public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { try { clientPool.run( - client -> { - client.dropDatabase(ident.name(), false, false, cascade); - return null; - }); + client -> { + client.dropDatabase(ident.name(), false, false, cascade); + return null; + }); LOG.info("Dropped Hive schema (database) {}", ident.name()); return true; } catch (InvalidOperationException e) { throw new NonEmptySchemaException( - String.format( - "Hive schema (database) %s is not empty. One or more tables exist.", ident.name()), - e); + String.format( + "Hive schema (database) %s is not empty. 
One or more tables exist.", ident.name()), + e); } catch (NoSuchObjectException e) { LOG.warn("Hive schema (database) {} does not exist in Hive Metastore", ident.name()); @@ -356,7 +357,7 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty } catch (TException e) { throw new RuntimeException( - "Failed to drop Hive schema (database) " + ident.name() + " in Hive Metastore", e); + "Failed to drop Hive schema (database) " + ident.name() + " in Hive Metastore", e); } catch (Exception e) { throw new RuntimeException(e); @@ -391,18 +392,18 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep // those names we can obtain metadata for each individual table and get the type we needed. List allTables = clientPool.run(c -> c.getAllTables(schemaIdent.name())); return clientPool.run( - c -> - c.getTableObjectsByName(schemaIdent.name(), allTables).stream() - .filter(tb -> SUPPORT_TABLE_TYPES.contains(tb.getTableType())) - .map(tb -> NameIdentifier.of(namespace, tb.getTableName())) - .toArray(NameIdentifier[]::new)); + c -> + c.getTableObjectsByName(schemaIdent.name(), allTables).stream() + .filter(tb -> SUPPORT_TABLE_TYPES.contains(tb.getTableType())) + .map(tb -> NameIdentifier.of(namespace, tb.getTableName())) + .toArray(NameIdentifier[]::new)); } catch (UnknownDBException e) { throw new NoSuchSchemaException( - "Schema (database) does not exist " + namespace + " in Hive Metastore"); + "Schema (database) does not exist " + namespace + " in Hive Metastore"); } catch (TException e) { throw new RuntimeException( - "Failed to list all tables under the namespace : " + namespace + " in Hive Metastore", e); + "Failed to list all tables under the namespace : " + namespace + " in Hive Metastore", e); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -430,16 +431,16 @@ private org.apache.hadoop.hive.metastore.api.Table loadHiveTable(NameIdentifier try { org.apache.hadoop.hive.metastore.api.Table table = - 
clientPool.run(c -> c.getTable(schemaIdent.name(), tableIdent.name())); + clientPool.run(c -> c.getTable(schemaIdent.name(), tableIdent.name())); return table; } catch (NoSuchObjectException e) { throw new NoSuchTableException( - String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e); + String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e); } catch (InterruptedException | TException e) { throw new RuntimeException( - "Failed to load Hive table " + tableIdent.name() + " from Hive metastore", e); + "Failed to load Hive table " + tableIdent.name() + " from Hive metastore", e); } } @@ -448,75 +449,75 @@ private void validatePartitionForCreate(Column[] columns, Transform[] partitioni for (int i = 0; i < partitioning.length; i++) { Preconditions.checkArgument( - partitioning[i] instanceof Transforms.IdentityTransform, - "Hive partition only supports identity transform"); + partitioning[i] instanceof Transforms.IdentityTransform, + "Hive partition only supports identity transform"); Transforms.IdentityTransform identity = (Transforms.IdentityTransform) partitioning[i]; Preconditions.checkArgument( - identity.fieldName().length == 1, "Hive partition does not support nested field"); + identity.fieldName().length == 1, "Hive partition does not support nested field"); // The partition field must be placed at the end of the columns in order. // For example, if the table has columns [a, b, c, d], then the partition field must be // [b, c, d] or [c, d] or [d]. 
Preconditions.checkArgument( - columns[partitionStartIndex + i].name().equals(identity.fieldName()[0]), - "The partition field must be placed at the end of the columns in order"); + columns[partitionStartIndex + i].name().equals(identity.fieldName()[0]), + "The partition field must be placed at the end of the columns in order"); } } private void validateColumnChangeForAlter( - TableChange[] changes, org.apache.hadoop.hive.metastore.api.Table hiveTable) { + TableChange[] changes, org.apache.hadoop.hive.metastore.api.Table hiveTable) { Set partitionFields = - hiveTable.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toSet()); + hiveTable.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toSet()); Set existingFields = - hiveTable.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toSet()); + hiveTable.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toSet()); existingFields.addAll(partitionFields); Arrays.stream(changes) - .filter(c -> c instanceof TableChange.ColumnChange) - .forEach( - c -> { - String fieldToAdd = String.join(".", ((TableChange.ColumnChange) c).fieldName()); - Preconditions.checkArgument( - c instanceof TableChange.UpdateColumnComment - || !partitionFields.contains(fieldToAdd), - "Cannot alter partition column: " + fieldToAdd); - - if (c instanceof TableChange.UpdateColumnNullability) { - throw new IllegalArgumentException( - "Hive does not support altering column nullability"); - } - - if (c instanceof TableChange.UpdateColumnPosition - && afterPartitionColumn( - partitionFields, ((TableChange.UpdateColumnPosition) c).getPosition())) { - throw new IllegalArgumentException( - "Cannot alter column position to after partition column"); - } - - if (c instanceof TableChange.AddColumn) { - TableChange.AddColumn addColumn = (TableChange.AddColumn) c; - - if (existingFields.contains(fieldToAdd)) { - throw new IllegalArgumentException( - "Cannot add column with 
duplicate name: " + fieldToAdd); - } - - if (addColumn.getPosition() == null) { - // If the position is not specified, the column will be added to the end of the - // non-partition columns. - return; - } - - if ((afterPartitionColumn(partitionFields, addColumn.getPosition()))) { - throw new IllegalArgumentException("Cannot add column after partition column"); - } - } - }); + .filter(c -> c instanceof TableChange.ColumnChange) + .forEach( + c -> { + String fieldToAdd = String.join(".", ((TableChange.ColumnChange) c).fieldName()); + Preconditions.checkArgument( + c instanceof TableChange.UpdateColumnComment + || !partitionFields.contains(fieldToAdd), + "Cannot alter partition column: " + fieldToAdd); + + if (c instanceof TableChange.UpdateColumnNullability) { + throw new IllegalArgumentException( + "Hive does not support altering column nullability"); + } + + if (c instanceof TableChange.UpdateColumnPosition + && afterPartitionColumn( + partitionFields, ((TableChange.UpdateColumnPosition) c).getPosition())) { + throw new IllegalArgumentException( + "Cannot alter column position to after partition column"); + } + + if (c instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) c; + + if (existingFields.contains(fieldToAdd)) { + throw new IllegalArgumentException( + "Cannot add column with duplicate name: " + fieldToAdd); + } + + if (addColumn.getPosition() == null) { + // If the position is not specified, the column will be added to the end of the + // non-partition columns. 
+ return; + } + + if ((afterPartitionColumn(partitionFields, addColumn.getPosition()))) { + throw new IllegalArgumentException("Cannot add column after partition column"); + } + } + }); } private boolean afterPartitionColumn( - Set partitionFields, TableChange.ColumnPosition columnPosition) { + Set partitionFields, TableChange.ColumnPosition columnPosition) { Preconditions.checkArgument(columnPosition != null, "Column position cannot be null"); if (columnPosition instanceof TableChange.After) { @@ -528,16 +529,16 @@ private boolean afterPartitionColumn( private void validateDistributionAndSort(Distribution distribution, SortOrder[] sortOrder) { if (distribution != Distributions.NONE) { boolean allNameReference = - Arrays.stream(distribution.expressions()) - .allMatch(t -> t instanceof NamedReference.FieldReference); + Arrays.stream(distribution.expressions()) + .allMatch(t -> t instanceof NamedReference.FieldReference); Preconditions.checkArgument( - allNameReference, "Hive distribution only supports field reference"); + allNameReference, "Hive distribution only supports field reference"); } if (ArrayUtils.isNotEmpty(sortOrder)) { boolean allNameReference = - Arrays.stream(sortOrder) - .allMatch(t -> t.expression() instanceof NamedReference.FieldReference); + Arrays.stream(sortOrder) + .allMatch(t -> t.expression() instanceof NamedReference.FieldReference); Preconditions.checkArgument(allNameReference, "Hive sort order only supports name reference"); } } @@ -556,30 +557,34 @@ private void validateDistributionAndSort(Distribution distribution, SortOrder[] */ @Override public Table createTable( - NameIdentifier tableIdent, - Column[] columns, - String comment, - Map properties, - Transform[] partitioning, - Distribution distribution, - SortOrder[] sortOrders) - throws NoSuchSchemaException, TableAlreadyExistsException { + NameIdentifier tableIdent, + Column[] columns, + String comment, + Map properties, + Transform[] partitioning, + Distribution distribution, + 
SortOrder[] sortOrders, + Index[] indexes) + throws NoSuchSchemaException, TableAlreadyExistsException { + Preconditions.checkArgument( + indexes.length == 0, + "Hive-catalog does not support indexes, current Gravitino hive-catalog only supports Hive 2.x"); NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels()); validatePartitionForCreate(columns, partitioning); validateDistributionAndSort(distribution, sortOrders); Arrays.stream(columns) - .forEach( - c -> { - validateNullable(c.name(), c.nullable()); - validateColumnDefaultValue(c.name(), c.defaultValue()); - }); + .forEach( + c -> { + validateNullable(c.name(), c.nullable()); + validateColumnDefaultValue(c.name(), c.defaultValue()); + }); TableType tableType = (TableType) tablePropertiesMetadata.getOrDefault(properties, TABLE_TYPE); Preconditions.checkArgument( - SUPPORT_TABLE_TYPES.contains(tableType.name()), - "Unsupported table type: " + tableType.name()); + SUPPORT_TABLE_TYPES.contains(tableType.name()), + "Unsupported table type: " + tableType.name()); try { if (!schemaExists(schemaIdent)) { @@ -588,26 +593,26 @@ public Table createTable( } HiveTable hiveTable = - new HiveTable.Builder() - .withName(tableIdent.name()) - .withSchemaName(schemaIdent.name()) - .withComment(comment) - .withColumns(columns) - .withProperties(properties) - .withDistribution(distribution) - .withSortOrders(sortOrders) - .withAuditInfo( - new AuditInfo.Builder() - .withCreator(currentUser()) - .withCreateTime(Instant.now()) - .build()) - .withPartitioning(partitioning) - .build(); + new HiveTable.Builder() + .withName(tableIdent.name()) + .withSchemaName(schemaIdent.name()) + .withComment(comment) + .withColumns(columns) + .withProperties(properties) + .withDistribution(distribution) + .withSortOrders(sortOrders) + .withAuditInfo( + new AuditInfo.Builder() + .withCreator(currentUser()) + .withCreateTime(Instant.now()) + .build()) + .withPartitioning(partitioning) + .build(); clientPool.run( - c -> { - 
c.createTable(hiveTable.toHiveTable(tablePropertiesMetadata)); - return null; - }); + c -> { + c.createTable(hiveTable.toHiveTable(tablePropertiesMetadata)); + return null; + }); LOG.info("Created Hive table {} in Hive Metastore", tableIdent.name()); return hiveTable; @@ -616,7 +621,7 @@ public Table createTable( throw new TableAlreadyExistsException("Table already exists: " + tableIdent.name(), e); } catch (TException | InterruptedException e) { throw new RuntimeException( - "Failed to create Hive table " + tableIdent.name() + " in Hive Metastore", e); + "Failed to create Hive table " + tableIdent.name() + " in Hive Metastore", e); } catch (Exception e) { throw new RuntimeException(e); } @@ -638,14 +643,14 @@ public Table createTable( */ @Override public Table alterTable(NameIdentifier tableIdent, TableChange... changes) - throws NoSuchTableException, IllegalArgumentException { + throws NoSuchTableException, IllegalArgumentException { NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels()); try { // TODO(@Minghuang): require a table lock to avoid race condition HiveTable table = (HiveTable) loadTable(tableIdent); org.apache.hadoop.hive.metastore.api.Table alteredHiveTable = - table.toHiveTable(tablePropertiesMetadata); + table.toHiveTable(tablePropertiesMetadata); validateColumnChangeForAlter(changes, alteredHiveTable); @@ -690,38 +695,38 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes) } else { throw new IllegalArgumentException( - "Unsupported column change type: " + change.getClass().getSimpleName()); + "Unsupported column change type: " + change.getClass().getSimpleName()); } } else { throw new IllegalArgumentException( - "Unsupported table change type: " - + (change == null ? "null" : change.getClass().getSimpleName())); + "Unsupported table change type: " + + (change == null ? 
"null" : change.getClass().getSimpleName())); } } clientPool.run( - c -> { - c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable); - return null; - }); + c -> { + c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable); + return null; + }); LOG.info("Altered Hive table {} in Hive Metastore", tableIdent.name()); return HiveTable.fromHiveTable(alteredHiveTable); } catch (TException | InterruptedException e) { if (e.getMessage() != null - && e.getMessage().contains("types incompatible with the existing columns")) { + && e.getMessage().contains("types incompatible with the existing columns")) { throw new IllegalArgumentException( - "Failed to alter Hive table [" - + tableIdent.name() - + "] in Hive metastore, " - + "since Hive metastore will check the compatibility of column type between the old and new column positions, " - + "please ensure that the type of the new column position is compatible with the old one, " - + "otherwise the alter operation will fail in Hive metastore.", - e); + "Failed to alter Hive table [" + + tableIdent.name() + + "] in Hive metastore, " + + "since Hive metastore will check the compatibility of column type between the old and new column positions, " + + "please ensure that the type of the new column position is compatible with the old one, " + + "otherwise the alter operation will fail in Hive metastore.", + e); } throw new RuntimeException( - "Failed to alter Hive table " + tableIdent.name() + " in Hive metastore", e); + "Failed to alter Hive table " + tableIdent.name() + " in Hive metastore", e); } catch (IllegalArgumentException | NoSuchTableException e) { throw e; } catch (Exception e) { @@ -734,9 +739,9 @@ private void validateColumnDefaultValue(String fieldName, Expression defaultValu // https://issues.apache.org/jira/browse/HIVE-18726 if (!defaultValue.equals(Column.DEFAULT_VALUE_NOT_SET)) { throw new IllegalArgumentException( - "The DEFAULT constraint for column is only supported since Hive 3.0, " - 
+ "but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: " - + fieldName); + "The DEFAULT constraint for column is only supported since Hive 3.0, " + + "but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: " + + fieldName); } } @@ -745,9 +750,9 @@ private void validateNullable(String fieldName, boolean nullable) { // https://issues.apache.org/jira/browse/HIVE-16575 if (!nullable) { throw new IllegalArgumentException( - "The NOT NULL constraint for column is only supported since Hive 3.0, " - + "but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: " - + fieldName); + "The NOT NULL constraint for column is only supported since Hive 3.0, " + + "but the current Gravitino Hive catalog only supports Hive 2.x. Illegal column: " + + fieldName); } } @@ -762,7 +767,7 @@ private int columnPosition(List columns, TableChange.ColumnPosition return 0; } else { throw new UnsupportedOperationException( - "Unsupported column position type: " + position.getClass().getSimpleName()); + "Unsupported column position type: " + position.getClass().getSimpleName()); } } @@ -776,30 +781,30 @@ private int columnPosition(List columns, TableChange.ColumnPosition */ private int indexOfColumn(List columns, String fieldName) { return columns.stream() - .map(FieldSchema::getName) - .collect(Collectors.toList()) - .indexOf(fieldName); + .map(FieldSchema::getName) + .collect(Collectors.toList()) + .indexOf(fieldName); } private void doRenameTable( - org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.RenameTable change) { + org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.RenameTable change) { hiveTable.setTableName(change.getNewName()); } private void doUpdateComment( - org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.UpdateComment change) { + org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.UpdateComment change) { Map parameters = hiveTable.getParameters(); 
parameters.put(COMMENT, change.getNewComment()); } private void doSetProperty( - org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.SetProperty change) { + org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.SetProperty change) { Map parameters = hiveTable.getParameters(); parameters.put(change.getProperty(), change.getValue()); } private void doRemoveProperty( - org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.RemoveProperty change) { + org.apache.hadoop.hive.metastore.api.Table hiveTable, TableChange.RemoveProperty change) { Map parameters = hiveTable.getParameters(); parameters.remove(change.getProperty()); } @@ -810,17 +815,17 @@ private void doAddColumn(List cols, TableChange.AddColumn change) { // add to the end by default targetPosition = cols.size(); LOG.info( - "Hive catalog add column {} to the end of non-partition columns by default", - change.fieldName()[0]); + "Hive catalog add column {} to the end of non-partition columns by default", + change.fieldName()[0]); } else { targetPosition = columnPosition(cols, change.getPosition()); } cols.add( - targetPosition, - new FieldSchema( - change.fieldName()[0], - ToHiveType.convert(change.getDataType()).getQualifiedName(), - change.getComment())); + targetPosition, + new FieldSchema( + change.fieldName()[0], + ToHiveType.convert(change.getDataType()).getQualifiedName(), + change.getComment())); } private void doDeleteColumn(List cols, TableChange.DeleteColumn change) { @@ -844,12 +849,12 @@ private void doRenameColumn(List cols, TableChange.RenameColumn cha } private void doUpdateColumnComment( - List cols, TableChange.UpdateColumnComment change) { + List cols, TableChange.UpdateColumnComment change) { cols.get(indexOfColumn(cols, change.fieldName()[0])).setComment(change.getNewComment()); } private void doUpdateColumnPosition( - List cols, TableChange.UpdateColumnPosition change) { + List cols, TableChange.UpdateColumnPosition change) { String columnName = 
change.fieldName()[0]; int sourceIndex = indexOfColumn(cols, columnName); if (sourceIndex == -1) { @@ -917,10 +922,10 @@ private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boo try { clientPool.run( - c -> { - c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge); - return null; - }); + c -> { + c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge); + return null; + }); LOG.info("Dropped Hive table {}", tableIdent.name()); return true; @@ -930,7 +935,7 @@ private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boo return false; } catch (TException | InterruptedException e) { throw new RuntimeException( - "Failed to drop Hive table " + tableIdent.name() + " in Hive Metastore", e); + "Failed to drop Hive table " + tableIdent.name() + " in Hive Metastore", e); } catch (Exception e) { throw new RuntimeException(e); } @@ -971,7 +976,7 @@ public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperation @Override public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException { throw new UnsupportedOperationException( - "Hive catalog does not support fileset properties metadata"); + "Hive catalog does not support fileset properties metadata"); } private boolean isExternalTable(NameIdentifier tableIdent) { diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java index aa7c0171345..8ce24d86739 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java @@ -37,6 +37,8 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import 
com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; import com.datastrato.gravitino.utils.MapUtils; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -342,6 +344,7 @@ public boolean dropTable(NameIdentifier tableIdent) { * @param comment The comment for the new table. * @param properties The properties for the new table. * @param partitioning The partitioning for the new table. + * @param indexes The indexes for the new table. * @return The newly created JdbcTable instance. * @throws NoSuchSchemaException If the schema for the table does not exist. * @throws TableAlreadyExistsException If the table with the same name already exists. @@ -354,8 +357,11 @@ public Table createTable( Map properties, Transform[] partitioning, Distribution distribution, - SortOrder[] sortOrders) + SortOrder[] sortOrders, + Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { + Preconditions.checkArgument( + indexes.length == 0, "jdbc-catalog does not support indexes"); Preconditions.checkArgument( null == distribution || distribution == Distributions.NONE, "jdbc-catalog does not support distribution"); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index e7c93e3b96f..7287e7283dc 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -30,6 +30,8 @@ import 
com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; import com.datastrato.gravitino.utils.MapUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -468,6 +470,7 @@ public boolean dropTable(NameIdentifier tableIdent) { * @param comment The comment for the new table. * @param properties The properties for the new table. * @param partitioning The partitioning for the new table. + * @param indexes The indexes for the new table. * @return The newly created IcebergTable instance. * @throws NoSuchSchemaException If the schema for the table does not exist. * @throws TableAlreadyExistsException If the table with the same name already exists. @@ -480,8 +483,11 @@ public Table createTable( Map properties, Transform[] partitioning, Distribution distribution, - SortOrder[] sortOrders) + SortOrder[] sortOrders, + Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { + Preconditions.checkArgument( + indexes.length == 0, "iceberg-catalog does not support indexes"); try { if (!Distributions.NONE.equals(distribution)) { throw new UnsupportedOperationException("Iceberg does not support distribution"); diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java index 7d01fa3f907..aea3bba9755 100644 --- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java @@ -38,6 +38,7 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import 
com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Arrays; @@ -135,6 +136,7 @@ public Table loadTable(NameIdentifier ident) throws NoSuchTableException { * @param comment The comment of the table. * @param properties The properties of the table. * @param partitioning The partitioning of the table. + * @param indexes The indexes of the table. * @return The created {@link Table}. * @throws NoSuchSchemaException if the schema with specified namespace does not exist. * @throws TableAlreadyExistsException if the table with specified identifier already exists. @@ -147,7 +149,8 @@ public Table createTable( Map properties, Transform[] partitioning, Distribution distribution, - SortOrder[] sortOrders) + SortOrder[] sortOrders, + Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { NameIdentifier.checkTable(ident); @@ -159,7 +162,8 @@ public Table createTable( properties, toDTOs(sortOrders), toDTO(distribution), - toDTOs(partitioning)); + toDTOs(partitioning), + toDTOs(indexes)); req.validate(); TableResponse resp = diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java index bd57c09abb5..5d1178af522 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java @@ -11,6 +11,7 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; import com.fasterxml.jackson.annotation.JsonProperty; import 
com.google.common.base.Preconditions; import java.util.Map; @@ -42,6 +43,9 @@ public class TableDTO implements Table { @JsonProperty("partitioning") private Partitioning[] partitioning; + @JsonProperty("indexes") + private Index[] indexes; + private TableDTO() {} /** @@ -53,6 +57,7 @@ private TableDTO() {} * @param properties The properties associated with the table. * @param audit The audit information for the table. * @param partitioning The partitioning of the table. + * @param indexes The indexes of the table. */ private TableDTO( String name, @@ -62,7 +67,8 @@ private TableDTO( AuditDTO audit, Partitioning[] partitioning, DistributionDTO distribution, - SortOrderDTO[] sortOrderDTOs) { + SortOrderDTO[] sortOrderDTOs, + Index[] indexes) { this.name = name; this.comment = comment; this.columns = columns; @@ -71,6 +77,7 @@ private TableDTO( this.distribution = distribution; this.sortOrders = sortOrderDTOs; this.partitioning = partitioning; + this.indexes = indexes; } @Override @@ -113,6 +120,11 @@ public Distribution distribution() { return distribution; } + @Override + public Index[] index() { + return indexes; + } + /** * Creates a new Builder to build a Table DTO. * @@ -136,6 +148,7 @@ public static class Builder { protected SortOrderDTO[] sortOrderDTOs; protected DistributionDTO distributionDTO; protected Partitioning[] Partitioning; + protected Index[] indexes; public Builder() {} @@ -209,6 +222,11 @@ public S withPartitioning(Partitioning[] Partitioning) { return (S) this; } + public S withIndex(Index[] indexes) { + this.indexes = indexes; + return (S) this; + } + /** * Builds a Table DTO based on the provided builder parameters. 
* @@ -222,7 +240,15 @@ public TableDTO build() { Preconditions.checkArgument(audit != null, "audit cannot be null"); return new TableDTO( - name, comment, columns, properties, audit, Partitioning, distributionDTO, sortOrderDTOs); + name, + comment, + columns, + properties, + audit, + Partitioning, + distributionDTO, + sortOrderDTOs, + indexes); } } } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/indexes/IndexDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/indexes/IndexDTO.java new file mode 100644 index 00000000000..eee540abb7d --- /dev/null +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/indexes/IndexDTO.java @@ -0,0 +1,73 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.dto.rel.indexes; + +import com.datastrato.gravitino.rel.indexes.Index; +import com.google.common.base.Preconditions; + +public class IndexDTO implements Index { + + private final IndexType indexType; + private final String name; + private final String[][] fieldNames; + + public IndexDTO(IndexType indexType, String name, String[][] fieldNames) { + this.indexType = indexType; + this.name = name; + this.fieldNames = fieldNames; + } + + @Override + public IndexType type() { + return indexType; + } + + @Override + public String name() { + return name; + } + + @Override + public String[][] fieldNames() { + return fieldNames; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + protected IndexType indexType; + + protected String name; + protected String[][] fieldNames; + + public Builder() {} + + public S withIndexType(IndexType indexType) { + this.indexType = indexType; + return (S) this; + } + + public S withName(String name) { + this.name = name; + return (S) this; + } + + public S withFieldNames(String[][] fieldNames) { + this.fieldNames = fieldNames; + return (S) this; + } + + public IndexDTO 
build() { + Preconditions.checkArgument(indexType != null, "Index type cannot be null"); + Preconditions.checkArgument(name != null, "Index name cannot be null"); + Preconditions.checkArgument( + fieldNames != null, "The index must be set with corresponding column names"); + return new IndexDTO(indexType, name, fieldNames); + } + } +} diff --git a/common/src/main/java/com/datastrato/gravitino/dto/requests/TableCreateRequest.java b/common/src/main/java/com/datastrato/gravitino/dto/requests/TableCreateRequest.java index 4f8e538af7a..9fa791e86cf 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/requests/TableCreateRequest.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/requests/TableCreateRequest.java @@ -9,6 +9,8 @@ import com.datastrato.gravitino.dto.rel.SortOrderDTO; import com.datastrato.gravitino.dto.rel.expressions.FunctionArg; import com.datastrato.gravitino.dto.rel.partitions.Partitioning; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; import com.datastrato.gravitino.rest.RESTRequest; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; @@ -57,6 +59,10 @@ public class TableCreateRequest implements RESTRequest { @JsonProperty("partitioning") private final Partitioning[] partitioning; + @Nullable + @JsonProperty("indexes") + private final Index[] indexes; + public TableCreateRequest() { this(null, null, null, null, null, null, null); } @@ -73,6 +79,25 @@ public TableCreateRequest( new Partitioning[0]); } + public TableCreateRequest( + String name, + String comment, + ColumnDTO[] columns, + Map properties, + SortOrderDTO[] sortOrders, + DistributionDTO distribution, + Partitioning[] partitioning) { + this( + name, + comment, + columns, + properties, + sortOrders, + distribution, + partitioning, + Indexes.EMPTY_INDEXES); + } + public TableCreateRequest( String name, @Nullable String comment, @@ -80,7 +105,8 @@ public 
TableCreateRequest( @Nullable Map properties, @Nullable SortOrderDTO[] sortOrders, @Nullable DistributionDTO distribution, - @Nullable Partitioning[] partitioning) { + @Nullable Partitioning[] partitioning, + @Nullable Index[] indexes) { this.name = name; this.columns = columns; this.comment = comment; @@ -88,6 +114,7 @@ public TableCreateRequest( this.sortOrders = sortOrders; this.distribution = distribution; this.partitioning = partitioning; + this.indexes = indexes; } @Override @@ -122,5 +149,9 @@ public void validate() throws IllegalArgumentException { autoIncrementCols.size() <= 1, "Only one column can be auto-incremented. There are multiple auto-increment columns in your table: " + autoIncrementColsStr); + + if (indexes != null && indexes.length > 0) { + throw new UnsupportedOperationException("Support for indexing is currently not implemented"); + } } } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java index ce0fd0fff7f..b03cf3f3353 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java @@ -22,6 +22,7 @@ import com.datastrato.gravitino.dto.rel.expressions.FuncExpressionDTO; import com.datastrato.gravitino.dto.rel.expressions.FunctionArg; import com.datastrato.gravitino.dto.rel.expressions.LiteralDTO; +import com.datastrato.gravitino.dto.rel.indexes.IndexDTO; import com.datastrato.gravitino.dto.rel.partitions.BucketPartitioningDTO; import com.datastrato.gravitino.dto.rel.partitions.DayPartitioningDTO; import com.datastrato.gravitino.dto.rel.partitions.FunctionPartitioningDTO; @@ -47,6 +48,7 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrders; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import 
com.datastrato.gravitino.rel.indexes.Index; import java.util.Arrays; import org.apache.commons.lang3.ArrayUtils; @@ -191,6 +193,17 @@ public static Partitioning toDTO(Transform transform) { } } + public static IndexDTO toDTO(Index index) { + if (index instanceof IndexDTO) { + return (IndexDTO) index; + } + return IndexDTO.builder() + .withIndexType(index.type()) + .withName(index.name()) + .withFieldNames(index.fieldNames()) + .build(); + } + public static FunctionArg toFunctionArg(Expression expression) { if (expression instanceof FunctionArg) { return (FunctionArg) expression; @@ -239,6 +252,13 @@ public static Partitioning[] toDTOs(Transform[] transforms) { return Arrays.stream(transforms).map(DTOConverters::toDTO).toArray(Partitioning[]::new); } + public static IndexDTO[] toDTOs(Index[] indexes) { + if (ArrayUtils.isEmpty(indexes)) { + return new IndexDTO[0]; + } + return Arrays.stream(indexes).map(DTOConverters::toDTO).toArray(IndexDTO[]::new); + } + public static Distribution fromDTO(DistributionDTO distributionDTO) { if (DistributionDTO.NONE.equals(distributionDTO) || null == distributionDTO) { return Distributions.NONE; diff --git a/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java b/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java index b1bdac5c1e7..c9fc4acd5be 100644 --- a/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java +++ b/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java @@ -47,7 +47,7 @@ public class TestDTOJsonSerDe { "{\"name\":%s,\"type\":%s,\"comment\":%s,\"nullable\":%s,\"autoIncrement\":%s}"; private final String tableJson = - "{\"name\":%s,\"comment\":%s,\"columns\":[%s],\"properties\":%s,\"audit\":%s,\"distribution\":%s,\"sortOrders\":%s,\"partitioning\":%s}"; + "{\"name\":%s,\"comment\":%s,\"columns\":[%s],\"properties\":%s,\"audit\":%s,\"distribution\":%s,\"sortOrders\":%s,\"partitioning\":%s,\"indexes\":%s}"; private String withQuotes(String str) 
{ return "\"" + str + "\""; @@ -273,6 +273,7 @@ public void testTableDTOSerDe() throws Exception { String.format(auditJson, withQuotes(creator), withQuotes(now.toString()), null, null), null, null, + null, null); Assertions.assertEquals(expectedJson, serJson); } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperationDispatcher.java index 76ed6ff6203..7204543d458 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/CatalogOperationDispatcher.java @@ -39,6 +39,8 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; import com.datastrato.gravitino.storage.IdGenerator; import com.datastrato.gravitino.utils.PrincipalUtils; import com.datastrato.gravitino.utils.ThrowableFunction; @@ -392,6 +394,7 @@ private Set getHiddenSchemaPropertyNames( * @param comment A description or comment associated with the table. * @param properties Additional properties to set for the table. * @param partitions An array of {@link Transform} objects representing the partitioning of table + * @param indexes An array of {@link Index} objects representing the indexes of the table. * @return The newly created {@link Table} object. * @throws NoSuchSchemaException If the schema in which to create the table does not exist. * @throws TableAlreadyExistsException If a table with the same name already exists in the schema. 
@@ -404,7 +407,8 @@ public Table createTable( Map properties, Transform[] partitions, Distribution distribution, - SortOrder[] sortOrders) + SortOrder[] sortOrders, + Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { NameIdentifier catalogIdent = getCatalogIdentifier(ident); doWithCatalog( @@ -436,7 +440,8 @@ public Table createTable( updatedProperties, partitions == null ? EMPTY_TRANSFORM : partitions, distribution == null ? Distributions.NONE : distribution, - sortOrders == null ? new SortOrder[0] : sortOrders)), + sortOrders == null ? new SortOrder[0] : sortOrders, + indexes == null ? Indexes.EMPTY_INDEXES : indexes)), NoSuchSchemaException.class, TableAlreadyExistsException.class); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/rel/BaseTable.java b/core/src/main/java/com/datastrato/gravitino/catalog/rel/BaseTable.java index f5ee6008c31..f7b11d384da 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/rel/BaseTable.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/rel/BaseTable.java @@ -10,6 +10,7 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; import java.util.Map; import javax.annotation.Nullable; import lombok.ToString; @@ -34,6 +35,8 @@ public abstract class BaseTable implements Table { @Nullable protected Distribution distribution; + @Nullable protected Index[] indexes; + /** Returns the audit details of the table. */ @Override public AuditInfo auditInfo() { @@ -87,6 +90,11 @@ public Distribution distribution() { return distribution; } + @Override + public Index[] index() { + return indexes; + } + /** * Builder interface for creating instances of {@link BaseTable}. 
* @@ -111,6 +119,8 @@ interface Builder, T extends BaseTable> { SELF withDistribution(Distribution distribution); + SELF withIndexes(Index[] indexes); + T build(); } @@ -131,6 +141,7 @@ public abstract static class BaseTableBuilder, T e protected SortOrder[] sortOrders; protected Distribution distribution; + protected Index[] indexes; /** * Sets the name of the table. @@ -214,6 +225,11 @@ public SELF withDistribution(Distribution distribution) { return (SELF) this; } + public SELF withIndexes(Index[] indexes) { + this.indexes = indexes; + return (SELF) this; + } + /** * Builds the instance of the table with the provided attributes. * diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/rel/EntityCombinedTable.java b/core/src/main/java/com/datastrato/gravitino/catalog/rel/EntityCombinedTable.java index 7b4c463d26b..84a705c0c49 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/rel/EntityCombinedTable.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/rel/EntityCombinedTable.java @@ -12,6 +12,7 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -84,6 +85,11 @@ public Distribution distribution() { return table.distribution(); } + @Override + public Index[] index() { + return table.index(); + } + @Override public Audit auditInfo() { AuditInfo mergedAudit = diff --git a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java index d81d4e118ff..3ae385cbae4 100644 --- a/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java +++ b/core/src/test/java/com/datastrato/gravitino/TestCatalogOperations.java @@ -25,6 +25,7 @@ import 
com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.io.IOException; @@ -87,7 +88,8 @@ public Table createTable( Map properties, Transform[] partitions, Distribution distribution, - SortOrder[] sortOrders) + SortOrder[] sortOrders, + Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { AuditInfo auditInfo = new AuditInfo.Builder().withCreator("test").withCreateTime(Instant.now()).build(); @@ -102,6 +104,7 @@ public Table createTable( .withDistribution(distribution) .withSortOrders(sortOrders) .withPartitioning(partitions) + .withIndexes(indexes) .build(); if (tables.containsKey(ident)) { @@ -119,6 +122,7 @@ public Table createTable( .withDistribution(distribution) .withSortOrders(sortOrders) .withPartitioning(partitions) + .withIndexes(indexes) .build(); }