Skip to content

Commit

Permalink
[#1768] feat(hive): implement add partition for Hive catalog (#1792)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

 implement add partition for Hive catalog

### Why are the changes needed?

Fix: #1768 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

tests added
  • Loading branch information
mchades authored Jan 30, 2024
1 parent 9215c37 commit 50415c3
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ public static Partition identity(
return new IdentityPartitionImpl(name, fieldNames, values, properties);
}

/**
* Creates an identity partition whose name will be automatically generated.
*
* @param fieldNames The field names of the identity partition.
* @param values The values of the identity partition.
* @return The created partition.
*/
public static Partition identity(String[][] fieldNames, Literal<?>[] values) {
return identity(null, fieldNames, values, null);
}

/** Represents a result of range partitioning. */
private static class RangePartitionImpl implements RangePartition {
private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class HiveTable extends BaseTable {
Sets.newHashSet(MANAGED_TABLE.name(), EXTERNAL_TABLE.name());
private String schemaName;
private CachedClientPool clientPool;
private StorageDescriptor sd;

private HiveTable() {}

Expand Down Expand Up @@ -135,7 +136,8 @@ public static HiveTable.Builder fromHiveTable(Table table) {
table.getPartitionKeys().stream()
.map(p -> identity(p.getName()))
.toArray(Transform[]::new))
.withSchemaName(table.getDbName());
.withSchemaName(table.getDbName())
.withStorageDescriptor(table.getSd());
}

public CachedClientPool clientPool() {
Expand All @@ -153,6 +155,10 @@ public String schemaName() {
return schemaName;
}

public StorageDescriptor storageDescriptor() {
return sd;
}

private static Map<String, String> buildTableProperties(Table table) {
Map<String, String> properties = Maps.newHashMap(table.getParameters());

Expand Down Expand Up @@ -318,6 +324,7 @@ public static class Builder extends BaseTableBuilder<Builder, HiveTable> {

private String schemaName;
private CachedClientPool clientPool;
private StorageDescriptor sd;

/**
* Sets the Hive schema (database) name to be used for building the HiveTable.
Expand All @@ -330,6 +337,17 @@ public Builder withSchemaName(String schemaName) {
return this;
}

/**
* Sets the StorageDescriptor to be used for adding partition.
*
* @param sd The StorageDescriptor instance of the HiveTable.
* @return This Builder instance.
*/
public Builder withStorageDescriptor(StorageDescriptor sd) {
this.sd = sd;
return this;
}

/**
* Sets the HiveClientPool to be used for operate partition.
*
Expand Down Expand Up @@ -359,6 +377,7 @@ protected HiveTable internalBuild() {
hiveTable.partitioning = partitioning;
hiveTable.schemaName = schemaName;
hiveTable.clientPool = clientPool;
hiveTable.sd = sd;

// HMS put table comment in parameters
if (comment != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@
import com.datastrato.gravitino.rel.SupportsPartitions;
import com.datastrato.gravitino.rel.expressions.literals.Literal;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.rel.partitions.IdentityPartition;
import com.datastrato.gravitino.rel.partitions.Partition;
import com.datastrato.gravitino.rel.partitions.Partitions;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.parquet.Strings;
import org.apache.thrift.TException;

public class HiveTableOperations implements TableOperations, SupportsPartitions {
Expand Down Expand Up @@ -99,7 +107,91 @@ private String[][] getFieldNames(String partitionName) {

@Override
public Partition addPartition(Partition partition) throws PartitionAlreadyExistsException {
throw new UnsupportedOperationException();
Preconditions.checkArgument(
partition instanceof IdentityPartition, "Hive only supports identity partition");
IdentityPartition identityPartition = (IdentityPartition) partition;

Set<String> transformFields =
Arrays.stream(table.partitioning())
.map(t -> ((Transforms.IdentityTransform) t).fieldName()[0])
.collect(Collectors.toSet());

Preconditions.checkArgument(
transformFields.size() == identityPartition.fieldNames().length,
"Hive partition field names must be the same as table partitioning field names: %s, but got %s",
Strings.join(transformFields, ","),
Strings.join(
Arrays.stream(identityPartition.fieldNames())
.map(f -> Strings.join(f, "."))
.collect(Collectors.toList()),
","));
Arrays.stream(identityPartition.fieldNames())
.forEach(
f ->
Preconditions.checkArgument(
transformFields.contains(f[0]),
"Hive partition field name must be in table partitioning field names: %s, but got %s",
Strings.join(transformFields, ","),
f[0]));

try {
org.apache.hadoop.hive.metastore.api.Partition createdPartition =
table.clientPool().run(c -> c.add_partition(toHivePartition(identityPartition)));
return fromHivePartition(
generatePartitionName((IdentityPartition) partition), createdPartition);
} catch (TException | InterruptedException e) {
throw new RuntimeException(e);
}
}

private String generatePartitionName(IdentityPartition partition) {
Arrays.stream(partition.fieldNames())
.forEach(
fieldName ->
Preconditions.checkArgument(
fieldName.length == 1,
"Hive catalog does not support nested partition field names"));

// Hive partition name is in the format of "field1=value1/field2=value2/..."
return IntStream.range(0, partition.fieldNames().length)
.mapToObj(
i ->
partition.fieldNames()[i][0]
+ PARTITION_VALUE_DELIMITER
+ partition.values()[i].value().toString())
.collect(Collectors.joining(PARTITION_NAME_DELIMITER));
}

private org.apache.hadoop.hive.metastore.api.Partition toHivePartition(
IdentityPartition partition) {
org.apache.hadoop.hive.metastore.api.Partition hivePartition =
new org.apache.hadoop.hive.metastore.api.Partition();
hivePartition.setDbName(table.schemaName());
hivePartition.setTableName(table.name());

// todo: support custom serde and location if necessary
StorageDescriptor sd;
if (table.storageDescriptor() == null) {
// In theoretically, this should not happen because the Hive table will reload after creating
// in CatalogOperationDispatcher and the storage descriptor will be set. But in case of the
// Hive table is created by other ways(such as UT), we need to handle this.
sd = new StorageDescriptor();
sd.setSerdeInfo(new SerDeInfo());
} else {
sd = table.storageDescriptor().deepCopy();
// The location will be automatically generated by Hive Metastore
sd.setLocation(null);
}
hivePartition.setSd(sd);

hivePartition.setParameters(partition.properties());

hivePartition.setValues(
Arrays.stream(partition.values())
.map(l -> l.value().toString())
.collect(Collectors.toList()));

return hivePartition;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
import com.datastrato.gravitino.catalog.hive.miniHMS.MiniHiveMetastoreService;
import com.datastrato.gravitino.exceptions.NoSuchPartitionException;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.expressions.literals.Literal;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.partitions.Partition;
import com.datastrato.gravitino.rel.partitions.Partitions;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.Maps;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -31,26 +37,42 @@ public class TestHiveTableOperations extends MiniHiveMetastoreService {
private static HiveCatalog hiveCatalog;
private static HiveSchema hiveSchema;
private static HiveTable hiveTable;
private static Column[] columns;
private static Partition existingPartition;

@BeforeAll
private static void setup() {
hiveCatalog = initHiveCatalog();
hiveSchema = initHiveSchema(hiveCatalog);
hiveTable = createPartitionedTable();

// add partition: city=0/dt=2020-01-01
String[] fieldCity = new String[] {columns[1].name()};
Literal<?> valueCity = Literals.byteLiteral((byte) 0);
String[] fieldDt = new String[] {columns[2].name()};
Literal<?> valueDt = Literals.dateLiteral(LocalDate.parse("2020-01-01"));
Partition partition =
Partitions.identity(
new String[][] {fieldCity, fieldDt}, new Literal<?>[] {valueCity, valueDt});

existingPartition = hiveTable.supportPartitions().addPartition(partition);
}

private static HiveTable createPartitionedTable() {
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "val1");
properties.put("key2", "val2");

HiveColumn col0 =
new HiveColumn.Builder().withName("name").withType(Types.StringType.get()).build();
HiveColumn col1 =
new HiveColumn.Builder().withName("city").withType(Types.ByteType.get()).build();
HiveColumn col2 =
new HiveColumn.Builder().withName("dt").withType(Types.DateType.get()).build();
Column[] columns = new Column[] {col1, col2};

Transform[] partitioning = new Transform[] {identity(col2.name())};
columns = new Column[] {col0, col1, col2};

Transform[] partitioning = new Transform[] {identity(col1.name()), identity(col2.name())};

return (HiveTable)
hiveCatalog
Expand All @@ -61,12 +83,17 @@ private static HiveTable createPartitionedTable() {
@Test
public void testListPartitionNames() {
String[] partitionNames = hiveTable.supportPartitions().listPartitionNames();
// TODO: update following assertion after implementing addPartition
Assertions.assertEquals(0, partitionNames.length);
// there maybe other partitions in the list, so we only check the added partition
Assertions.assertTrue(
partitionNames.length > 0
&& Arrays.asList(partitionNames).contains(existingPartition.name()));
}

@Test
public void testGetPartition() {
Partition partition = hiveTable.supportPartitions().getPartition(existingPartition.name());
Assertions.assertEquals(existingPartition, partition);

NoSuchPartitionException exception =
Assertions.assertThrows(
NoSuchPartitionException.class,
Expand All @@ -77,4 +104,49 @@ public void testGetPartition() {
"Hive partition does_not_exist_partition does not exist in Hive Metastore",
exception.getMessage());
}

@Test
public void testAddPartition() {
// add partition: city=1/dt=2020-01-01
String[] fieldCity = new String[] {columns[1].name()};
Literal<?> valueCity = Literals.byteLiteral((byte) 1);
String[] fieldDt = new String[] {columns[2].name()};
Literal<?> valueDt = Literals.dateLiteral(LocalDate.parse("2020-01-01"));
Partition partition =
Partitions.identity(
new String[][] {fieldCity, fieldDt}, new Literal<?>[] {valueCity, valueDt});

Partition addedPartition = hiveTable.supportPartitions().addPartition(partition);
Partition gotPartition = hiveTable.supportPartitions().getPartition(addedPartition.name());
Assertions.assertEquals(addedPartition, gotPartition);

// test exception
String[] field1 = new String[] {columns[0].name()};
Literal<?> value1 = Literals.byteLiteral((byte) 1);
Partition partition1 = Partitions.identity(new String[][] {field1}, new Literal<?>[] {value1});
IllegalArgumentException exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> hiveTable.supportPartitions().addPartition(partition1));
Assertions.assertTrue(
exception
.getMessage()
.contains(
"Hive partition field names must be the same as table partitioning field names"),
exception.getMessage());

String[] field2 = new String[] {columns[1].name()};
Literal<?> value2 = Literals.byteLiteral((byte) 1);
Partition partition2 =
Partitions.identity(new String[][] {field1, field2}, new Literal<?>[] {value1, value2});
exception =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> hiveTable.supportPartitions().addPartition(partition2));
Assertions.assertTrue(
exception
.getMessage()
.contains("Hive partition field name must be in table partitioning field names"),
exception.getMessage());
}
}
Loading

0 comments on commit 50415c3

Please sign in to comment.