Skip to content

Commit

Permalink
Remove support for multiple table layouts
Browse files Browse the repository at this point in the history
  • Loading branch information
martint committed Mar 7, 2019
1 parent c68109c commit 801423b
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 987 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ protected final OperatorFactory createTableScanOperator(int operatorId, PlanNode
List<ColumnHandle> columnHandles = columnHandlesBuilder.build();

// get the split for this table
List<TableLayoutResult> layouts = metadata.getLayouts(session, tableHandle, Constraint.alwaysTrue(), Optional.empty());
Split split = getLocalQuerySplit(session, layouts.get(0).getLayout().getHandle());
Optional<TableLayoutResult> layout = metadata.getLayout(session, tableHandle, Constraint.alwaysTrue(), Optional.empty());
Split split = getLocalQuerySplit(session, layout.get().getLayout().getHandle());

return new OperatorFactory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableLayout;
import io.prestosql.metadata.TableLayoutResult;
import io.prestosql.metadata.TableMetadata;
import io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior;
import io.prestosql.spi.connector.CatalogSchemaTableName;
Expand Down Expand Up @@ -1673,8 +1672,9 @@ private Object getHiveTableProperty(String tableName, Function<HiveTableLayoutHa
Optional<TableHandle> tableHandle = metadata.getTableHandle(transactionSession, new QualifiedObjectName(catalog, TPCH_SCHEMA, tableName));
assertTrue(tableHandle.isPresent());

List<TableLayoutResult> layouts = metadata.getLayouts(transactionSession, tableHandle.get(), Constraint.alwaysTrue(), Optional.empty());
TableLayout layout = getOnlyElement(layouts).getLayout();
TableLayout layout = metadata.getLayout(transactionSession, tableHandle.get(), Constraint.alwaysTrue(), Optional.empty())
.get()
.getLayout();
return propertyGetter.apply((HiveTableLayoutHandle) layout.getHandle().getConnectorHandle());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public interface Metadata

Optional<TableHandle> getTableHandleForStatisticsCollection(Session session, QualifiedObjectName tableName, Map<String, Object> analyzeProperties);

List<TableLayoutResult> getLayouts(Session session, TableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns);
Optional<TableLayoutResult> getLayout(Session session, TableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns);

TableLayout getLayout(Session session, TableLayoutHandle handle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.prestosql.metadata.QualifiedObjectName.convertFromSchemaTableName;
import static io.prestosql.metadata.TableLayout.fromConnectorLayout;
Expand Down Expand Up @@ -373,10 +372,10 @@ public Optional<SystemTable> getSystemTable(Session session, QualifiedObjectName
}

@Override
public List<TableLayoutResult> getLayouts(Session session, TableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
public Optional<TableLayoutResult> getLayout(Session session, TableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
if (constraint.getSummary().isNone()) {
return ImmutableList.of();
return Optional.empty();
}

ConnectorId connectorId = table.getConnectorId();
Expand All @@ -387,10 +386,15 @@ public List<TableLayoutResult> getLayouts(Session session, TableHandle table, Co
ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(connectorId);
ConnectorSession connectorSession = session.toConnectorSession(connectorId);
List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(connectorSession, connectorTable, constraint, desiredColumns);
if (layouts.isEmpty()) {
return Optional.empty();
}

if (layouts.size() > 1) {
throw new PrestoException(NOT_SUPPORTED, format("Connector returned multiple layouts for table %s", table));
}

return layouts.stream()
.map(layout -> new TableLayoutResult(fromConnectorLayout(connectorId, transaction, layout.getTableLayout()), layout.getUnenforcedConstraint()))
.collect(toImmutableList());
return Optional.of(new TableLayoutResult(fromConnectorLayout(connectorId, transaction, layouts.get(0).getTableLayout()), layouts.get(0).getUnenforcedConstraint()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@
package io.prestosql.metadata;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.sql.planner.plan.TableScanNode;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class TableLayoutResult
Expand All @@ -49,19 +44,6 @@ public TupleDomain<ColumnHandle> getUnenforcedConstraint()
return unenforcedConstraint;
}

public boolean hasAllOutputs(TableScanNode node)
{
if (!layout.getColumns().isPresent()) {
return true;
}
Set<ColumnHandle> columns = ImmutableSet.copyOf(layout.getColumns().get());
List<ColumnHandle> nodeColumnHandles = node.getOutputSymbols().stream()
.map(node.getAssignments()::get)
.collect(toImmutableList());

return columns.containsAll(nodeColumnHandles);
}

public static TupleDomain<ColumnHandle> computeEnforced(TupleDomain<ColumnHandle> predicate, TupleDomain<ColumnHandle> unenforced)
{
if (predicate.isNone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,11 @@ private static KdbTree loadKdbTree(String tableName, Session session, Metadata m

ColumnHandle kdbTreeColumn = Iterables.getOnlyElement(visibleColumnHandles);

List<TableLayoutResult> layouts = metadata.getLayouts(session, tableHandle, Constraint.alwaysTrue(), Optional.of(ImmutableSet.of(kdbTreeColumn)));
checkSpatialPartitioningTable(!layouts.isEmpty(), "Table is empty: %s", name);
Optional<TableLayoutResult> layout = metadata.getLayout(session, tableHandle, Constraint.alwaysTrue(), Optional.of(ImmutableSet.of(kdbTreeColumn)));
checkSpatialPartitioningTable(layout.isPresent(), "Table is empty: %s", name);

Optional<KdbTree> kdbTree = Optional.empty();
try (SplitSource splitSource = splitManager.getSplits(session, layouts.get(0).getLayout().getHandle(), UNGROUPED_SCHEDULING)) {
try (SplitSource splitSource = splitManager.getSplits(session, layout.get().getLayout().getHandle(), UNGROUPED_SCHEDULING)) {
while (!Thread.currentThread().isInterrupted()) {
SplitBatch splitBatch = getFutureValue(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1000));
List<Split> splits = splitBatch.getSplits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,11 @@
import io.prestosql.sql.tree.NodeRef;
import io.prestosql.sql.tree.NullLiteral;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Sets.intersection;
import static io.prestosql.SystemSessionProperties.isNewOptimizerEnabled;
Expand All @@ -71,7 +68,6 @@
import static io.prestosql.sql.tree.BooleanLiteral.TRUE_LITERAL;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

/**
* These rules should not be run after AddExchanges so as not to overwrite the TableLayout
Expand Down Expand Up @@ -237,11 +233,10 @@ private static PlanNode planTableScan(
idAllocator,
metadata,
parser,
domainTranslator)
.get(0);
domainTranslator);
}

public static List<PlanNode> pushFilterIntoTableScan(
public static PlanNode pushFilterIntoTableScan(
TableScanNode node,
Expression predicate,
boolean pruneWithPredicateExpression,
Expand Down Expand Up @@ -288,60 +283,45 @@ public static List<PlanNode> pushFilterIntoTableScan(
constraint = new Constraint<>(newDomain);
}

// Layouts will be returned in order of the connector's preference
List<TableLayoutResult> layouts = metadata.getLayouts(
Optional<TableLayoutResult> layout = metadata.getLayout(
session,
node.getTable(),
constraint,
Optional.of(node.getOutputSymbols().stream()
.map(node.getAssignments()::get)
.collect(toImmutableSet())));

if (layouts.isEmpty()) {
return ImmutableList.of(new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of()));
if (!layout.isPresent() || layout.get().getLayout().getPredicate().isNone()) {
return new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of());
}

// Filter out layouts that cannot supply all the required columns
layouts = layouts.stream()
.filter(layout -> layout.hasAllOutputs(node))
.collect(toList());
checkState(!layouts.isEmpty(), "No usable layouts for %s", node);

if (layouts.stream().anyMatch(layout -> layout.getLayout().getPredicate().isNone())) {
return ImmutableList.of(new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of()));
TableScanNode tableScan = new TableScanNode(
node.getId(),
node.getTable(),
node.getOutputSymbols(),
node.getAssignments(),
Optional.of(layout.get().getLayout().getHandle()),
layout.get().getLayout().getPredicate(),
computeEnforced(newDomain, layout.get().getUnenforcedConstraint()));

// The order of the arguments to combineConjuncts matters:
// * Unenforced constraints go first because they can only be simple column references,
// which are not prone to logic errors such as out-of-bound access, div-by-zero, etc.
// * Conjuncts in non-deterministic expressions and non-TupleDomain-expressible expressions should
// retain their original (maybe intermixed) order from the input predicate. However, this is not implemented yet.
// * Short of implementing the previous bullet point, the current order of non-deterministic expressions
// and non-TupleDomain-expressible expressions should be retained. Changing the order can lead
// to failures of previously successful queries.
Expression resultingPredicate = combineConjuncts(
domainTranslator.toPredicate(layout.get().getUnenforcedConstraint().transform(assignments::get)),
filterNonDeterministicConjuncts(predicate),
decomposedPredicate.getRemainingExpression());

if (!TRUE_LITERAL.equals(resultingPredicate)) {
return new FilterNode(idAllocator.getNextId(), tableScan, resultingPredicate);
}

return layouts.stream()
.map(layout -> {
TableScanNode tableScan = new TableScanNode(
node.getId(),
node.getTable(),
node.getOutputSymbols(),
node.getAssignments(),
Optional.of(layout.getLayout().getHandle()),
layout.getLayout().getPredicate(),
computeEnforced(newDomain, layout.getUnenforcedConstraint()));

// The order of the arguments to combineConjuncts matters:
// * Unenforced constraints go first because they can only be simple column references,
// which are not prone to logic errors such as out-of-bound access, div-by-zero, etc.
// * Conjuncts in non-deterministic expressions and non-TupleDomain-expressible expressions should
// retain their original (maybe intermixed) order from the input predicate. However, this is not implemented yet.
// * Short of implementing the previous bullet point, the current order of non-deterministic expressions
// and non-TupleDomain-expressible expressions should be retained. Changing the order can lead
// to failures of previously successful queries.
Expression resultingPredicate = combineConjuncts(
domainTranslator.toPredicate(layout.getUnenforcedConstraint().transform(assignments::get)),
filterNonDeterministicConjuncts(predicate),
decomposedPredicate.getRemainingExpression());

if (!TRUE_LITERAL.equals(resultingPredicate)) {
return new FilterNode(idAllocator.getNextId(), tableScan, resultingPredicate);
}

return tableScan;
})
.collect(toImmutableList());
return tableScan;
}

private static class LayoutConstraintEvaluator
Expand Down
Loading

0 comments on commit 801423b

Please sign in to comment.