diff --git a/presto-main/src/main/java/io/prestosql/connector/system/GlobalSystemConnector.java b/presto-main/src/main/java/io/prestosql/connector/system/GlobalSystemConnector.java index 7f69f01564375..9578d4544c4a5 100644 --- a/presto-main/src/main/java/io/prestosql/connector/system/GlobalSystemConnector.java +++ b/presto-main/src/main/java/io/prestosql/connector/system/GlobalSystemConnector.java @@ -22,6 +22,7 @@ import io.prestosql.spi.connector.ConnectorPageSourceProvider; import io.prestosql.spi.connector.ConnectorSession; import io.prestosql.spi.connector.ConnectorSplitManager; +import io.prestosql.spi.connector.ConnectorSplitSource; import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.ConnectorTableLayout; import io.prestosql.spi.connector.ConnectorTableLayoutHandle; @@ -130,8 +131,13 @@ public Map> listTableColumns(ConnectorSess @Override public ConnectorSplitManager getSplitManager() { - return (transactionHandle, session, layout, splitSchedulingStrategy) -> { - throw new UnsupportedOperationException(); + return new ConnectorSplitManager() + { + @Override + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy) + { + throw new UnsupportedOperationException(); + } }; } diff --git a/presto-main/src/main/java/io/prestosql/metadata/Metadata.java b/presto-main/src/main/java/io/prestosql/metadata/Metadata.java index 7dfeb8706ad51..4083461483168 100644 --- a/presto-main/src/main/java/io/prestosql/metadata/Metadata.java +++ b/presto-main/src/main/java/io/prestosql/metadata/Metadata.java @@ -73,9 +73,10 @@ public interface Metadata Optional getTableHandleForStatisticsCollection(Session session, QualifiedObjectName tableName, Map analyzeProperties); + @Deprecated Optional getLayout(Session session, TableHandle tableHandle, Constraint constraint, Optional> desiredColumns); - TableLayout getLayout(Session session, TableHandle handle); + TableProperties getTableProperties(Session session, TableHandle handle); /** * Return a table handle whose partitioning is converted to the provided partitioning handle, @@ -380,4 +381,7 @@ public interface Metadata ColumnPropertyManager getColumnPropertyManager(); AnalyzePropertyManager getAnalyzePropertyManager(); + + @Deprecated + boolean usesLegacyTableLayouts(Session session, TableHandle table); } diff --git a/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java b/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java index 990a0922556e9..79662f0e0f7ce 100644 --- a/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java +++ b/presto-main/src/main/java/io/prestosql/metadata/MetadataManager.java @@ -45,6 +45,7 @@ import io.prestosql.spi.connector.ConnectorTableLayoutHandle; import io.prestosql.spi.connector.ConnectorTableLayoutResult; import io.prestosql.spi.connector.ConnectorTableMetadata; +import io.prestosql.spi.connector.ConnectorTableProperties; import io.prestosql.spi.connector.ConnectorTransactionHandle; import io.prestosql.spi.connector.ConnectorViewDefinition; import io.prestosql.spi.connector.Constraint; @@ -88,6 +89,7 @@ import java.util.concurrent.ConcurrentMap; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.prestosql.metadata.QualifiedObjectName.convertFromSchemaTableName; import static io.prestosql.metadata.ViewDefinition.ViewColumn; @@ -391,6 +393,9 @@ public Optional getLayout(Session session, TableHandle table, CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId); ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId); + + checkState(metadata.usesLegacyTableLayouts(), "getLayout() was called even though connector doesn't support legacy Table Layout"); + ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(connectorId); ConnectorSession connectorSession = session.toConnectorSession(connectorId); List layouts = metadata.getTableLayouts(connectorSession, connectorTable, constraint, desiredColumns); @@ -405,23 +410,27 @@ public Optional getLayout(Session session, TableHandle table, ConnectorTableLayout tableLayout = layouts.get(0).getTableLayout(); return Optional.of(new TableLayoutResult( new TableHandle(connectorId, connectorTable, transaction, Optional.of(tableLayout.getHandle())), - new TableLayout(connectorId, transaction, tableLayout), + new TableProperties(connectorId, transaction, new ConnectorTableProperties(tableLayout)), layouts.get(0).getUnenforcedConstraint())); } @Override - public TableLayout getLayout(Session session, TableHandle handle) + public TableProperties getTableProperties(Session session, TableHandle handle) { ConnectorId connectorId = handle.getConnectorId(); CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId); ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId); ConnectorSession connectorSession = session.toConnectorSession(connectorId); - return handle.getLayout() - .map(layout -> new TableLayout(connectorId, handle.getTransaction(), metadata.getTableLayout(connectorSession, layout))) - .orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty()) - .get() - .getLayout()); + if (metadata.usesLegacyTableLayouts()) { + return handle.getLayout() + .map(layout -> new TableProperties(connectorId, handle.getTransaction(), new ConnectorTableProperties(metadata.getTableLayout(connectorSession, layout)))) + .orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty()) + .get() + .getTableProperties()); + } + + return new TableProperties(connectorId, handle.getTransaction(), metadata.getTableProperties(connectorSession, handle.getConnectorHandle())); } @Override @@ -461,14 +470,18 @@ public Optional getInfo(Session session, TableHandle handle) ConnectorId connectorId = handle.getConnectorId(); ConnectorMetadata metadata = getMetadata(session, connectorId); - ConnectorTableLayoutHandle layoutHandle = handle.getLayout() - .orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty()) - .get() - .getNewTableHandle() - .getLayout() - .get()); + if (usesLegacyTableLayouts(session, handle)) { + ConnectorTableLayoutHandle layoutHandle = handle.getLayout() + .orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty()) + .get() + .getNewTableHandle() + .getLayout() + .get()); + + return metadata.getInfo(layoutHandle); + } - return metadata.getInfo(layoutHandle); + return metadata.getInfo(handle.getConnectorHandle()); } @Override @@ -1151,6 +1164,12 @@ public AnalyzePropertyManager getAnalyzePropertyManager() return analyzePropertyManager; } + @Override + public boolean usesLegacyTableLayouts(Session session, TableHandle table) + { + return getMetadata(session, table.getConnectorId()).usesLegacyTableLayouts(); + } + private ViewDefinition deserializeView(String data) { try { diff --git a/presto-main/src/main/java/io/prestosql/metadata/TableLayoutResult.java b/presto-main/src/main/java/io/prestosql/metadata/TableLayoutResult.java index dc02158054c82..3547498450aae 100644 --- a/presto-main/src/main/java/io/prestosql/metadata/TableLayoutResult.java +++ b/presto-main/src/main/java/io/prestosql/metadata/TableLayoutResult.java @@ -23,13 +23,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +@Deprecated public class TableLayoutResult { private final TableHandle newTableHandle; - private final TableLayout layout; + private final TableProperties layout; private final TupleDomain unenforcedConstraint; - public TableLayoutResult(TableHandle newTable, TableLayout layout, TupleDomain unenforcedConstraint) + public TableLayoutResult(TableHandle newTable, TableProperties layout, TupleDomain unenforcedConstraint) { this.newTableHandle = requireNonNull(newTable, "newTable is null"); this.layout = requireNonNull(layout, "layout is null"); @@ -41,7 +42,7 @@ public TableHandle getNewTableHandle() return newTableHandle; } - public TableLayout getLayout() + public TableProperties getTableProperties() { return layout; } diff --git a/presto-main/src/main/java/io/prestosql/metadata/TableLayout.java b/presto-main/src/main/java/io/prestosql/metadata/TableProperties.java similarity index 84% rename from presto-main/src/main/java/io/prestosql/metadata/TableLayout.java rename to presto-main/src/main/java/io/prestosql/metadata/TableProperties.java index 2d1c238cf0eb3..f7d0a74c959f4 100644 --- a/presto-main/src/main/java/io/prestosql/metadata/TableLayout.java +++ b/presto-main/src/main/java/io/prestosql/metadata/TableProperties.java @@ -16,7 +16,7 @@ import com.google.common.collect.ImmutableList; import io.prestosql.connector.ConnectorId; import io.prestosql.spi.connector.ColumnHandle; -import io.prestosql.spi.connector.ConnectorTableLayout; +import io.prestosql.spi.connector.ConnectorTableProperties; import io.prestosql.spi.connector.ConnectorTransactionHandle; import io.prestosql.spi.connector.DiscretePredicates; import io.prestosql.spi.connector.LocalProperty; @@ -30,41 +30,36 @@ import static java.util.Objects.requireNonNull; -public class TableLayout +public class TableProperties { - private final ConnectorTableLayout layout; + private final ConnectorTableProperties tableProperties; private final ConnectorId connectorId; private final ConnectorTransactionHandle transaction; - public TableLayout(ConnectorId connectorId, ConnectorTransactionHandle transaction, ConnectorTableLayout layout) + public TableProperties(ConnectorId connectorId, ConnectorTransactionHandle transaction, ConnectorTableProperties tableProperties) { requireNonNull(connectorId, "connectorId is null"); requireNonNull(transaction, "transaction is null"); - requireNonNull(layout, "layout is null"); + requireNonNull(tableProperties, "layout is null"); this.connectorId = connectorId; this.transaction = transaction; - this.layout = layout; - } - - public Optional> getColumns() - { - return layout.getColumns(); + this.tableProperties = tableProperties; } public TupleDomain getPredicate() { - return layout.getPredicate(); + return tableProperties.getPredicate(); } public List> getLocalProperties() { - return layout.getLocalProperties(); + return tableProperties.getLocalProperties(); } public Optional getTablePartitioning() { - return layout.getTablePartitioning() + return tableProperties.getTablePartitioning() .map(nodePartitioning -> new TablePartitioning( new PartitioningHandle( Optional.of(connectorId), @@ -75,12 +70,12 @@ public Optional getTablePartitioning() public Optional> getStreamPartitioningColumns() { - return layout.getStreamPartitioningColumns(); + return tableProperties.getStreamPartitioningColumns(); } public Optional getDiscretePredicates() { - return layout.getDiscretePredicates(); + return tableProperties.getDiscretePredicates(); } public static class TablePartitioning diff --git a/presto-main/src/main/java/io/prestosql/split/SplitManager.java b/presto-main/src/main/java/io/prestosql/split/SplitManager.java index ee36e7f1b2ad3..76b51edf75e18 100644 --- a/presto-main/src/main/java/io/prestosql/split/SplitManager.java +++ b/presto-main/src/main/java/io/prestosql/split/SplitManager.java @@ -71,17 +71,19 @@ public SplitSource getSplits(Session session, TableHandle table, SplitScheduling ConnectorSession connectorSession = session.toConnectorSession(connectorId); - ConnectorTableLayoutHandle layout = table.getLayout() - .orElseGet(() -> metadata.getLayout(session, table, Constraint.alwaysTrue(), Optional.empty()) - .get() - .getNewTableHandle() - .getLayout().get()); + ConnectorSplitSource source; + if (metadata.usesLegacyTableLayouts(session, table)) { + ConnectorTableLayoutHandle layout = table.getLayout() + .orElseGet(() -> metadata.getLayout(session, table, Constraint.alwaysTrue(), Optional.empty()) + .get() + .getNewTableHandle() + .getLayout().get()); - ConnectorSplitSource source = splitManager.getSplits( - table.getTransaction(), - connectorSession, - layout, - splitSchedulingStrategy); + source = splitManager.getSplits(table.getTransaction(), connectorSession, layout, splitSchedulingStrategy); + } + else { + source = splitManager.getSplits(table.getTransaction(), connectorSession, table.getConnectorHandle(), splitSchedulingStrategy); + } SplitSource splitSource = new ConnectorAwareSplitSource(connectorId, table.getTransaction(), source); if (minScheduleSplitBatchSize > 1) { diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/io/prestosql/sql/planner/PlanFragmenter.java index 90ee22c5fb928..4bad42357f9d1 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/PlanFragmenter.java @@ -22,7 +22,7 @@ import io.prestosql.execution.QueryManagerConfig; import io.prestosql.metadata.Metadata; import io.prestosql.metadata.TableHandle; -import io.prestosql.metadata.TableLayout.TablePartitioning; +import io.prestosql.metadata.TableProperties.TablePartitioning; import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ConnectorPartitionHandle; import io.prestosql.spi.connector.ConnectorPartitioningHandle; @@ -278,7 +278,7 @@ public PlanNode visitMetadataDelete(MetadataDeleteNode node, RewriteContext context) { - PartitioningHandle partitioning = metadata.getLayout(session, node.getTable()) + PartitioningHandle partitioning = metadata.getTableProperties(session, node.getTable()) .getTablePartitioning() .map(TablePartitioning::getPartitioningHandle) .orElse(SOURCE_DISTRIBUTION); @@ -643,7 +643,7 @@ private GroupedExecutionProperties processWindowFunction(PlanNode node) @Override public GroupedExecutionProperties visitTableScan(TableScanNode node, Void context) { - Optional tablePartitioning = metadata.getLayout(session, node.getTable()).getTablePartitioning(); + Optional tablePartitioning = metadata.getTableProperties(session, node.getTable()).getTablePartitioning(); if (!tablePartitioning.isPresent()) { return GroupedExecutionProperties.notCapable(); } @@ -748,7 +748,7 @@ public PartitioningHandleReassigner(PartitioningHandle fragmentPartitioningHandl @Override public PlanNode visitTableScan(TableScanNode node, RewriteContext context) { - PartitioningHandle partitioning = metadata.getLayout(session, node.getTable()) + PartitioningHandle partitioning = metadata.getTableProperties(session, node.getTable()) .getTablePartitioning() .map(TablePartitioning::getPartitioningHandle) .orElse(SOURCE_DISTRIBUTION); diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/PushPredicateIntoTableScan.java b/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/PushPredicateIntoTableScan.java index 07625f27ee587..3f169ec777430 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/PushPredicateIntoTableScan.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/iterative/rule/PushPredicateIntoTableScan.java @@ -102,7 +102,7 @@ public Result apply(FilterNode filterNode, Captures captures, Context context) { TableScanNode tableScan = captures.get(TABLE_SCAN); - PlanNode rewritten = pushFilterIntoTableScan( + Optional rewritten = pushFilterIntoTableScan( tableScan, filterNode.getPredicate(), false, @@ -113,11 +113,11 @@ public Result apply(FilterNode filterNode, Captures captures, Context context) typeAnalyzer, domainTranslator); - if (arePlansSame(filterNode, tableScan, rewritten)) { + if (!rewritten.isPresent() || arePlansSame(filterNode, tableScan, rewritten.get())) { return Result.empty(); } - return Result.ofPlanNode(rewritten); + return Result.ofPlanNode(rewritten.get()); } private boolean arePlansSame(FilterNode filter, TableScanNode tableScan, PlanNode rewritten) @@ -141,7 +141,7 @@ private boolean arePlansSame(FilterNode filter, TableScanNode tableScan, PlanNod && Objects.equals(tableScan.getEnforcedConstraint(), rewrittenTableScan.getEnforcedConstraint()); } - public static PlanNode pushFilterIntoTableScan( + public static Optional pushFilterIntoTableScan( TableScanNode node, Expression predicate, boolean pruneWithPredicateExpression, @@ -152,6 +152,10 @@ public static PlanNode pushFilterIntoTableScan( TypeAnalyzer typeAnalyzer, DomainTranslator domainTranslator) { + if (!metadata.usesLegacyTableLayouts(session, node.getTable())) { + return Optional.empty(); + } + // don't include non-deterministic predicates Expression deterministicPredicate = filterDeterministicConjuncts(predicate); @@ -196,8 +200,8 @@ public static PlanNode pushFilterIntoTableScan( .map(node.getAssignments()::get) .collect(toImmutableSet()))); - if (!layout.isPresent() || layout.get().getLayout().getPredicate().isNone()) { - return new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of()); + if (!layout.isPresent() || layout.get().getTableProperties().getPredicate().isNone()) { + return Optional.of(new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of())); } TableScanNode tableScan = new TableScanNode( @@ -205,7 +209,7 @@ public static PlanNode pushFilterIntoTableScan( layout.get().getNewTableHandle(), node.getOutputSymbols(), node.getAssignments(), - layout.get().getLayout().getPredicate(), + layout.get().getTableProperties().getPredicate(), computeEnforced(newDomain, layout.get().getUnenforcedConstraint())); // The order of the arguments to combineConjuncts matters: @@ -222,10 +226,10 @@ public static PlanNode pushFilterIntoTableScan( decomposedPredicate.getRemainingExpression()); if (!TRUE_LITERAL.equals(resultingPredicate)) { - return new FilterNode(idAllocator.getNextId(), tableScan, resultingPredicate); + return Optional.of(new FilterNode(idAllocator.getNextId(), tableScan, resultingPredicate)); } - return tableScan; + return Optional.of(tableScan); } private static class LayoutConstraintEvaluator diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/AddExchanges.java index 57f98b9d80640..7e2653cb276d0 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/AddExchanges.java @@ -491,7 +491,11 @@ public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, PreferredPr public PlanWithProperties visitFilter(FilterNode node, PreferredProperties preferredProperties) { if (node.getSource() instanceof TableScanNode) { - return planTableScan((TableScanNode) node.getSource(), node.getPredicate()); + Optional plan = planTableScan((TableScanNode) node.getSource(), node.getPredicate()); + + if (plan.isPresent()) { + return plan.get(); + } } return rebaseAndDeriveProperties(node, planChild(node, preferredProperties)); @@ -500,7 +504,8 @@ public PlanWithProperties visitFilter(FilterNode node, PreferredProperties prefe @Override public PlanWithProperties visitTableScan(TableScanNode node, PreferredProperties preferredProperties) { - return planTableScan(node, TRUE_LITERAL); + return planTableScan(node, TRUE_LITERAL) + .orElseGet(() -> new PlanWithProperties(node, deriveProperties(node, ImmutableList.of()))); } @Override @@ -530,10 +535,10 @@ else if (redistributeWrites) { return rebaseAndDeriveProperties(node, source); } - private PlanWithProperties planTableScan(TableScanNode node, Expression predicate) + private Optional planTableScan(TableScanNode node, Expression predicate) { - PlanNode plan = PushPredicateIntoTableScan.pushFilterIntoTableScan(node, predicate, true, session, types, idAllocator, metadata, typeAnalyzer, domainTranslator); - return new PlanWithProperties(plan, derivePropertiesRecursively(plan)); + return PushPredicateIntoTableScan.pushFilterIntoTableScan(node, predicate, true, session, types, idAllocator, metadata, typeAnalyzer, domainTranslator) + .map(plan -> new PlanWithProperties(plan, derivePropertiesRecursively(plan))); } @Override diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/MetadataQueryOptimizer.java b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/MetadataQueryOptimizer.java index 5da43eb4ed37b..41947df7b9a5c 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/MetadataQueryOptimizer.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/MetadataQueryOptimizer.java @@ -21,7 +21,7 @@ import io.prestosql.SystemSessionProperties; import io.prestosql.execution.warnings.WarningCollector; import io.prestosql.metadata.Metadata; -import io.prestosql.metadata.TableLayout; +import io.prestosql.metadata.TableProperties; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ColumnMetadata; import io.prestosql.spi.connector.DiscretePredicates; @@ -135,7 +135,7 @@ public PlanNode visitAggregation(AggregationNode node, RewriteContext cont // Materialize the list of partitions and replace the TableScan node // with a Values node - TableLayout layout = metadata.getLayout(session, tableScan.getTable()); + TableProperties layout = metadata.getTableProperties(session, tableScan.getTable()); if (!layout.getDiscretePredicates().isPresent()) { return context.defaultRewrite(node); } diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/PropertyDerivations.java b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/PropertyDerivations.java index b0de8196db49e..25dec0dbad65d 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/PropertyDerivations.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/PropertyDerivations.java @@ -22,8 +22,8 @@ import io.prestosql.Session; import io.prestosql.SystemSessionProperties; import io.prestosql.metadata.Metadata; -import io.prestosql.metadata.TableLayout; -import io.prestosql.metadata.TableLayout.TablePartitioning; +import io.prestosql.metadata.TableProperties; +import io.prestosql.metadata.TableProperties.TablePartitioning; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConstantProperty; import io.prestosql.spi.connector.GroupingProperty; @@ -706,7 +706,7 @@ public ActualProperties visitValues(ValuesNode node, List cont @Override public ActualProperties visitTableScan(TableScanNode node, List inputProperties) { - TableLayout layout = metadata.getLayout(session, node.getTable()); + TableProperties layout = metadata.getTableProperties(session, node.getTable()); Map assignments = ImmutableBiMap.copyOf(node.getAssignments()).inverse(); ActualProperties.Builder properties = ActualProperties.builder(); @@ -736,7 +736,7 @@ public ActualProperties visitTableScan(TableScanNode node, List assignments, Map constants) + private Global deriveGlobalProperties(TableProperties layout, Map assignments, Map constants) { Optional> streamPartitioning = layout.getStreamPartitioningColumns() .flatMap(columns -> translateToNonConstantSymbols(columns, assignments, constants)); diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/StreamPropertyDerivations.java b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/StreamPropertyDerivations.java index ed1c0ff221d63..3dbf8de5f9179 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/optimizations/StreamPropertyDerivations.java @@ -20,7 +20,7 @@ import com.google.common.collect.Iterables; import io.prestosql.Session; import io.prestosql.metadata.Metadata; -import io.prestosql.metadata.TableLayout; +import io.prestosql.metadata.TableProperties; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.LocalProperty; import io.prestosql.sql.planner.Partitioning.ArgumentBinding; @@ -254,7 +254,7 @@ public StreamProperties visitValues(ValuesNode node, List cont @Override public StreamProperties visitTableScan(TableScanNode node, List inputProperties) { - TableLayout layout = metadata.getLayout(session, node.getTable()); + TableProperties layout = metadata.getTableProperties(session, node.getTable()); Map assignments = ImmutableBiMap.copyOf(node.getAssignments()).inverse(); // Globally constant assignments diff --git a/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java b/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java index 433c7e026a4cc..433ed0119c760 100644 --- a/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java +++ b/presto-main/src/test/java/io/prestosql/metadata/AbstractMockMetadata.java @@ -120,7 +120,7 @@ public Optional getLayout(Session session, TableHandle tableH } @Override - public TableLayout getLayout(Session session, TableHandle handle) + public TableProperties getTableProperties(Session session, TableHandle handle) { throw new UnsupportedOperationException(); } @@ -508,4 +508,10 @@ public boolean catalogExists(Session session, String catalogName) { throw new UnsupportedOperationException(); } + + @Override + public boolean usesLegacyTableLayouts(Session session, TableHandle table) + { + throw new UnsupportedOperationException(); + } } diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorHandleResolver.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorHandleResolver.java index bca90157e0f8c..3b205d6617aed 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorHandleResolver.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorHandleResolver.java @@ -17,7 +17,10 @@ public interface ConnectorHandleResolver { Class getTableHandleClass(); - Class getTableLayoutHandleClass(); + default Class getTableLayoutHandleClass() + { + throw new UnsupportedOperationException(); + } Class getColumnHandleClass(); diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java index ddc5a36c5f309..8b79065d60146 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java @@ -36,6 +36,7 @@ import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; @@ -88,13 +89,29 @@ default Optional getSystemTable(ConnectorSession session, SchemaTab *

* For each layout, connectors must return an "unenforced constraint" representing the part of the constraint summary that isn't guaranteed by the layout. */ - List getTableLayouts( + @Deprecated + default List getTableLayouts( ConnectorSession session, ConnectorTableHandle table, Constraint constraint, - Optional> desiredColumns); + Optional> desiredColumns) + { + if (!usesLegacyTableLayouts()) { + throw new IllegalStateException("Connector uses legacy Table Layout but doesn't implement getTableLayouts()"); + } + + throw new UnsupportedOperationException("Not yet implemented"); + } - ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle); + @Deprecated + default ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) + { + if (!usesLegacyTableLayouts()) { + throw new IllegalStateException("Connector uses legacy Table Layout but doesn't implement getTableLayout()"); + } + + throw new UnsupportedOperationException("Not yet implemented"); + } /** * Return a table layout handle whose partitioning is converted to the provided partitioning handle, @@ -131,11 +148,17 @@ default Optional getCommonPartitioningHandle(Connec * * @throws RuntimeException if table handle is no longer valid */ + @Deprecated default Optional getInfo(ConnectorTableLayoutHandle layoutHandle) { return Optional.empty(); } + default Optional getInfo(ConnectorTableHandle table) + { + return Optional.empty(); + } + /** * List table names, possibly filtered by schema. An empty list is returned if none match. */ @@ -262,29 +285,17 @@ default Optional getNewTableLayout(ConnectorSession ses */ default Optional getInsertLayout(ConnectorSession session, ConnectorTableHandle tableHandle) { - List layouts = getTableLayouts(session, tableHandle, new Constraint<>(TupleDomain.all(), map -> true), Optional.empty()) - .stream() - .map(ConnectorTableLayoutResult::getTableLayout) - .filter(layout -> layout.getTablePartitioning().isPresent()) - .collect(toList()); - - if (layouts.isEmpty()) { - return Optional.empty(); - } - - if (layouts.size() > 1) { - throw new PrestoException(NOT_SUPPORTED, "Tables with multiple layouts can not be written"); - } - - ConnectorTableLayout layout = layouts.get(0); - ConnectorPartitioningHandle partitioningHandle = layout.getTablePartitioning().get().getPartitioningHandle(); - Map columnNamesByHandle = getColumnHandles(session, tableHandle).entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); - List partitionColumns = layout.getTablePartitioning().get().getPartitioningColumns().stream() - .map(columnNamesByHandle::get) - .collect(toList()); + ConnectorTableProperties properties = getTableProperties(session, tableHandle); + return properties.getTablePartitioning() + .map(partitioning -> { + Map columnNamesByHandle = getColumnHandles(session, tableHandle).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); + List partitionColumns = partitioning.getPartitioningColumns().stream() + .map(columnNamesByHandle::get) + .collect(toList()); - return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns)); + return new ConnectorNewTableLayout(partitioning.getPartitioningHandle(), partitionColumns); + }); } /** @@ -538,4 +549,33 @@ default List listTablePrivileges(ConnectorSession session, SchemaTabl { return emptyList(); } + + /** + * Whether the connector uses the legacy Table Layout feature. If this method returns false, + * connectors are required to implement the following methods: + *
    + *
  • {@link #getTableProperties(ConnectorSession session, ConnectorTableHandle table)}
  • + *
  • {@link #getInfo(ConnectorTableHandle table)}
  • + *
  • {@link ConnectorSplitManager#getSplits(ConnectorTransactionHandle, ConnectorSession, ConnectorTableHandle, ConnectorSplitManager.SplitSchedulingStrategy)}
  • + *
+ */ + default boolean usesLegacyTableLayouts() + { + return true; + } + + default ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) + { + if (!usesLegacyTableLayouts()) { + throw new IllegalStateException("getTableProperties() must be implemented if usesLegacyTableLayouts is false"); + } + + List layouts = getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty()); + + if (layouts.size() != 1) { + throw new PrestoException(NOT_SUPPORTED, format("Connector must return a single layout for table %s, but got %s", table, layouts.size())); + } + + return new ConnectorTableProperties(layouts.get(0).getTableLayout()); + } } diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitManager.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitManager.java index d4b3026dc97f2..8b20eb2155390 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitManager.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitManager.java @@ -15,11 +15,24 @@ public interface ConnectorSplitManager { - ConnectorSplitSource getSplits( + @Deprecated + default ConnectorSplitSource getSplits( ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, - SplitSchedulingStrategy splitSchedulingStrategy); + SplitSchedulingStrategy splitSchedulingStrategy) + { + throw new UnsupportedOperationException(); + } + + default ConnectorSplitSource getSplits( + ConnectorTransactionHandle transaction, + ConnectorSession session, + ConnectorTableHandle table, + SplitSchedulingStrategy splitSchedulingStrategy) + { + throw new UnsupportedOperationException(); + } enum SplitSchedulingStrategy { diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayout.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayout.java index 33baf5201a22d..ddc561d2757dc 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayout.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayout.java @@ -23,6 +23,7 @@ import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; +@Deprecated public class ConnectorTableLayout { private final ConnectorTableLayoutHandle handle; diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayoutHandle.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayoutHandle.java index add2533c77a08..fdaaadd3a2844 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayoutHandle.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableLayoutHandle.java @@ -13,6 +13,7 @@ */ package io.prestosql.spi.connector; +@Deprecated public interface ConnectorTableLayoutHandle { } diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableProperties.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableProperties.java new file mode 100644 index 0000000000000..8e138b046cb2c --- /dev/null +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableProperties.java @@ -0,0 +1,145 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.spi.connector; + +import io.prestosql.spi.predicate.TupleDomain; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static java.util.Collections.emptyList; +import static java.util.Objects.requireNonNull; + +public class ConnectorTableProperties +{ + private final TupleDomain predicate; + private final Optional tablePartitioning; + private final Optional> streamPartitioningColumns; + private final Optional discretePredicates; + private final List> localProperties; + + public ConnectorTableProperties() + { + this(TupleDomain.all(), Optional.empty(), Optional.empty(), Optional.empty(), emptyList()); + } + + @Deprecated + public ConnectorTableProperties(ConnectorTableLayout layout) + { + this(layout.getPredicate(), + layout.getTablePartitioning(), + layout.getStreamPartitioningColumns(), + layout.getDiscretePredicates(), + layout.getLocalProperties()); + } + + public ConnectorTableProperties( + TupleDomain predicate, + Optional tablePartitioning, + Optional> streamPartitioningColumns, + Optional discretePredicates, + List> localProperties) + { + requireNonNull(streamPartitioningColumns, "partitioningColumns is null"); + requireNonNull(tablePartitioning, "tablePartitioning is null"); + requireNonNull(predicate, "predicate is null"); + requireNonNull(discretePredicates, "discretePredicates is null"); + requireNonNull(localProperties, "localProperties is null"); + + this.tablePartitioning = tablePartitioning; + this.streamPartitioningColumns = streamPartitioningColumns; + this.predicate = predicate; + this.discretePredicates = discretePredicates; + this.localProperties = localProperties; + } + + /** + * A TupleDomain that represents a predicate that every row in this table satisfies. + *

+ * This guarantee can have different origins. + * For example, it may be successful predicate push down, or inherent guarantee provided by the underlying data. + */ + public TupleDomain getPredicate() + { + return predicate; + } + + /** + * The partitioning of the table across the worker nodes. + *

+ * If the table is node partitioned, the connector guarantees that each combination of values for + * the distributed columns will be contained within a single worker. + */ + public Optional getTablePartitioning() + { + return tablePartitioning; + } + + /** + * The partitioning for the table streams. + * If empty, the table layout is partitioned arbitrarily. + * Otherwise, table steams are partitioned on the given set of columns (or unpartitioned, if the set is empty) + *

+ * If the table is partitioned, the connector guarantees that each combination of values for + * the partition columns will be contained within a single split (i.e., partitions cannot + * straddle multiple splits) + */ + public Optional> getStreamPartitioningColumns() + { + return streamPartitioningColumns; + } + + /** + * A collection of discrete predicates describing the data in this layout. The union of + * these predicates is expected to be equivalent to the overall predicate returned + * by {@link #getPredicate()}. They may be used by the engine for further optimizations. + */ + public Optional getDiscretePredicates() + { + return discretePredicates; + } + + /** + * Properties describing the layout of the data (grouping/sorting) within each partition + */ + public List> getLocalProperties() + { + return localProperties; + } + + @Override + public int hashCode() + { + return Objects.hash(predicate, discretePredicates, streamPartitioningColumns, tablePartitioning, localProperties); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ConnectorTableProperties other = (ConnectorTableProperties) obj; + return Objects.equals(this.predicate, other.predicate) + && Objects.equals(this.discretePredicates, other.discretePredicates) + && Objects.equals(this.streamPartitioningColumns, other.streamPartitioningColumns) + && Objects.equals(this.tablePartitioning, other.tablePartitioning) + && Objects.equals(this.localProperties, other.localProperties); + } +} diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java index 8d1eb6542343e..227bf1b3f1a1a 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java @@ -30,6 +30,7 @@ import io.prestosql.spi.connector.ConnectorTableLayoutHandle; import io.prestosql.spi.connector.ConnectorTableLayoutResult; import io.prestosql.spi.connector.ConnectorTableMetadata; +import io.prestosql.spi.connector.ConnectorTableProperties; import io.prestosql.spi.connector.ConnectorViewDefinition; import io.prestosql.spi.connector.Constraint; import io.prestosql.spi.connector.SchemaTableName; @@ -205,6 +206,14 @@ public Optional getInfo(ConnectorTableLayoutHandle table) } } + @Override + public Optional getInfo(ConnectorTableHandle table) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getInfo(table); + } + } + @Override public List listTables(ConnectorSession session, Optional schemaName) { @@ -532,4 +541,20 @@ public List listTablePrivileges(ConnectorSession session, SchemaTable return delegate.listTablePrivileges(session, prefix); } } + + @Override + public boolean usesLegacyTableLayouts() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.usesLegacyTableLayouts(); + } + } + + @Override + public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getTableProperties(session, table); + } + } } diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java b/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java index 510bfcf12256b..1705be90c8f38 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java @@ -17,6 +17,7 @@ import io.prestosql.spi.connector.ConnectorSession; import io.prestosql.spi.connector.ConnectorSplitManager; import io.prestosql.spi.connector.ConnectorSplitSource; +import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.ConnectorTableLayoutHandle; import io.prestosql.spi.connector.ConnectorTransactionHandle; @@ -41,4 +42,12 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand return delegate.getSplits(transactionHandle, session, layout, splitSchedulingStrategy); } } + + @Override + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getSplits(transaction, session, table, splitSchedulingStrategy); + } + } }