Skip to content

Commit

Permalink
Support reading multiple Glue partition statistics in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 committed Aug 12, 2021
1 parent 20f9c48 commit 344badf
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import io.trino.spi.statistics.ColumnStatisticType;
import io.trino.spi.type.Type;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -60,6 +62,7 @@
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toUnmodifiableList;

public class DefaultGlueColumnStatisticsProvider
Expand Down Expand Up @@ -99,7 +102,7 @@ public Map<String, HiveColumnStatistics> getTableColumnStatistics(Table table)
List<String> columnNames = getAllColumns(table);
List<List<String>> columnChunks = Lists.partition(columnNames, GLUE_COLUMN_READ_STAT_PAGE_SIZE);
List<CompletableFuture<GetColumnStatisticsForTableResult>> getStatsFutures = columnChunks.stream()
.map(partialColumns -> CompletableFuture.supplyAsync(() -> {
.map(partialColumns -> supplyAsync(() -> {
GetColumnStatisticsForTableRequest request = new GetColumnStatisticsForTableRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
Expand Down Expand Up @@ -139,35 +142,48 @@ private Optional<Map<String, HiveColumnStatistics>> getPartitionColumnStatistics
}

@Override
public Map<String, HiveColumnStatistics> getPartitionColumnStatistics(Partition partition)
public Map<Partition, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(Collection<Partition> partitions)
{
try {
Map<Partition, List<CompletableFuture<GetColumnStatisticsForPartitionResult>>> resultsForPartition = new HashMap<>();
for (Partition partition : partitions) {
ImmutableList.Builder<CompletableFuture<GetColumnStatisticsForPartitionResult>> futures = ImmutableList.builder();
List<List<Column>> columnChunks = Lists.partition(partition.getColumns(), GLUE_COLUMN_READ_STAT_PAGE_SIZE);
List<CompletableFuture<GetColumnStatisticsForPartitionResult>> getStatsFutures = columnChunks.stream()
.map(partialColumns -> CompletableFuture.supplyAsync(() -> {
List<String> columnsNames = partialColumns.stream()
.map(Column::getName)
.collect(toImmutableList());
GetColumnStatisticsForPartitionRequest request = new GetColumnStatisticsForPartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(partition.getDatabaseName())
.withTableName(partition.getTableName())
.withColumnNames(columnsNames)
.withPartitionValues(partition.getValues());
return glueClient.getColumnStatisticsForPartition(request);
}, readExecutor)).collect(toImmutableList());
for (List<Column> partialPartitionColumns : columnChunks) {
List<String> columnsNames = partialPartitionColumns.stream()
.map(Column::getName)
.collect(toImmutableList());
GetColumnStatisticsForPartitionRequest request = new GetColumnStatisticsForPartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(partition.getDatabaseName())
.withTableName(partition.getTableName())
.withColumnNames(columnsNames)
.withPartitionValues(partition.getValues());
futures.add(supplyAsync(() -> glueClient.getColumnStatisticsForPartition(request), readExecutor));
}
resultsForPartition.put(partition, futures.build());
}

HiveBasicStatistics tableStatistics = getHiveBasicStatistics(partition.getParameters());
ImmutableMap.Builder<String, HiveColumnStatistics> columnStatsMapBuilder = ImmutableMap.builder();
for (CompletableFuture<GetColumnStatisticsForPartitionResult> future : getStatsFutures) {
GetColumnStatisticsForPartitionResult partitionColumnStats = getFutureValue(future, TrinoException.class);
for (ColumnStatistics columnStatistics : partitionColumnStats.getColumnStatisticsList()) {
columnStatsMapBuilder.put(
columnStatistics.getColumnName(),
fromGlueColumnStatistics(columnStatistics.getStatisticsData(), tableStatistics.getRowCount()));
try {
ImmutableMap.Builder<Partition, Map<String, HiveColumnStatistics>> partitionStatistics = ImmutableMap.builder();
for (Map.Entry<Partition, List<CompletableFuture<GetColumnStatisticsForPartitionResult>>> entry : resultsForPartition.entrySet()) {
Partition partition = entry.getKey();
List<CompletableFuture<GetColumnStatisticsForPartitionResult>> futures = entry.getValue();

HiveBasicStatistics tableStatistics = getHiveBasicStatistics(partition.getParameters());
ImmutableMap.Builder<String, HiveColumnStatistics> columnStatsMapBuilder = ImmutableMap.builder();

for (CompletableFuture<GetColumnStatisticsForPartitionResult> getColumnStatisticsResultFuture : futures) {
GetColumnStatisticsForPartitionResult getColumnStatisticsResult = getFutureValue(getColumnStatisticsResultFuture);
getColumnStatisticsResult.getColumnStatisticsList().forEach(columnStatistics ->
columnStatsMapBuilder.put(
columnStatistics.getColumnName(),
fromGlueColumnStatistics(columnStatistics.getStatisticsData(), tableStatistics.getRowCount())));
}

partitionStatistics.put(partition, columnStatsMapBuilder.build());
}
return columnStatsMapBuilder.build();

return partitionStatistics.build();
}
catch (EntityNotFoundException ex) {
throw new TrinoException(HIVE_PARTITION_NOT_FOUND, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import io.trino.spi.statistics.ColumnStatisticType;
import io.trino.spi.type.Type;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.function.UnaryOperator.identity;

public class DisabledGlueColumnStatisticsProvider
implements GlueColumnStatisticsProvider
Expand All @@ -43,9 +46,9 @@ public Map<String, HiveColumnStatistics> getTableColumnStatistics(Table table)
}

// NOTE(review): this span is a rendered diff hunk — removed and added lines are interleaved
// without +/- markers. Only the Collection-based signature/body survive this commit.
@Override
// Removed by this commit: the old single-partition signature.
public Map<String, HiveColumnStatistics> getPartitionColumnStatistics(Partition partition)
// Added: bulk signature — stats are disabled here, so every partition maps to an empty map.
public Map<Partition, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(Collection<Partition> partitions)
{
// Removed by this commit: old single-partition body.
return ImmutableMap.of();
// Added: one empty stats map per requested partition, keyed by the Partition itself.
// Relies on toImmutableMap, which throws on duplicate keys — callers must not pass
// the same Partition twice (presumably fine; TODO confirm against call sites).
return partitions.stream().collect(toImmutableMap(identity(), partition -> ImmutableMap.of()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package io.trino.plugin.hive.metastore.glue;

import com.google.common.collect.ImmutableSet;
import io.trino.plugin.hive.metastore.HiveColumnStatistics;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.statistics.ColumnStatisticType;
import io.trino.spi.type.Type;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

Expand All @@ -28,7 +30,12 @@ public interface GlueColumnStatisticsProvider

Map<String, HiveColumnStatistics> getTableColumnStatistics(Table table);

Map<String, HiveColumnStatistics> getPartitionColumnStatistics(Partition partition);
Map<Partition, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(Collection<Partition> partitions);

/**
 * Convenience overload: fetches column statistics for a single partition by
 * delegating to the bulk {@code Collection} variant and extracting this
 * partition's entry from the result.
 */
default Map<String, HiveColumnStatistics> getPartitionColumnStatistics(Partition partition)
{
    Map<Partition, Map<String, HiveColumnStatistics>> statisticsByPartition = getPartitionColumnStatistics(ImmutableSet.of(partition));
    return statisticsByPartition.get(partition);
}

void updateTableColumnStatistics(Table table, Map<String, HiveColumnStatistics> columnStatistics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -340,12 +341,18 @@ public PartitionStatistics getTableStatistics(HiveIdentity identity, Table table
// NOTE(review): rendered diff hunk — the old per-partition implementation (first return)
// and the new bulk implementation (second return) appear interleaved without +/- markers.
@Override
public Map<String, PartitionStatistics> getPartitionStatistics(HiveIdentity identity, Table table, List<Partition> partitions)
{
// Removed by this commit: one provider round-trip per partition (serial).
return partitions.stream().collect(toImmutableMap(partition -> makePartitionName(table, partition), this::getPartitionStatistics));
// Added: single bulk call, then re-key the result from Partition to partition name.
// Duplicate partition names would make toImmutableMap throw — assumes names are unique
// within a table (TODO confirm makePartitionName contract).
return getPartitionStatistics(partitions).entrySet().stream()
.collect(toImmutableMap(
entry -> makePartitionName(table, entry.getKey()),
Entry::getValue));
}

// NOTE(review): rendered diff hunk — the old single-partition helper (lines 1 and 3 of the
// body area) is interleaved with the new Collection-based helper; only the latter is current.
// Removed by this commit: per-partition helper.
private PartitionStatistics getPartitionStatistics(Partition partition)
// Added: bulk helper so the column-statistics provider can batch/parallelize Glue reads.
private Map<Partition, PartitionStatistics> getPartitionStatistics(Collection<Partition> partitions)
{
// Removed by this commit: old single-partition body.
return new PartitionStatistics(getHiveBasicStatistics(partition.getParameters()), columnStatisticsProvider.getPartitionColumnStatistics(partition));
// Added: one bulk provider call; basic stats still come from each partition's own
// parameters map, combined with that partition's column stats from the bulk result.
return columnStatisticsProvider.getPartitionColumnStatistics(partitions).entrySet().stream()
.collect(toImmutableMap(
Entry::getKey,
entry -> new PartitionStatistics(getHiveBasicStatistics(entry.getKey().getParameters()), entry.getValue())));
}

@Override
Expand Down Expand Up @@ -384,7 +391,7 @@ public void updatePartitionStatistics(HiveIdentity identity, Table table, String
Partition partition = getPartition(identity, table, partitionValues)
.orElseThrow(() -> new TrinoException(HIVE_PARTITION_DROPPED_DURING_QUERY, "Statistics result does not contain entry for partition: " + partitionName));

PartitionStatistics currentStatistics = getPartitionStatistics(partition);
PartitionStatistics currentStatistics = getPartitionStatistics(ImmutableList.of(partition)).get(partition);
PartitionStatistics updatedStatistics = update.apply(currentStatistics);

try {
Expand Down

0 comments on commit 344badf

Please sign in to comment.