Skip to content

Commit

Permalink
Deprecate table layouts
Browse files Browse the repository at this point in the history
Introduce a usesLegacyMetadata() method to ConnectorMetadata for
connectors to indicate if they use the legacy Table Layouts feature.
The method returns true by default during the transition period.

The following methods are deprecated:

    ConnectorMetadata.getTableLayouts()
    ConnectorMetadata.getTableLayout()
    ConnectorMetadata.getInfo(ConnectorTableLayoutHandle)
    ConnectorHandleResolver.getTableLayoutHandleClass()
    ConnectorSplitManager.getSplits(..., ConnectorTableLayoutHandle, ...)

Connectors that do not support Table Layouts need to implement
new methods instead:

    ConnectorMetadata.getTableProperties()
    ConnectorMetadata.getInto(ConnectorTableHandle)
    ConnectorSplitManager.getSplits(..., ConnectorTableHandle, ...)
  • Loading branch information
martint committed Mar 11, 2019
1 parent f04adeb commit f4ce2a2
Show file tree
Hide file tree
Showing 21 changed files with 380 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayout;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
Expand Down Expand Up @@ -130,8 +131,13 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
@Override
public ConnectorSplitManager getSplitManager()
{
return (transactionHandle, session, layout, splitSchedulingStrategy) -> {
throw new UnsupportedOperationException();
return new ConnectorSplitManager()
{
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy)
{
throw new UnsupportedOperationException();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ public interface Metadata

Optional<TableHandle> getTableHandleForStatisticsCollection(Session session, QualifiedObjectName tableName, Map<String, Object> analyzeProperties);

@Deprecated
Optional<TableLayoutResult> getLayout(Session session, TableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns);

TableLayout getLayout(Session session, TableHandle handle);
TableProperties getTableProperties(Session session, TableHandle handle);

/**
* Return a table handle whose partitioning is converted to the provided partitioning handle,
Expand Down Expand Up @@ -380,4 +381,7 @@ public interface Metadata
ColumnPropertyManager getColumnPropertyManager();

AnalyzePropertyManager getAnalyzePropertyManager();

@Deprecated
boolean usesLegacyTableLayouts(Session session, TableHandle table);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutResult;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTableProperties;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.Constraint;
Expand Down Expand Up @@ -88,6 +89,7 @@
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.prestosql.metadata.QualifiedObjectName.convertFromSchemaTableName;
import static io.prestosql.metadata.ViewDefinition.ViewColumn;
Expand Down Expand Up @@ -391,6 +393,9 @@ public Optional<TableLayoutResult> getLayout(Session session, TableHandle table,

CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);

checkState(metadata.usesLegacyTableLayouts(), "getLayout() was called even though connector doesn't support legacy Table Layout");

ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(connectorId);
ConnectorSession connectorSession = session.toConnectorSession(connectorId);
List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(connectorSession, connectorTable, constraint, desiredColumns);
Expand All @@ -405,23 +410,27 @@ public Optional<TableLayoutResult> getLayout(Session session, TableHandle table,
ConnectorTableLayout tableLayout = layouts.get(0).getTableLayout();
return Optional.of(new TableLayoutResult(
new TableHandle(connectorId, connectorTable, transaction, Optional.of(tableLayout.getHandle())),
new TableLayout(connectorId, transaction, tableLayout),
new TableProperties(connectorId, transaction, new ConnectorTableProperties(tableLayout)),
layouts.get(0).getUnenforcedConstraint()));
}

@Override
public TableLayout getLayout(Session session, TableHandle handle)
public TableProperties getTableProperties(Session session, TableHandle handle)
{
ConnectorId connectorId = handle.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);
ConnectorSession connectorSession = session.toConnectorSession(connectorId);

return handle.getLayout()
.map(layout -> new TableLayout(connectorId, handle.getTransaction(), metadata.getTableLayout(connectorSession, layout)))
.orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty())
.get()
.getLayout());
if (metadata.usesLegacyTableLayouts()) {
return handle.getLayout()
.map(layout -> new TableProperties(connectorId, handle.getTransaction(), new ConnectorTableProperties(metadata.getTableLayout(connectorSession, layout))))
.orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty())
.get()
.getTableProperties());
}

return new TableProperties(connectorId, handle.getTransaction(), metadata.getTableProperties(connectorSession, handle.getConnectorHandle()));
}

@Override
Expand Down Expand Up @@ -461,14 +470,18 @@ public Optional<Object> getInfo(Session session, TableHandle handle)
ConnectorId connectorId = handle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);

ConnectorTableLayoutHandle layoutHandle = handle.getLayout()
.orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty())
.get()
.getNewTableHandle()
.getLayout()
.get());
if (usesLegacyTableLayouts(session, handle)) {
ConnectorTableLayoutHandle layoutHandle = handle.getLayout()
.orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty())
.get()
.getNewTableHandle()
.getLayout()
.get());

return metadata.getInfo(layoutHandle);
}

return metadata.getInfo(layoutHandle);
return metadata.getInfo(handle.getConnectorHandle());
}

@Override
Expand Down Expand Up @@ -1151,6 +1164,12 @@ public AnalyzePropertyManager getAnalyzePropertyManager()
return analyzePropertyManager;
}

@Override
public boolean usesLegacyTableLayouts(Session session, TableHandle table)
{
return getMetadata(session, table.getConnectorId()).usesLegacyTableLayouts();
}

private ViewDefinition deserializeView(String data)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

@Deprecated
public class TableLayoutResult
{
private final TableHandle newTableHandle;
private final TableLayout layout;
private final TableProperties layout;
private final TupleDomain<ColumnHandle> unenforcedConstraint;

public TableLayoutResult(TableHandle newTable, TableLayout layout, TupleDomain<ColumnHandle> unenforcedConstraint)
public TableLayoutResult(TableHandle newTable, TableProperties layout, TupleDomain<ColumnHandle> unenforcedConstraint)
{
this.newTableHandle = requireNonNull(newTable, "newTable is null");
this.layout = requireNonNull(layout, "layout is null");
Expand All @@ -41,7 +42,7 @@ public TableHandle getNewTableHandle()
return newTableHandle;
}

public TableLayout getLayout()
public TableProperties getTableProperties()
{
return layout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.prestosql.connector.ConnectorId;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorTableLayout;
import io.prestosql.spi.connector.ConnectorTableProperties;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.DiscretePredicates;
import io.prestosql.spi.connector.LocalProperty;
Expand All @@ -30,41 +30,36 @@

import static java.util.Objects.requireNonNull;

public class TableLayout
public class TableProperties
{
private final ConnectorTableLayout layout;
private final ConnectorTableProperties tableProperties;
private final ConnectorId connectorId;
private final ConnectorTransactionHandle transaction;

public TableLayout(ConnectorId connectorId, ConnectorTransactionHandle transaction, ConnectorTableLayout layout)
public TableProperties(ConnectorId connectorId, ConnectorTransactionHandle transaction, ConnectorTableProperties tableProperties)
{
requireNonNull(connectorId, "connectorId is null");
requireNonNull(transaction, "transaction is null");
requireNonNull(layout, "layout is null");
requireNonNull(tableProperties, "layout is null");

this.connectorId = connectorId;
this.transaction = transaction;
this.layout = layout;
}

public Optional<List<ColumnHandle>> getColumns()
{
return layout.getColumns();
this.tableProperties = tableProperties;
}

public TupleDomain<ColumnHandle> getPredicate()
{
return layout.getPredicate();
return tableProperties.getPredicate();
}

public List<LocalProperty<ColumnHandle>> getLocalProperties()
{
return layout.getLocalProperties();
return tableProperties.getLocalProperties();
}

public Optional<TablePartitioning> getTablePartitioning()
{
return layout.getTablePartitioning()
return tableProperties.getTablePartitioning()
.map(nodePartitioning -> new TablePartitioning(
new PartitioningHandle(
Optional.of(connectorId),
Expand All @@ -75,12 +70,12 @@ public Optional<TablePartitioning> getTablePartitioning()

public Optional<Set<ColumnHandle>> getStreamPartitioningColumns()
{
return layout.getStreamPartitioningColumns();
return tableProperties.getStreamPartitioningColumns();
}

public Optional<DiscretePredicates> getDiscretePredicates()
{
return layout.getDiscretePredicates();
return tableProperties.getDiscretePredicates();
}

public static class TablePartitioning
Expand Down
22 changes: 12 additions & 10 deletions presto-main/src/main/java/io/prestosql/split/SplitManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,19 @@ public SplitSource getSplits(Session session, TableHandle table, SplitScheduling

ConnectorSession connectorSession = session.toConnectorSession(connectorId);

ConnectorTableLayoutHandle layout = table.getLayout()
.orElseGet(() -> metadata.getLayout(session, table, Constraint.alwaysTrue(), Optional.empty())
.get()
.getNewTableHandle()
.getLayout().get());
ConnectorSplitSource source;
if (metadata.usesLegacyTableLayouts(session, table)) {
ConnectorTableLayoutHandle layout = table.getLayout()
.orElseGet(() -> metadata.getLayout(session, table, Constraint.alwaysTrue(), Optional.empty())
.get()
.getNewTableHandle()
.getLayout().get());

ConnectorSplitSource source = splitManager.getSplits(
table.getTransaction(),
connectorSession,
layout,
splitSchedulingStrategy);
source = splitManager.getSplits(table.getTransaction(), connectorSession, layout, splitSchedulingStrategy);
}
else {
source = splitManager.getSplits(table.getTransaction(), connectorSession, table.getConnectorHandle(), splitSchedulingStrategy);
}

SplitSource splitSource = new ConnectorAwareSplitSource(connectorId, table.getTransaction(), source);
if (minScheduleSplitBatchSize > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.prestosql.execution.QueryManagerConfig;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableLayout.TablePartitioning;
import io.prestosql.metadata.TableProperties.TablePartitioning;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
import io.prestosql.spi.connector.ConnectorPartitioningHandle;
Expand Down Expand Up @@ -278,7 +278,7 @@ public PlanNode visitMetadataDelete(MetadataDeleteNode node, RewriteContext<Frag
@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProperties> context)
{
PartitioningHandle partitioning = metadata.getLayout(session, node.getTable())
PartitioningHandle partitioning = metadata.getTableProperties(session, node.getTable())
.getTablePartitioning()
.map(TablePartitioning::getPartitioningHandle)
.orElse(SOURCE_DISTRIBUTION);
Expand Down Expand Up @@ -643,7 +643,7 @@ private GroupedExecutionProperties processWindowFunction(PlanNode node)
@Override
public GroupedExecutionProperties visitTableScan(TableScanNode node, Void context)
{
Optional<TablePartitioning> tablePartitioning = metadata.getLayout(session, node.getTable()).getTablePartitioning();
Optional<TablePartitioning> tablePartitioning = metadata.getTableProperties(session, node.getTable()).getTablePartitioning();
if (!tablePartitioning.isPresent()) {
return GroupedExecutionProperties.notCapable();
}
Expand Down Expand Up @@ -748,7 +748,7 @@ public PartitioningHandleReassigner(PartitioningHandle fragmentPartitioningHandl
@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
{
PartitioningHandle partitioning = metadata.getLayout(session, node.getTable())
PartitioningHandle partitioning = metadata.getTableProperties(session, node.getTable())
.getTablePartitioning()
.map(TablePartitioning::getPartitioningHandle)
.orElse(SOURCE_DISTRIBUTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Result apply(FilterNode filterNode, Captures captures, Context context)
{
TableScanNode tableScan = captures.get(TABLE_SCAN);

PlanNode rewritten = pushFilterIntoTableScan(
Optional<PlanNode> rewritten = pushFilterIntoTableScan(
tableScan,
filterNode.getPredicate(),
false,
Expand All @@ -113,11 +113,11 @@ public Result apply(FilterNode filterNode, Captures captures, Context context)
typeAnalyzer,
domainTranslator);

if (arePlansSame(filterNode, tableScan, rewritten)) {
if (!rewritten.isPresent() || arePlansSame(filterNode, tableScan, rewritten.get())) {
return Result.empty();
}

return Result.ofPlanNode(rewritten);
return Result.ofPlanNode(rewritten.get());
}

private boolean arePlansSame(FilterNode filter, TableScanNode tableScan, PlanNode rewritten)
Expand All @@ -141,7 +141,7 @@ private boolean arePlansSame(FilterNode filter, TableScanNode tableScan, PlanNod
&& Objects.equals(tableScan.getEnforcedConstraint(), rewrittenTableScan.getEnforcedConstraint());
}

public static PlanNode pushFilterIntoTableScan(
public static Optional<PlanNode> pushFilterIntoTableScan(
TableScanNode node,
Expression predicate,
boolean pruneWithPredicateExpression,
Expand All @@ -152,6 +152,10 @@ public static PlanNode pushFilterIntoTableScan(
TypeAnalyzer typeAnalyzer,
DomainTranslator domainTranslator)
{
if (!metadata.usesLegacyTableLayouts(session, node.getTable())) {
return Optional.empty();
}

// don't include non-deterministic predicates
Expression deterministicPredicate = filterDeterministicConjuncts(predicate);

Expand Down Expand Up @@ -196,16 +200,16 @@ public static PlanNode pushFilterIntoTableScan(
.map(node.getAssignments()::get)
.collect(toImmutableSet())));

if (!layout.isPresent() || layout.get().getLayout().getPredicate().isNone()) {
return new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of());
if (!layout.isPresent() || layout.get().getTableProperties().getPredicate().isNone()) {
return Optional.of(new ValuesNode(idAllocator.getNextId(), node.getOutputSymbols(), ImmutableList.of()));
}

TableScanNode tableScan = new TableScanNode(
node.getId(),
layout.get().getNewTableHandle(),
node.getOutputSymbols(),
node.getAssignments(),
layout.get().getLayout().getPredicate(),
layout.get().getTableProperties().getPredicate(),
computeEnforced(newDomain, layout.get().getUnenforcedConstraint()));

// The order of the arguments to combineConjuncts matters:
Expand All @@ -222,10 +226,10 @@ public static PlanNode pushFilterIntoTableScan(
decomposedPredicate.getRemainingExpression());

if (!TRUE_LITERAL.equals(resultingPredicate)) {
return new FilterNode(idAllocator.getNextId(), tableScan, resultingPredicate);
return Optional.of(new FilterNode(idAllocator.getNextId(), tableScan, resultingPredicate));
}

return tableScan;
return Optional.of(tableScan);
}

private static class LayoutConstraintEvaluator
Expand Down
Loading

0 comments on commit f4ce2a2

Please sign in to comment.