HIVE-28601: Leverage configurable getPartitions API in HMS to decrease memory footprint in HS2

Araika committed Nov 13, 2024
1 parent 3482c0b commit 9863caf
Showing 13 changed files with 530 additions and 19 deletions.
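
Every call site this commit touches follows the same recipe: replace a full Hive.getPartitions(table) fetch with Hive.getPartitionsWithSpecs(table, request), where the request carries a GetProjectionsSpec naming only the partition fields that call site actually reads. The metastore then returns thin Partition objects, which is what shrinks the HS2 memory footprint. Below is a minimal self-contained sketch of the pattern, assuming only the builder, request, and Hive APIs the diff itself imports; the class and method names (ProjectedPartitionFetch, locationsOnly) are illustrative, not part of the commit:

import java.util.List;

import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.thrift.TException;

class ProjectedPartitionFetch {
  // Fetch every partition of 'table', materializing only its identity and storage location.
  static List<Partition> locationsOnly(Hive db, Table table) throws HiveException, TException {
    GetProjectionsSpec projections = new GetPartitionProjectionsSpecBuilder()
        .addProjectField("catName").addProjectField("dbName").addProjectField("tableName")
        .addProjectField("sd.location") // everything else (params, columns, stats) stays unfetched
        .build();
    // An empty filter spec selects all partitions; the projection decides what comes back.
    GetPartitionsRequest request = new GetPartitionsRequest(
        table.getDbName(), table.getTableName(), projections, new GetPartitionsFilterSpec());
    request.setCatName(table.getCatName());
    return db.getPartitionsWithSpecs(table, request);
  }
}

The exception to the pattern, in two of the hunks below, is temporary tables: they are tracked in the HS2 session (SessionHiveMetaStoreClient) rather than in the remote metastore, so those call sites keep the old db.getPartitions(table) path when the table turns out to be a temp table.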

@@ -22,9 +22,15 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
import org.slf4j.Logger;

@@ -34,6 +40,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Formats SHOW TABLE STATUS results.

@@ -89,7 +96,25 @@ List<Path> getLocations(Hive db, Partition partition, Table table) throws HiveException
List<Path> locations = new ArrayList<Path>();
if (table.isPartitioned()) {
if (partition == null) {
- for (Partition currPartition : db.getPartitions(table)) {
List<Partition> partitions = null;
Map<String, Table> tempTables = SessionHiveMetaStoreClient.getTempTablesForDatabase(table.getDbName(), table.getTableName());

if (tempTables != null && tempTables.containsKey(table.getTableName())) {
// Temp tables are session-local and unknown to HMS, so keep the full fetch for them.
partitions = db.getPartitions(table);
} else {
// Ask HMS for just the partition identity and storage location.
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectField("catName").addProjectField("dbName").addProjectField("tableName")
.addProjectField("sd.location").build();
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(), getProjectionsSpec, new GetPartitionsFilterSpec());
request.setCatName(table.getCatName());
try {
partitions = db.getPartitionsWithSpecs(table, request);
} catch (Exception e) {
throw new HiveException(e);
}
}

for (Partition currPartition : partitions) {
if (currPartition.getLocation() != null) {
locations.add(new Path(currPartition.getLocation()));
}

@@ -27,6 +27,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;

@@ -119,6 +120,20 @@ public static List<Partition> getPartitions(Hive db, Table table, Map<String, St
return partitions;
}

public static List<Partition> getPartitionsWithSpecs(Hive db, Table table, GetPartitionsRequest request,
boolean throwException) throws SemanticException {
List<Partition> partitions = null;
try {
partitions = db.getPartitionsWithSpecs(table, request);
} catch (Exception e) {
throw new SemanticException(toMessage(ErrorMsg.INVALID_PARTITION, request.getFilterSpec()), e);
}
if (partitions.isEmpty() && throwException) {
throw new SemanticException(toMessage(ErrorMsg.INVALID_PARTITION, request.getFilterSpec()));
}
return partitions;
}

private static String toMessage(ErrorMsg message, Object detail) {
return detail == null ? message.getMsg() : message.getMsg(detail.toString());
}

@@ -21,8 +21,14 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;

@@ -36,6 +42,7 @@
import org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;

@@ -77,9 +84,31 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
if (AcidUtils.isTransactionalTable(sourceTable) || AcidUtils.isTransactionalTable(destTable)) {
throw new SemanticException(ErrorMsg.EXCHANGE_PARTITION_NOT_ALLOWED_WITH_TRANSACTIONAL_TABLES.getMsg());
}
Map<String, Table> sourceTempTables = SessionHiveMetaStoreClient.getTempTablesForDatabase(sourceTable.getDbName(), sourceTable.getTableName());
Map<String, Table> destTempTables = SessionHiveMetaStoreClient.getTempTablesForDatabase(destTable.getDbName(), destTable.getTableName());
List<String> projectFilters = MetaStoreUtils.getPvals(sourceTable.getPartCols(), partitionSpecs);

// check if source partition exists
- PartitionUtils.getPartitions(db, sourceTable, partitionSpecs, true);
if (sourceTempTables != null && sourceTempTables.containsKey(sourceTable.getTableName())) {
PartitionUtils.getPartitions(db, sourceTable, partitionSpecs, true);
} else {
GetPartitionsFilterSpec getPartitionsFilterSpec = new GetPartitionsFilterSpec();
getPartitionsFilterSpec.setFilters(projectFilters);
getPartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_VALUES);

GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectField("catName").addProjectField("dbName").addProjectField("tableName")
.addProjectField("lastAccessTime").addProjectField("values").build();

GetPartitionsRequest request = new GetPartitionsRequest(sourceTable.getDbName(), sourceTable.getTableName(),
getProjectionsSpec, getPartitionsFilterSpec);
request.setCatName(sourceTable.getCatName());
// Throws SemanticException if the source partition does not exist.
PartitionUtils.getPartitionsWithSpecs(db, sourceTable, request, true);
}

// Verify that the partitions specified are continuous
// If a subpartition value is specified without specifying a partition's value then we throw an exception
Expand All @@ -90,11 +119,26 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition

List<Partition> destPartitions = null;
try {
- destPartitions = PartitionUtils.getPartitions(db, destTable, partitionSpecs, true);
if (destTempTables != null && destTempTables.containsKey(destTable.getTableName())) {
destPartitions = PartitionUtils.getPartitions(db, destTable, partitionSpecs, true);
} else {
GetPartitionsFilterSpec getPartitionsFilterSpec = new GetPartitionsFilterSpec();
getPartitionsFilterSpec.setFilters(projectFilters);
getPartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_VALUES);

GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectField("catName").addProjectField("dbName").addProjectField("tableName")
.addProjectField("lastAccessTime").addProjectField("values").build();

GetPartitionsRequest destRequest = new GetPartitionsRequest(destTable.getDbName(), destTable.getTableName(),
getProjectionsSpec, getPartitionsFilterSpec);
destRequest.setCatName(destTable.getCatName());
destPartitions = PartitionUtils.getPartitionsWithSpecs(db, destTable, destRequest, true);
}
} catch (SemanticException ex) {
// A SemanticException is expected here, since the destination partition should not be present.
}
- if (destPartitions != null) {
if (CollectionUtils.isNotEmpty(destPartitions)) {
// If any destination partition is present then throw a Semantic Exception.
throw new SemanticException(ErrorMsg.PARTITION_EXISTS.getMsg(destPartitions.toString()));
}
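
Both branches above also push the partition lookup filter to the metastore instead of listing all partitions and matching in HS2: the user-supplied partitionSpecs map is flattened into a value list with MetaStoreUtils.getPvals and shipped in BY_VALUES mode. A small sketch of just that filter construction, under the same assumptions as the earlier sketch (the ByValuesFilter/byValues names are illustrative):

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.Table;

class ByValuesFilter {
  // Build a server-side filter matching partitions by their (possibly partial) value list.
  static GetPartitionsFilterSpec byValues(Table table, Map<String, String> partitionSpecs) {
    // getPvals orders the values to match the table's partition columns; a column missing
    // from the spec contributes an empty value, which is how partial specs are expressed.
    List<String> pvals = MetaStoreUtils.getPvals(table.getPartCols(), partitionSpecs);
    GetPartitionsFilterSpec filter = new GetPartitionsFilterSpec();
    filter.setFilters(pvals);
    filter.setFilterMode(PartitionFilterMode.BY_VALUES);
    return filter;
  }
}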

@@ -24,6 +24,10 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;

@@ -37,6 +41,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.thrift.TException;

/**
* Abstract ancestor of analyzers that can create a view.

@@ -113,8 +118,14 @@ protected void validateReplaceWithPartitions(String viewName, Table oldView, Lis
String partitionViewErrorMsg = "The following view has partitions, it cannot be replaced: " + viewName;
List<Partition> partitions = null;
try {
- partitions = db.getPartitions(oldView);
- } catch (HiveException e) {
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectField("catName").addProjectField("dbName").addProjectField("tableName")
.addProjectField("values").build();

GetPartitionsRequest request = new GetPartitionsRequest(oldView.getDbName(), oldView.getTableName(), getProjectionsSpec, new GetPartitionsFilterSpec());
request.setCatName(oldView.getCatName());
partitions = db.getPartitionsWithSpecs(oldView, request);
} catch (HiveException | TException e) {
throw new SemanticException(ErrorMsg.REPLACE_VIEW_WITH_PARTITION.getMsg(partitionViewErrorMsg));
}


@@ -27,7 +27,11 @@
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;

@@ -44,6 +48,7 @@
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@@ -397,7 +402,18 @@ private static ArrayList<String> getListing(String dbName, String tableName, Hiv
// If the table is partitioned, we need to check the partition listings as well.
if (table.isPartitioned()) {
- List<Partition> partitions = hiveDb.getPartitions(table);
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectField("catName").addProjectField("dbName").addProjectField("tableName")
.addProjectField("sd.location").build();
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
getProjectionsSpec, new GetPartitionsFilterSpec());
request.setCatName(table.getCatName());
List<Partition> partitions = null;
try {
partitions = hiveDb.getPartitionsWithSpecs(table, request);
} catch (TException e) {
throw new HiveException(e);
}
for (Partition part : partitions) {
Path partPath = part.getDataLocation();
// Build listing for the partition only if it doesn't lie within the table location, else it would have been

@@ -23,6 +23,8 @@
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;

@@ -36,6 +38,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@@ -98,14 +101,21 @@ void dataLocationDump(Table table, FileList fileList, HashMap<String, Boolean> s
if (table.isPartitioned()) {
List<Partition> partitions;
try {
- partitions = Hive.get(hiveConf).getPartitions(table);
- } catch (HiveException e) {
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
null, new GetPartitionsFilterSpec());
request.setCatName(table.getCatName());
request.setProjectionSpec(new GetPartitionProjectionsSpecBuilder()
.addProjectField("catName").addProjectField("tableName")
.addProjectField("dbName").addProjectField("sd.location").build());
partitions = Hive.get(hiveConf).getPartitionsWithSpecs(table, request);
} catch (HiveException | TException e) {
if (e.getCause() instanceof NoSuchObjectException) {
// If the table was dropped while the dump was in progress, just skip the partition data location dump
LOG.debug(e.getMessage());
return;
}
- throw e;
throw new HiveException(e);
}

for (Partition partition : partitions) {

@@ -25,6 +25,10 @@
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitionDesc;

@@ -137,7 +141,12 @@ public TaskTracker tasks() throws Exception {
if (tablesToBootstrap.stream().anyMatch(table.getTableName()::equalsIgnoreCase)) {
Hive hiveDb = Hive.get(context.hiveConf);
// Collect the non-existing partitions to drop.
- List<Partition> partitions = hiveDb.getPartitions(table);
GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
.addProjectField("catName").addProjectField("tableName").addProjectField("dbName")
.addProjectField("values").build();
GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
getProjectionsSpec, new GetPartitionsFilterSpec());
List<Partition> partitions = hiveDb.getPartitionsWithSpecs(table, request);
List<String> newParts = event.partitions(tableDesc);
for (Partition part : partitions) {
if (!newParts.contains(part.getName())) {
Expand Down

(Diff truncated; the remaining changed files are not shown here.)