diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DefaultGlueColumnStatisticsProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DefaultGlueColumnStatisticsProvider.java index 9da4d10feeb7b..7d3ad75661400 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DefaultGlueColumnStatisticsProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DefaultGlueColumnStatisticsProvider.java @@ -43,6 +43,8 @@ import io.trino.spi.statistics.ColumnStatisticType; import io.trino.spi.type.Type; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -60,6 +62,7 @@ import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; import static java.util.stream.Collectors.toUnmodifiableList; public class DefaultGlueColumnStatisticsProvider @@ -99,7 +102,7 @@ public Map getTableColumnStatistics(Table table) List columnNames = getAllColumns(table); List> columnChunks = Lists.partition(columnNames, GLUE_COLUMN_READ_STAT_PAGE_SIZE); List> getStatsFutures = columnChunks.stream() - .map(partialColumns -> CompletableFuture.supplyAsync(() -> { + .map(partialColumns -> supplyAsync(() -> { GetColumnStatisticsForTableRequest request = new GetColumnStatisticsForTableRequest() .withCatalogId(catalogId) .withDatabaseName(table.getDatabaseName()) @@ -139,35 +142,48 @@ private Optional> getPartitionColumnStatistics } @Override - public Map getPartitionColumnStatistics(Partition partition) + public Map> getPartitionColumnStatistics(Collection partitions) { - try { + Map>> resultsForPartition = new HashMap<>(); + for (Partition partition : partitions) { + ImmutableList.Builder> futures = ImmutableList.builder(); List> columnChunks = Lists.partition(partition.getColumns(), GLUE_COLUMN_READ_STAT_PAGE_SIZE); - List> getStatsFutures = columnChunks.stream() - .map(partialColumns -> CompletableFuture.supplyAsync(() -> { - List columnsNames = partialColumns.stream() - .map(Column::getName) - .collect(toImmutableList()); - GetColumnStatisticsForPartitionRequest request = new GetColumnStatisticsForPartitionRequest() - .withCatalogId(catalogId) - .withDatabaseName(partition.getDatabaseName()) - .withTableName(partition.getTableName()) - .withColumnNames(columnsNames) - .withPartitionValues(partition.getValues()); - return glueClient.getColumnStatisticsForPartition(request); - }, readExecutor)).collect(toImmutableList()); + for (List partialPartitionColumns : columnChunks) { + List columnsNames = partialPartitionColumns.stream() + .map(Column::getName) + .collect(toImmutableList()); + GetColumnStatisticsForPartitionRequest request = new GetColumnStatisticsForPartitionRequest() + .withCatalogId(catalogId) + .withDatabaseName(partition.getDatabaseName()) + .withTableName(partition.getTableName()) + .withColumnNames(columnsNames) + .withPartitionValues(partition.getValues()); + futures.add(supplyAsync(() -> glueClient.getColumnStatisticsForPartition(request), readExecutor)); + } + resultsForPartition.put(partition, futures.build()); + } - HiveBasicStatistics tableStatistics = getHiveBasicStatistics(partition.getParameters()); - ImmutableMap.Builder columnStatsMapBuilder = ImmutableMap.builder(); - for (CompletableFuture future : getStatsFutures) { - GetColumnStatisticsForPartitionResult partitionColumnStats = getFutureValue(future, TrinoException.class); - for (ColumnStatistics columnStatistics : partitionColumnStats.getColumnStatisticsList()) { - columnStatsMapBuilder.put( - columnStatistics.getColumnName(), - fromGlueColumnStatistics(columnStatistics.getStatisticsData(), tableStatistics.getRowCount())); + try { + ImmutableMap.Builder> partitionStatistics = ImmutableMap.builder(); + for (Map.Entry>> entry : resultsForPartition.entrySet()) { + Partition partition = entry.getKey(); + List> futures = entry.getValue(); + + HiveBasicStatistics tableStatistics = getHiveBasicStatistics(partition.getParameters()); + ImmutableMap.Builder columnStatsMapBuilder = ImmutableMap.builder(); + + for (CompletableFuture getColumnStatisticsResultFuture : futures) { + GetColumnStatisticsForPartitionResult getColumnStatisticsResult = getFutureValue(getColumnStatisticsResultFuture); + getColumnStatisticsResult.getColumnStatisticsList().forEach(columnStatistics -> + columnStatsMapBuilder.put( + columnStatistics.getColumnName(), + fromGlueColumnStatistics(columnStatistics.getStatisticsData(), tableStatistics.getRowCount()))); } + + partitionStatistics.put(partition, columnStatsMapBuilder.build()); } - return columnStatsMapBuilder.build(); + + return partitionStatistics.build(); } catch (EntityNotFoundException ex) { throw new TrinoException(HIVE_PARTITION_NOT_FOUND, ex); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DisabledGlueColumnStatisticsProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DisabledGlueColumnStatisticsProvider.java index d288261667131..4a322724e85f6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DisabledGlueColumnStatisticsProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/DisabledGlueColumnStatisticsProvider.java @@ -22,10 +22,13 @@ import io.trino.spi.statistics.ColumnStatisticType; import io.trino.spi.type.Type; +import java.util.Collection; import java.util.Map; import java.util.Set; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.util.function.UnaryOperator.identity; public class DisabledGlueColumnStatisticsProvider implements GlueColumnStatisticsProvider @@ -43,9 +46,9 @@ public Map getTableColumnStatistics(Table table) } @Override - public Map getPartitionColumnStatistics(Partition partition) + public Map> getPartitionColumnStatistics(Collection partitions) { - return ImmutableMap.of(); + return partitions.stream().collect(toImmutableMap(identity(), partition -> ImmutableMap.of())); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueColumnStatisticsProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueColumnStatisticsProvider.java index f405dfcb1bcf0..973eee327a7d1 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueColumnStatisticsProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueColumnStatisticsProvider.java @@ -13,12 +13,14 @@ */ package io.trino.plugin.hive.metastore.glue; +import com.google.common.collect.ImmutableSet; import io.trino.plugin.hive.metastore.HiveColumnStatistics; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Table; import io.trino.spi.statistics.ColumnStatisticType; import io.trino.spi.type.Type; +import java.util.Collection; import java.util.Map; import java.util.Set; @@ -28,7 +30,12 @@ public interface GlueColumnStatisticsProvider Map getTableColumnStatistics(Table table); - Map getPartitionColumnStatistics(Partition partition); + Map> getPartitionColumnStatistics(Collection partitions); + + default Map getPartitionColumnStatistics(Partition partition) + { + return getPartitionColumnStatistics(ImmutableSet.of(partition)).get(partition); + } void updateTableColumnStatistics(Table table, Map columnStatistics); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index b2ba0172261db..1314bf1dac692 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -105,6 +105,7 @@ import javax.inject.Inject; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -340,12 +341,18 @@ public PartitionStatistics getTableStatistics(HiveIdentity identity, Table table @Override public Map getPartitionStatistics(HiveIdentity identity, Table table, List partitions) { - return partitions.stream().collect(toImmutableMap(partition -> makePartitionName(table, partition), this::getPartitionStatistics)); + return getPartitionStatistics(partitions).entrySet().stream() + .collect(toImmutableMap( + entry -> makePartitionName(table, entry.getKey()), + Entry::getValue)); } - private PartitionStatistics getPartitionStatistics(Partition partition) + private Map getPartitionStatistics(Collection partitions) { - return new PartitionStatistics(getHiveBasicStatistics(partition.getParameters()), columnStatisticsProvider.getPartitionColumnStatistics(partition)); + return columnStatisticsProvider.getPartitionColumnStatistics(partitions).entrySet().stream() + .collect(toImmutableMap( + Entry::getKey, + entry -> new PartitionStatistics(getHiveBasicStatistics(entry.getKey().getParameters()), entry.getValue()))); } @Override @@ -384,7 +391,7 @@ public void updatePartitionStatistics(HiveIdentity identity, Table table, String Partition partition = getPartition(identity, table, partitionValues) .orElseThrow(() -> new TrinoException(HIVE_PARTITION_DROPPED_DURING_QUERY, "Statistics result does not contain entry for partition: " + partitionName)); - PartitionStatistics currentStatistics = getPartitionStatistics(partition); + PartitionStatistics currentStatistics = getPartitionStatistics(ImmutableList.of(partition)).get(partition); PartitionStatistics updatedStatistics = update.apply(currentStatistics); try {