Optimize partitioned table write for Presto on Spark
When writing to a partitioned (bucketed) table, ensure that each writer
node has enough buckets to write to, so that all available concurrent
writer threads are utilized efficiently.
arhimondr committed Apr 15, 2021
1 parent d35dbfe commit 33cdd9b
Showing 11 changed files with 299 additions and 30 deletions.
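The core of the change is in AddExchanges (below): when writing to a bucketed table, the exchange feeding the table writer now carries an explicit bucket-to-partition map in which consecutive runs of task.partitioned-writer-count buckets land on the same partition (writer node), so every node gets one bucket per writer thread. A minimal standalone sketch of the mapping, with the bucket and thread counts chosen purely for illustration:

import java.util.Arrays;

public final class BucketToPartitionSketch
{
    public static void main(String[] args)
    {
        int bucketCount = 8;          // illustrative; the real value comes from the connector
        int writerThreadsPerNode = 4; // illustrative; the real value is task.partitioned-writer-count

        // Same assignment as the new code in AddExchanges.visitTableWriter:
        // consecutive groups of writerThreadsPerNode buckets map to one node.
        int[] bucketToPartition = new int[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            bucketToPartition[i] = i / writerThreadsPerNode;
        }

        // Prints [0, 0, 0, 0, 1, 1, 1, 1]: node 0 writes buckets 0-3,
        // node 1 writes buckets 4-7 -- one bucket per writer thread.
        System.out.println(Arrays.toString(bucketToPartition));
    }
}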
@@ -178,6 +178,7 @@ public final class SystemSessionProperties
public static final String MAX_UNACKNOWLEDGED_SPLITS_PER_TASK = "max_unacknowledged_splits_per_task";
public static final String OPTIMIZE_JOINS_WITH_EMPTY_SOURCES = "optimize_joins_with_empty_sources";
public static final String SPOOLING_OUTPUT_BUFFER_ENABLED = "spooling_output_buffer_enabled";
public static final String SPARK_ASSIGN_BUCKET_TO_PARTITION_FOR_PARTITIONED_TABLE_WRITE_ENABLED = "spark_assign_bucket_to_partition_for_partitioned_table_write_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -932,7 +933,12 @@ public SystemSessionProperties(
SPOOLING_OUTPUT_BUFFER_ENABLED,
"Enable spooling output buffer for terminal task",
featuresConfig.isSpoolingOutputBufferEnabled(),
false));
false),
booleanProperty(
SPARK_ASSIGN_BUCKET_TO_PARTITION_FOR_PARTITIONED_TABLE_WRITE_ENABLED,
"Assign bucket to partition map for partitioned table write when adding an exchange",
featuresConfig.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(),
true));
}

public static boolean isEmptyJoinOptimization(Session session)
@@ -1578,4 +1584,9 @@ public static int getMaxUnacknowledgedSplitsPerTask(Session session)
{
return session.getSystemProperty(MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, Integer.class);
}

public static boolean isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(Session session)
{
return session.getSystemProperty(SPARK_ASSIGN_BUCKET_TO_PARTITION_FOR_PARTITIONED_TABLE_WRITE_ENABLED, Boolean.class);
}
}
@@ -186,6 +186,7 @@ public class FeaturesConfig
private PartitioningPrecisionStrategy partitioningPrecisionStrategy = PartitioningPrecisionStrategy.AUTOMATIC;

private boolean enforceFixedDistributionForOutputOperator;
private boolean prestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled;

public enum PartitioningPrecisionStrategy
{
@@ -1583,4 +1584,16 @@ public FeaturesConfig setSpoolingOutputBufferTempStorage(String spoolingOutputBu
this.spoolingOutputBufferTempStorage = spoolingOutputBufferTempStorage;
return this;
}

public boolean isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled()
{
return prestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled;
}

@Config("spark.assign-bucket-to-partition-for-partitioned-table-write-enabled")
public FeaturesConfig setPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(boolean prestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled)
{
this.prestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled = prestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled;
return this;
}
}
@@ -165,7 +165,8 @@ public PlanOptimizers(
CostCalculator costCalculator,
@EstimatedExchanges CostCalculator estimatedExchangesCostCalculator,
CostComparator costComparator,
TaskCountEstimator taskCountEstimator)
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager)
{
this(metadata,
sqlParser,
@@ -178,7 +179,8 @@
costCalculator,
estimatedExchangesCostCalculator,
costComparator,
taskCountEstimator);
taskCountEstimator,
partitioningProviderManager);
}

@PostConstruct
@@ -207,7 +209,8 @@ public PlanOptimizers(
CostCalculator costCalculator,
CostCalculator estimatedExchangesCostCalculator,
CostComparator costComparator,
TaskCountEstimator taskCountEstimator)
TaskCountEstimator taskCountEstimator,
PartitioningProviderManager partitioningProviderManager)
{
this.exporter = exporter;
ImmutableList.Builder<PlanOptimizer> builder = ImmutableList.builder();
@@ -448,7 +451,7 @@ public PlanOptimizers(
ImmutableSet.<Rule<?>>builder()
.addAll(new PushDownDereferences(metadata).rules())
.build()),
new PruneUnreferencedOutputs());
new PruneUnreferencedOutputs());

builder.add(new IterativeOptimizer(
ruleStats,
@@ -540,7 +543,7 @@ public PlanOptimizers(
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new PushTableWriteThroughUnion()))); // Must run before AddExchanges
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, sqlParser)));
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, sqlParser, partitioningProviderManager)));
}

//noinspection UnusedAssignment
@@ -18,11 +18,13 @@
import com.facebook.presto.connector.system.GlobalSystemConnector;
import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.GroupingProperty;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SortingProperty;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.DistinctLimitNode;
@@ -43,6 +45,7 @@
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.Partitioning;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.PartitioningScheme;
import com.facebook.presto.sql.planner.PlanVariableAllocator;
import com.facebook.presto.sql.planner.TypeProvider;
@@ -98,12 +101,14 @@
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getPartialMergePushdownStrategy;
import static com.facebook.presto.SystemSessionProperties.getPartitioningProviderCatalog;
import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.isColocatedJoinEnabled;
import static com.facebook.presto.SystemSessionProperties.isDistributedIndexJoinEnabled;
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isExactPartitioningPreferred;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isPreferDistributedUnion;
import static com.facebook.presto.SystemSessionProperties.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled;
import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites;
import static com.facebook.presto.SystemSessionProperties.isScaleWriters;
import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled;
@@ -140,24 +145,27 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class AddExchanges
implements PlanOptimizer
{
private final SqlParser parser;
private final Metadata metadata;
private final PartitioningProviderManager partitioningProviderManager;

public AddExchanges(Metadata metadata, SqlParser parser)
public AddExchanges(Metadata metadata, SqlParser parser, PartitioningProviderManager partitioningProviderManager)
{
this.metadata = metadata;
this.parser = parser;
this.metadata = requireNonNull(metadata, "metadata is null");
this.parser = requireNonNull(parser, "parser is null");
this.partitioningProviderManager = requireNonNull(partitioningProviderManager, "partitioningProviderManager is null");
}

@Override
public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, PlanVariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
PlanWithProperties result = plan.accept(new Rewriter(idAllocator, variableAllocator, session), PreferredProperties.any());
PlanWithProperties result = plan.accept(new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager), PreferredProperties.any());
return result.getNode();
}

@@ -177,8 +185,13 @@ private class Rewriter
private final String partitioningProviderCatalog;
private final int hashPartitionCount;
private final ExchangeMaterializationStrategy exchangeMaterializationStrategy;
private final PartitioningProviderManager partitioningProviderManager;

public Rewriter(PlanNodeIdAllocator idAllocator, PlanVariableAllocator variableAllocator, Session session)
public Rewriter(
PlanNodeIdAllocator idAllocator,
PlanVariableAllocator variableAllocator,
Session session,
PartitioningProviderManager partitioningProviderManager)
{
this.idAllocator = idAllocator;
this.variableAllocator = variableAllocator;
@@ -193,6 +206,7 @@ public Rewriter(PlanNodeIdAllocator idAllocator, PlanVariableAllocator variableA
this.partitioningProviderCatalog = getPartitioningProviderCatalog(session);
this.hashPartitionCount = getHashPartitionCount(session);
this.exchangeMaterializationStrategy = getExchangeMaterializationStrategy(session);
this.partitioningProviderManager = requireNonNull(partitioningProviderManager, "partitioningProviderManager is null");
}

@Override
@@ -598,17 +612,44 @@ else if (redistributeWrites) {
!source.getProperties().isCompatibleTablePartitioningWith(shufflePartitioningScheme.get().getPartitioning(), false, metadata, session) &&
!(source.getProperties().isRefinedPartitioningOver(shufflePartitioningScheme.get().getPartitioning(), false, metadata, session) &&
canPushdownPartialMerge(source.getNode(), partialMergePushdownStrategy))) {
PartitioningScheme exchangePartitioningScheme = shufflePartitioningScheme.get();
if (node.getTablePartitioningScheme().isPresent() && isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(session)) {
int writerThreadsPerNode = getTaskPartitionedWriterCount(session);
int bucketCount = getBucketCount(node.getTablePartitioningScheme().get().getPartitioning().getHandle());
int[] bucketToPartition = new int[bucketCount];
for (int i = 0; i < bucketCount; i++) {
bucketToPartition[i] = i / writerThreadsPerNode;
}
exchangePartitioningScheme = exchangePartitioningScheme.withBucketToPartition(Optional.of(bucketToPartition));
}

source = withDerivedProperties(
partitionedExchange(
idAllocator.getNextId(),
REMOTE_STREAMING,
source.getNode(),
shufflePartitioningScheme.get()),
exchangePartitioningScheme),
source.getProperties());
}
return rebaseAndDeriveProperties(node, source);
}

private int getBucketCount(PartitioningHandle partitioning)
{
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(partitioning);
return partitioningProvider.getBucketCount(
partitioning.getTransactionHandle().orElse(null),
session.toConnectorSession(),
partitioning.getConnectorHandle());
}

private ConnectorNodePartitioningProvider getPartitioningProvider(PartitioningHandle partitioning)
{
ConnectorId connectorId = partitioning.getConnectorId()
.orElseThrow(() -> new IllegalArgumentException("Unexpected partitioning: " + partitioning));
return partitioningProviderManager.getPartitioningProvider(connectorId);
}

private PlanWithProperties planTableScan(TableScanNode node, RowExpression predicate)
{
PlanNode plan = pushPredicateIntoTableScan(node, predicate, true, session, idAllocator, metadata);
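The two new helpers above resolve the table's bucket count through the connector SPI: the PartitioningHandle yields a ConnectorId, which selects the ConnectorNodePartitioningProvider that reports the bucket count. One consequence of the i / writerThreadsPerNode assignment is that the number of distinct writer partitions is the ceiling of the bucket count over the per-node writer count; the sketch below (a hypothetical helper, not part of this commit) makes that explicit:

public final class ImpliedWriterNodes
{
    // Hypothetical helper, not part of this commit. Partitions produced by
    // bucketToPartition[i] = i / writerThreadsPerNode run from 0 to
    // (bucketCount - 1) / writerThreadsPerNode, i.e. there are
    // ceil(bucketCount / writerThreadsPerNode) writer nodes in total.
    static int impliedWriterNodeCount(int bucketCount, int writerThreadsPerNode)
    {
        return (bucketCount + writerThreadsPerNode - 1) / writerThreadsPerNode;
    }

    public static void main(String[] args)
    {
        // 10 buckets with 4 partitioned-writer threads per node -> 3 nodes:
        // nodes 0 and 1 take 4 buckets each, node 2 takes the remaining 2.
        System.out.println(impliedWriterNodeCount(10, 4)); // prints 3
    }
}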
@@ -946,7 +946,8 @@ public List<PlanOptimizer> getPlanOptimizers(boolean forceSingleNode)
costCalculator,
estimatedExchangesCostCalculator,
new CostComparator(featuresConfig),
taskCountEstimator).getPlanningTimeOptimizers();
taskCountEstimator,
partitioningProviderManager).getPlanningTimeOptimizers();
}

public Plan createPlan(Session session, @Language("SQL") String sql, List<PlanOptimizer> optimizers, WarningCollector warningCollector)
@@ -159,7 +159,8 @@ public void testDefaults()
.setEmptyJoinOptimization(false)
.setSpoolingOutputBufferEnabled(false)
.setSpoolingOutputBufferThreshold(new DataSize(8, MEGABYTE))
.setSpoolingOutputBufferTempStorage("local"));
.setSpoolingOutputBufferTempStorage("local")
.setPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(false));
}

@Test
@@ -272,6 +273,7 @@ public void testExplicitPropertyMappings()
.put("spooling-output-buffer-enabled", "true")
.put("spooling-output-buffer-threshold", "16MB")
.put("spooling-output-buffer-temp-storage", "tempfs")
.put("spark.assign-bucket-to-partition-for-partitioned-table-write-enabled", "true")
.build();

FeaturesConfig expected = new FeaturesConfig()
@@ -381,7 +383,8 @@ public void testExplicitPropertyMappings()
.setEmptyJoinOptimization(true)
.setSpoolingOutputBufferEnabled(true)
.setSpoolingOutputBufferThreshold(new DataSize(16, MEGABYTE))
.setSpoolingOutputBufferTempStorage("tempfs");
.setSpoolingOutputBufferTempStorage("tempfs")
.setPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(true);
assertFullMapping(properties, expected);
}

@@ -15,6 +15,7 @@

import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.RuleStatsRecorder;
import com.facebook.presto.sql.planner.assertions.BasePlanTest;
import com.facebook.presto.sql.planner.assertions.ExpectedValueProvider;
@@ -94,7 +95,7 @@ public void assertUnitPlan(@Language("SQL") String sql, PlanMatchPattern pattern
getQueryRunner().getStatsCalculator(),
getQueryRunner().getCostCalculator(),
new TranslateExpressions(getMetadata(), new SqlParser()).rules()),
new AddExchanges(getQueryRunner().getMetadata(), new SqlParser()),
new AddExchanges(getQueryRunner().getMetadata(), new SqlParser(), new PartitioningProviderManager()),
new UnaliasSymbolReferences(getMetadata().getFunctionAndTypeManager()),
new PruneUnreferencedOutputs(),
new IterativeOptimizer(
@@ -90,6 +90,7 @@ public static void setDefaults(FeaturesConfig config)
config.setForceSingleNodeOutput(false);
config.setInlineSqlFunctions(true);
config.setEnforceFixedDistributionForOutputOperator(true);
config.setPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(true);
}

public static void setDefaults(QueryManagerConfig config)
@@ -202,6 +202,8 @@ public PrestoSparkQueryRunner(String defaultCatalog, Map<String, String> additio
ImmutableMap.Builder<String, String> configProperties = ImmutableMap.builder();
configProperties.put("presto.version", "testversion");
configProperties.put("query.hash-partition-count", Integer.toString(NODE_COUNT * 2));
configProperties.put("task.writer-count", Integer.toString(2));
configProperties.put("task.partitioned-writer-count", Integer.toString(4));
configProperties.putAll(additionalConfigProperties);

PrestoSparkInjectorFactory injectorFactory = new PrestoSparkInjectorFactory(