diff --git a/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java b/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java index ecbfe6e43e3..2afd9eba6cf 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java @@ -58,6 +58,17 @@ public static Partition identity( return new IdentityPartitionImpl(name, fieldNames, values, properties); } + /** + * Creates an identity partition whose name will be automatically generated. + * + * @param fieldNames The field names of the identity partition. + * @param values The values of the identity partition. + * @return The created partition. + */ + public static Partition identity(String[][] fieldNames, Literal[] values) { + return identity(null, fieldNames, values, null); + } + /** Represents a result of range partitioning. */ private static class RangePartitionImpl implements RangePartition { private final String name; diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTable.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTable.java index c1520551f16..45c3fe88801 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTable.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTable.java @@ -63,6 +63,7 @@ public class HiveTable extends BaseTable { Sets.newHashSet(MANAGED_TABLE.name(), EXTERNAL_TABLE.name()); private String schemaName; private CachedClientPool clientPool; + private StorageDescriptor sd; private HiveTable() {} @@ -135,7 +136,8 @@ public static HiveTable.Builder fromHiveTable(Table table) { table.getPartitionKeys().stream() .map(p -> identity(p.getName())) .toArray(Transform[]::new)) - .withSchemaName(table.getDbName()); + .withSchemaName(table.getDbName()) + .withStorageDescriptor(table.getSd()); } public CachedClientPool 
clientPool() { @@ -153,6 +155,10 @@ public String schemaName() { return schemaName; } + public StorageDescriptor storageDescriptor() { + return sd; + } + private static Map buildTableProperties(Table table) { Map properties = Maps.newHashMap(table.getParameters()); @@ -318,6 +324,7 @@ public static class Builder extends BaseTableBuilder { private String schemaName; private CachedClientPool clientPool; + private StorageDescriptor sd; /** * Sets the Hive schema (database) name to be used for building the HiveTable. @@ -330,6 +337,17 @@ public Builder withSchemaName(String schemaName) { return this; } + /** + * Sets the StorageDescriptor to be used for adding partition. + * + * @param sd The StorageDescriptor instance of the HiveTable. + * @return This Builder instance. + */ + public Builder withStorageDescriptor(StorageDescriptor sd) { + this.sd = sd; + return this; + } + /** * Sets the HiveClientPool to be used for operate partition. * @@ -359,6 +377,7 @@ protected HiveTable internalBuild() { hiveTable.partitioning = partitioning; hiveTable.schemaName = schemaName; hiveTable.clientPool = clientPool; + hiveTable.sd = sd; // HMS put table comment in parameters if (comment != null) { diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTableOperations.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTableOperations.java index 7a6f02640e1..40244736650 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTableOperations.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveTableOperations.java @@ -11,13 +11,21 @@ import com.datastrato.gravitino.rel.SupportsPartitions; import com.datastrato.gravitino.rel.expressions.literals.Literal; import com.datastrato.gravitino.rel.expressions.literals.Literals; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import 
com.datastrato.gravitino.rel.partitions.IdentityPartition; import com.datastrato.gravitino.rel.partitions.Partition; import com.datastrato.gravitino.rel.partitions.Partitions; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.parquet.Strings; import org.apache.thrift.TException; public class HiveTableOperations implements TableOperations, SupportsPartitions { @@ -99,7 +107,91 @@ private String[][] getFieldNames(String partitionName) { @Override public Partition addPartition(Partition partition) throws PartitionAlreadyExistsException { - throw new UnsupportedOperationException(); + Preconditions.checkArgument( + partition instanceof IdentityPartition, "Hive only supports identity partition"); + IdentityPartition identityPartition = (IdentityPartition) partition; + + Set transformFields = + Arrays.stream(table.partitioning()) + .map(t -> ((Transforms.IdentityTransform) t).fieldName()[0]) + .collect(Collectors.toSet()); + + Preconditions.checkArgument( + transformFields.size() == identityPartition.fieldNames().length, + "Hive partition field names must be the same as table partitioning field names: %s, but got %s", + Strings.join(transformFields, ","), + Strings.join( + Arrays.stream(identityPartition.fieldNames()) + .map(f -> Strings.join(f, ".")) + .collect(Collectors.toList()), + ",")); + Arrays.stream(identityPartition.fieldNames()) + .forEach( + f -> + Preconditions.checkArgument( + transformFields.contains(f[0]), + "Hive partition field name must be in table partitioning field names: %s, but got %s", + Strings.join(transformFields, ","), + f[0])); + + 
try { + org.apache.hadoop.hive.metastore.api.Partition createdPartition = + table.clientPool().run(c -> c.add_partition(toHivePartition(identityPartition))); + return fromHivePartition( + generatePartitionName((IdentityPartition) partition), createdPartition); + } catch (TException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private String generatePartitionName(IdentityPartition partition) { + Arrays.stream(partition.fieldNames()) + .forEach( + fieldName -> + Preconditions.checkArgument( + fieldName.length == 1, + "Hive catalog does not support nested partition field names")); + + // Hive partition name is in the format of "field1=value1/field2=value2/..." + return IntStream.range(0, partition.fieldNames().length) + .mapToObj( + i -> + partition.fieldNames()[i][0] + + PARTITION_VALUE_DELIMITER + + partition.values()[i].value().toString()) + .collect(Collectors.joining(PARTITION_NAME_DELIMITER)); + } + + private org.apache.hadoop.hive.metastore.api.Partition toHivePartition( + IdentityPartition partition) { + org.apache.hadoop.hive.metastore.api.Partition hivePartition = + new org.apache.hadoop.hive.metastore.api.Partition(); + hivePartition.setDbName(table.schemaName()); + hivePartition.setTableName(table.name()); + + // todo: support custom serde and location if necessary + StorageDescriptor sd; + if (table.storageDescriptor() == null) { + // In theory, this should not happen because the Hive table will reload after creating + // in CatalogOperationDispatcher and the storage descriptor will be set. But in case the + // Hive table is created by other ways (such as UT), we need to handle this. 
+ sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo()); + } else { + sd = table.storageDescriptor().deepCopy(); + // The location will be automatically generated by Hive Metastore + sd.setLocation(null); + } + hivePartition.setSd(sd); + + hivePartition.setParameters(partition.properties()); + + hivePartition.setValues( + Arrays.stream(partition.values()) + .map(l -> l.value().toString()) + .collect(Collectors.toList())); + + return hivePartition; } @Override diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java index 2f250b78dff..2282db0a743 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java @@ -16,9 +16,15 @@ import com.datastrato.gravitino.catalog.hive.miniHMS.MiniHiveMetastoreService; import com.datastrato.gravitino.exceptions.NoSuchPartitionException; import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.expressions.literals.Literal; +import com.datastrato.gravitino.rel.expressions.literals.Literals; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.partitions.Partition; +import com.datastrato.gravitino.rel.partitions.Partitions; import com.datastrato.gravitino.rel.types.Types; import com.google.common.collect.Maps; +import java.time.LocalDate; +import java.util.Arrays; import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -31,12 +37,25 @@ public class TestHiveTableOperations extends MiniHiveMetastoreService { private static HiveCatalog hiveCatalog; private static HiveSchema hiveSchema; private static HiveTable hiveTable; + private static Column[] columns; + private static 
Partition existingPartition; @BeforeAll private static void setup() { hiveCatalog = initHiveCatalog(); hiveSchema = initHiveSchema(hiveCatalog); hiveTable = createPartitionedTable(); + + // add partition: city=0/dt=2020-01-01 + String[] fieldCity = new String[] {columns[1].name()}; + Literal valueCity = Literals.byteLiteral((byte) 0); + String[] fieldDt = new String[] {columns[2].name()}; + Literal valueDt = Literals.dateLiteral(LocalDate.parse("2020-01-01")); + Partition partition = + Partitions.identity( + new String[][] {fieldCity, fieldDt}, new Literal[] {valueCity, valueDt}); + + existingPartition = hiveTable.supportPartitions().addPartition(partition); } private static HiveTable createPartitionedTable() { @@ -44,13 +63,16 @@ private static HiveTable createPartitionedTable() { properties.put("key1", "val1"); properties.put("key2", "val2"); + HiveColumn col0 = + new HiveColumn.Builder().withName("name").withType(Types.StringType.get()).build(); HiveColumn col1 = new HiveColumn.Builder().withName("city").withType(Types.ByteType.get()).build(); HiveColumn col2 = new HiveColumn.Builder().withName("dt").withType(Types.DateType.get()).build(); - Column[] columns = new Column[] {col1, col2}; - Transform[] partitioning = new Transform[] {identity(col2.name())}; + columns = new Column[] {col0, col1, col2}; + + Transform[] partitioning = new Transform[] {identity(col1.name()), identity(col2.name())}; return (HiveTable) hiveCatalog @@ -61,12 +83,17 @@ private static HiveTable createPartitionedTable() { @Test public void testListPartitionNames() { String[] partitionNames = hiveTable.supportPartitions().listPartitionNames(); - // TODO: update following assertion after implementing addPartition - Assertions.assertEquals(0, partitionNames.length); + // there may be other partitions in the list, so we only check the added partition + Assertions.assertTrue( + partitionNames.length > 0 + && Arrays.asList(partitionNames).contains(existingPartition.name())); } @Test public void 
testGetPartition() { + Partition partition = hiveTable.supportPartitions().getPartition(existingPartition.name()); + Assertions.assertEquals(existingPartition, partition); + NoSuchPartitionException exception = Assertions.assertThrows( NoSuchPartitionException.class, @@ -77,4 +104,49 @@ public void testGetPartition() { "Hive partition does_not_exist_partition does not exist in Hive Metastore", exception.getMessage()); } + + @Test + public void testAddPartition() { + // add partition: city=1/dt=2020-01-01 + String[] fieldCity = new String[] {columns[1].name()}; + Literal valueCity = Literals.byteLiteral((byte) 1); + String[] fieldDt = new String[] {columns[2].name()}; + Literal valueDt = Literals.dateLiteral(LocalDate.parse("2020-01-01")); + Partition partition = + Partitions.identity( + new String[][] {fieldCity, fieldDt}, new Literal[] {valueCity, valueDt}); + + Partition addedPartition = hiveTable.supportPartitions().addPartition(partition); + Partition gotPartition = hiveTable.supportPartitions().getPartition(addedPartition.name()); + Assertions.assertEquals(addedPartition, gotPartition); + + // test exception + String[] field1 = new String[] {columns[0].name()}; + Literal value1 = Literals.byteLiteral((byte) 1); + Partition partition1 = Partitions.identity(new String[][] {field1}, new Literal[] {value1}); + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> hiveTable.supportPartitions().addPartition(partition1)); + Assertions.assertTrue( + exception + .getMessage() + .contains( + "Hive partition field names must be the same as table partitioning field names"), + exception.getMessage()); + + String[] field2 = new String[] {columns[1].name()}; + Literal value2 = Literals.byteLiteral((byte) 1); + Partition partition2 = + Partitions.identity(new String[][] {field1, field2}, new Literal[] {value1, value2}); + exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> 
hiveTable.supportPartitions().addPartition(partition2)); + Assertions.assertTrue( + exception + .getMessage() + .contains("Hive partition field name must be in table partitioning field names"), + exception.getMessage()); + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/hive/CatalogHiveIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/hive/CatalogHiveIT.java index e76f3bc6eed..85ee50593ea 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/hive/CatalogHiveIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/hive/CatalogHiveIT.java @@ -57,6 +57,8 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.distributions.Strategy; +import com.datastrato.gravitino.rel.expressions.literals.Literal; +import com.datastrato.gravitino.rel.expressions.literals.Literals; import com.datastrato.gravitino.rel.expressions.sorts.NullOrdering; import com.datastrato.gravitino.rel.expressions.sorts.SortDirection; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; @@ -64,10 +66,13 @@ import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.expressions.transforms.Transforms; import com.datastrato.gravitino.rel.partitions.IdentityPartition; +import com.datastrato.gravitino.rel.partitions.Partition; +import com.datastrato.gravitino.rel.partitions.Partitions; import com.datastrato.gravitino.rel.types.Types; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.io.IOException; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -697,6 +702,56 @@ public void testGetPartition() throws TException, 
InterruptedException { Assertions.assertEquals(partition.properties(), hivePartition.getParameters()); } + @Test + public void testAddPartition() throws TException, InterruptedException { + Table createdTable = preparePartitionedTable(); + + // add partition "hive_col_name2=2023-01-02/hive_col_name3=gravitino_it_test2" + String[] field1 = new String[] {"hive_col_name2"}; + String[] field2 = new String[] {"hive_col_name3"}; + Literal literal1 = Literals.dateLiteral(LocalDate.parse("2023-01-02")); + Literal literal2 = Literals.stringLiteral("gravitino_it_test2"); + + Partition identity = + Partitions.identity(new String[][] {field1, field2}, new Literal[] {literal1, literal2}); + IdentityPartition partitionAdded = + (IdentityPartition) createdTable.supportPartitions().addPartition(identity); + + // Directly get partition from hive metastore to check if the partition is created successfully. + org.apache.hadoop.hive.metastore.api.Partition partitionGot = + hiveClientPool.run( + client -> client.getPartition(schemaName, createdTable.name(), partitionAdded.name())); + Assertions.assertEquals( + partitionAdded.values()[0].value().toString(), partitionGot.getValues().get(0)); + Assertions.assertEquals( + partitionAdded.values()[1].value().toString(), partitionGot.getValues().get(1)); + Assertions.assertEquals(partitionAdded.properties(), partitionGot.getParameters()); + + // test the new partition can be read and written successfully by dynamic partition + String selectTemplate = + "SELECT * FROM %s.%s WHERE hive_col_name2 = '2023-01-02' AND hive_col_name3 = 'gravitino_it_test2'"; + long count = + sparkSession.sql(String.format(selectTemplate, schemaName, createdTable.name())).count(); + Assertions.assertEquals(0, count); + + String insertTemplate = + "INSERT INTO TABLE %s.%s PARTITION (hive_col_name2='2023-01-02', hive_col_name3) VALUES (%s, %s)"; + sparkSession.sql( + String.format( + insertTemplate, schemaName, createdTable.name(), "1", "'gravitino_it_test2'")); + count 
= + sparkSession.sql(String.format(selectTemplate, schemaName, createdTable.name())).count(); + Assertions.assertEquals(1, count); + + // test the new partition can be read and written successfully by static partition + String insertTemplate2 = + "INSERT INTO TABLE %s.%s PARTITION (hive_col_name2='2023-01-02', hive_col_name3='gravitino_it_test2') VALUES (%s)"; + sparkSession.sql(String.format(insertTemplate2, schemaName, createdTable.name(), "2")); + count = + sparkSession.sql(String.format(selectTemplate, schemaName, createdTable.name())).count(); + Assertions.assertEquals(2, count); + } + private Table preparePartitionedTable() throws TException, InterruptedException { ColumnDTO[] columns = createColumns();