diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index 0376103ae319..4b10691efd66 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -15,22 +15,37 @@ import com.facebook.presto.Session; import com.facebook.presto.SystemSessionProperties; +import com.facebook.presto.connector.ConnectorId; import com.facebook.presto.cost.StatsAndCosts; import com.facebook.presto.execution.QueryManagerConfig; import com.facebook.presto.execution.scheduler.BucketNodeMap; import com.facebook.presto.execution.warnings.WarningCollector; +import com.facebook.presto.metadata.InsertTableHandle; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.metadata.NewTableLayout; +import com.facebook.presto.metadata.PartitioningMetadata; +import com.facebook.presto.metadata.TableHandle; import com.facebook.presto.metadata.TableLayout; import com.facebook.presto.metadata.TableLayout.TablePartitioning; import com.facebook.presto.metadata.TableLayoutHandle; +import com.facebook.presto.metadata.TableLayoutResult; import com.facebook.presto.operator.StageExecutionDescriptor; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorNewTableLayout; +import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.PrestoWarning; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.connector.ConnectorPartitionHandle; import com.facebook.presto.spi.connector.ConnectorPartitioningHandle; +import com.facebook.presto.spi.predicate.NullableValue; +import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.Type; import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.planner.Partitioning.ArgumentBinding; import com.facebook.presto.sql.planner.plan.AggregationNode; +import com.facebook.presto.sql.planner.plan.Assignments; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; import com.facebook.presto.sql.planner.plan.JoinNode; @@ -40,6 +55,7 @@ import com.facebook.presto.sql.planner.plan.PlanNode; import com.facebook.presto.sql.planner.plan.PlanNodeId; import com.facebook.presto.sql.planner.plan.PlanVisitor; +import com.facebook.presto.sql.planner.plan.ProjectNode; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; @@ -47,16 +63,20 @@ import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.planner.plan.TableScanNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode.InsertHandle; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.ValuesNode; import com.facebook.presto.sql.planner.plan.WindowNode; import com.facebook.presto.sql.planner.sanity.PlanSanityChecker; +import com.facebook.presto.sql.tree.Expression; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import javax.inject.Inject; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -69,21 +89,33 @@ import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY; import static com.facebook.presto.sql.planner.SchedulingOrderVisitor.scheduleOrder; import static com.facebook.presto.sql.planner.SymbolsExtractor.extractOutputSymbols; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange; import static com.facebook.presto.sql.planner.planPrinter.PlanPrinter.jsonFragmentPlan; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Predicates.in; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Maps.filterKeys; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.function.Function.identity; /** * Splits a logical plan into fragments that can be shipped and executed on distributed nodes @@ -232,6 +264,7 @@ private static class Fragmenter private final PlanSanityChecker planSanityChecker; private final WarningCollector warningCollector; private final SqlParser sqlParser; + private final LiteralEncoder literalEncoder; private int nextFragmentId = ROOT_FRAGMENT_ID + 1; public Fragmenter( @@ -252,6 +285,7 @@ public Fragmenter( this.sqlParser = requireNonNull(sqlParser, "sqlParser is null"); this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null"); + this.literalEncoder = new LiteralEncoder(metadata.getBlockEncodingSerde()); } public SubPlan buildRootFragment(PlanNode root, FragmentProperties properties) @@ -363,15 +397,18 @@ public PlanNode visitExchange(ExchangeNode exchange, RewriteContext context) + private PlanNode createRemoteStreamingExchange(ExchangeNode exchange, RewriteContext context) { + checkArgument(exchange.getScope() == REMOTE_STREAMING, "Unexpected exchange scope: %s", exchange.getScope()); + PartitioningScheme partitioningScheme = exchange.getPartitioningScheme(); if (exchange.getType() == ExchangeNode.Type.GATHER) { @@ -398,6 +435,240 @@ else if (exchange.getType() == ExchangeNode.Type.REPARTITION) { return new RemoteSourceNode(exchange.getId(), childrenIds, exchange.getOutputSymbols(), exchange.getOrderingScheme(), exchange.getType()); } + private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, RewriteContext context) + { + checkArgument(exchange.getType() == REPARTITION, "Unexpected exchange type: %s", exchange.getType()); + checkArgument(exchange.getScope() == REMOTE_MATERIALIZED, "Unexpected exchange scope: %s", exchange.getScope()); + + PartitioningScheme partitioningScheme = exchange.getPartitioningScheme(); + checkArgument(!partitioningScheme.getHashColumn().isPresent(), "precomputed hashes are not supported in materializing exchanges"); + + PartitioningHandle partitioningHandle = partitioningScheme.getPartitioning().getHandle(); + ConnectorId connectorId = partitioningHandle.getConnectorId() + .orElseThrow(() -> new IllegalArgumentException("Unsupported partitioning handle: " + partitioningHandle)); + + Partitioning partitioning = partitioningScheme.getPartitioning(); + PartitioningSymbolAssignments partitioningSymbolAssignments = assignPartitioningSymbols(partitioning); + Map symbolToColumnMap = assignTemporaryTableColumnNames(exchange.getOutputSymbols(), partitioningSymbolAssignments.getConstants().keySet()); + List partitioningSymbols = partitioningSymbolAssignments.getSymbols(); + List partitionColumns = partitioningSymbols.stream() + .map(symbol -> symbolToColumnMap.get(symbol).getName()) + .collect(toImmutableList()); + PartitioningMetadata partitioningMetadata = new PartitioningMetadata(partitioningHandle, partitionColumns); + + TableHandle temporaryTableHandle = metadata.createTemporaryTable( + session, + connectorId.getCatalogName(), + ImmutableList.copyOf(symbolToColumnMap.values()), + Optional.of(partitioningMetadata)); + + TableScanNode scan = createTemporaryTableScan( + temporaryTableHandle, + exchange.getOutputSymbols(), + symbolToColumnMap, + partitioningMetadata); + + checkArgument( + !exchange.getPartitioningScheme().isReplicateNullsAndAny(), + "materialized remote exchange is not supported when replicateNullsAndAny is needed"); + TableFinishNode write = createTemporaryTableWrite( + temporaryTableHandle, + symbolToColumnMap, + exchange.getOutputSymbols(), + exchange.getInputs(), + exchange.getSources(), + partitioningSymbolAssignments.getConstants(), + partitioningMetadata); + + FragmentProperties writeProperties = new FragmentProperties(new PartitioningScheme( + Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), + write.getOutputSymbols())); + writeProperties.setCoordinatorOnlyDistribution(); + + List children = ImmutableList.of(buildSubPlan(write, writeProperties, context)); + context.get().addChildren(children); + + return visitTableScan(scan, context); + } + + private PartitioningSymbolAssignments assignPartitioningSymbols(Partitioning partitioning) + { + ImmutableList.Builder symbols = ImmutableList.builder(); + ImmutableMap.Builder constants = ImmutableMap.builder(); + for (ArgumentBinding argumentBinding : partitioning.getArguments()) { + Symbol symbol; + if (argumentBinding.isConstant()) { + NullableValue constant = argumentBinding.getConstant(); + Expression expression = literalEncoder.toExpression(constant.getValue(), constant.getType()); + symbol = symbolAllocator.newSymbol(expression, constant.getType()); + constants.put(symbol, expression); + } + else { + symbol = argumentBinding.getColumn(); + } + symbols.add(symbol); + } + return new PartitioningSymbolAssignments(symbols.build(), constants.build()); + } + + private Map assignTemporaryTableColumnNames(Collection outputSymbols, Collection constantPartitioningSymbols) + { + ImmutableMap.Builder result = ImmutableMap.builder(); + int column = 0; + for (Symbol outputSymbol : concat(outputSymbols, constantPartitioningSymbols)) { + String columnName = format("_c%d_%s", column, outputSymbol.getName()); + result.put(outputSymbol, new ColumnMetadata(columnName, symbolAllocator.getTypes().get(outputSymbol))); + column++; + } + return result.build(); + } + + private TableScanNode createTemporaryTableScan( + TableHandle tableHandle, + List outputSymbols, + Map symbolToColumnMap, + PartitioningMetadata expectedPartitioningMetadata) + { + Map columnHandles = metadata.getColumnHandles(session, tableHandle); + Map outputColumns = outputSymbols.stream() + .collect(toImmutableMap(identity(), symbolToColumnMap::get)); + Set outputColumnHandles = outputColumns.values().stream() + .map(ColumnMetadata::getName) + .map(columnHandles::get) + .collect(toImmutableSet()); + + List layouts = metadata.getLayouts(session, tableHandle, Constraint.alwaysTrue(), Optional.of(outputColumnHandles)); + checkArgument(layouts.size() == 1, "temporary table is expected to have exactly one layout"); + TableLayoutResult selectedLayout = getOnlyElement(layouts); + verify(selectedLayout.getUnenforcedConstraint().equals(TupleDomain.all()), "temporary table layout shouldn't enforce any constraints"); + verify(!selectedLayout.getLayout().getColumns().isPresent(), "temporary table layout must provide all the columns"); + TablePartitioning expectedPartitioning = new TablePartitioning( + expectedPartitioningMetadata.getPartitioningHandle(), + expectedPartitioningMetadata.getPartitionColumns().stream() + .map(columnHandles::get) + .collect(toImmutableList())); + verify(selectedLayout.getLayout().getTablePartitioning().equals(Optional.of(expectedPartitioning)), "invalid temporary table partitioning"); + + TableLayoutHandle layoutHandle = selectedLayout.getLayout().getHandle(); + Map assignments = outputSymbols.stream() + .collect(toImmutableMap(identity(), symbol -> columnHandles.get(outputColumns.get(symbol).getName()))); + + return new TableScanNode( + idAllocator.getNextId(), + tableHandle, + outputSymbols, + assignments, + Optional.of(layoutHandle), + TupleDomain.all(), + TupleDomain.all()); + } + + private TableFinishNode createTemporaryTableWrite( + TableHandle tableHandle, + Map symbolToColumnMap, + List outputs, + List> inputs, + List sources, + Map constantExpressions, + PartitioningMetadata partitioningMetadata) + { + if (!constantExpressions.isEmpty()) { + List constantSymbols = ImmutableList.copyOf(constantExpressions.keySet()); + + // update outputs + outputs = ImmutableList.builder() + .addAll(outputs) + .addAll(constantSymbols) + .build(); + + // update inputs + inputs = inputs.stream() + .map(input -> ImmutableList.builder() + .addAll(input) + .addAll(constantSymbols) + .build()) + .collect(toImmutableList()); + + // update sources + sources = sources.stream() + .map(source -> { + Assignments.Builder assignments = Assignments.builder(); + assignments.putIdentities(source.getOutputSymbols()); + constantSymbols.forEach(symbol -> assignments.put(symbol, constantExpressions.get(symbol))); + return new ProjectNode(idAllocator.getNextId(), source, assignments.build()); + }) + .collect(toImmutableList()); + } + + NewTableLayout insertLayout = metadata.getInsertLayout(session, tableHandle) + // TODO: support insert into non partitioned table + .orElseThrow(() -> new IllegalArgumentException("insertLayout for the temporary table must be present")); + + PartitioningHandle partitioningHandle = partitioningMetadata.getPartitioningHandle(); + List partitionColumns = partitioningMetadata.getPartitionColumns(); + ConnectorNewTableLayout expectedNewTableLayout = new ConnectorNewTableLayout(partitioningHandle.getConnectorHandle(), partitionColumns); + verify(insertLayout.getLayout().equals(expectedNewTableLayout), "unexpected new table layout"); + + Map columnNameToSymbol = symbolToColumnMap.entrySet().stream() + .collect(toImmutableMap(entry -> entry.getValue().getName(), Map.Entry::getKey)); + List partitioningSymbols = partitionColumns.stream() + .map(columnNameToSymbol::get) + .collect(toImmutableList()); + + InsertTableHandle insertTableHandle = metadata.beginInsert(session, tableHandle); + List outputColumnNames = outputs.stream() + .map(symbolToColumnMap::get) + .map(ColumnMetadata::getName) + .collect(toImmutableList()); + + SchemaTableName temporaryTableName = metadata.getTableMetadata(session, tableHandle).getTable(); + InsertHandle insertHandle = new InsertHandle(insertTableHandle, new SchemaTableName(temporaryTableName.getSchemaName(), temporaryTableName.getTableName())); + + return new TableFinishNode( + idAllocator.getNextId(), + gatheringExchange( + idAllocator.getNextId(), + LOCAL, + gatheringExchange( + idAllocator.getNextId(), + REMOTE_STREAMING, + new TableWriterNode( + idAllocator.getNextId(), + gatheringExchange( + idAllocator.getNextId(), + LOCAL, + new ExchangeNode( + idAllocator.getNextId(), + REPARTITION, + REMOTE_STREAMING, + new PartitioningScheme( + Partitioning.create(partitioningHandle, partitioningSymbols), + outputs, + Optional.empty(), + false, + Optional.empty()), + sources, + inputs, + Optional.empty())), + insertHandle, + symbolAllocator.newSymbol("partialrows", BIGINT), + symbolAllocator.newSymbol("fragment", VARBINARY), + outputs, + outputColumnNames, + Optional.of(new PartitioningScheme( + Partitioning.create(partitioningHandle, partitioningSymbols), + outputs, + Optional.empty(), + false, + Optional.empty())), + Optional.empty(), + Optional.empty()))), + insertHandle, + symbolAllocator.newSymbol("rows", BIGINT), + Optional.empty(), + Optional.empty()); + } + private SubPlan buildSubPlan(PlanNode node, FragmentProperties properties, RewriteContext context) { PlanFragmentId planFragmentId = nextFragmentId(); @@ -806,4 +1077,29 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context) node.getEnforcedConstraint()); } } + + private static class PartitioningSymbolAssignments + { + private final List symbols; + private final Map constants; + + private PartitioningSymbolAssignments(List symbols, Map constants) + { + this.symbols = ImmutableList.copyOf(requireNonNull(symbols, "symbols is null")); + this.constants = ImmutableMap.copyOf(requireNonNull(constants, "constants is null")); + checkArgument( + ImmutableSet.copyOf(symbols).containsAll(constants.keySet()), + "partitioningSymbols list must contain all partitioning symbols including constants"); + } + + public List getSymbols() + { + return symbols; + } + + public Map getConstants() + { + return constants; + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SubPlan.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SubPlan.java index 9efdeccb1bd5..8929d0096e9e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SubPlan.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SubPlan.java @@ -15,7 +15,6 @@ import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Multiset; @@ -23,6 +22,7 @@ import java.util.List; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset; import static java.util.Objects.requireNonNull; @@ -68,7 +68,7 @@ public List getAllFragments() public void sanityCheck() { - Multiset exchangeIds = fragment.getRemoteSourceNodes().stream() + Multiset remoteSourceIds = fragment.getRemoteSourceNodes().stream() .map(RemoteSourceNode::getSourceFragmentIds) .flatMap(List::stream) .collect(toImmutableMultiset()); @@ -78,7 +78,7 @@ public void sanityCheck() .map(PlanFragment::getId) .collect(toImmutableMultiset()); - Preconditions.checkState(exchangeIds.equals(childrenIds), "Subplan exchange ids don't match child fragment ids (%s vs %s)", exchangeIds, childrenIds); + checkState(childrenIds.containsAll(remoteSourceIds), "child fragments must include all remote source fragments (%s vs %s)", remoteSourceIds, childrenIds); for (SubPlan child : children) { child.sanityCheck();