Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TableLayout API from Raptor connector #1015

Merged
merged 3 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,11 @@ public boolean supportsMetadataDelete(Session session, TableHandle tableHandle)
{
CatalogName catalogName = tableHandle.getCatalogName();
ConnectorMetadata metadata = getMetadata(session, catalogName);

if (!metadata.usesLegacyTableLayouts()) {
return false;
}

return metadata.supportsMetadataDelete(
session.toConnectorSession(catalogName),
tableHandle.getConnectorHandle(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.prestosql.spi.connector.ConnectorPartitioningHandle;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;

public class RaptorHandleResolver
Expand All @@ -38,12 +37,6 @@ public Class<? extends ColumnHandle> getColumnHandleClass()
return RaptorColumnHandle.class;
}

@Override
// Reports the connector's concrete ConnectorTableLayoutHandle implementation
// (RaptorTableLayoutHandle) to the handle resolver. NOTE(review): this method
// is removed by this PR along with the legacy TableLayout API.
public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
{
    return RaptorTableLayoutHandle.class;
}

@Override
public Class<? extends ConnectorSplit> getSplitClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
import io.prestosql.spi.connector.ConnectorPartitioningHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayout;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutResult;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTablePartitioning;
import io.prestosql.spi.connector.ConnectorTableProperties;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.SystemTable;
Expand Down Expand Up @@ -185,6 +185,8 @@ private RaptorTableHandle getTableHandle(SchemaTableName tableName)
table.getBucketCount(),
table.isOrganized(),
OptionalLong.empty(),
TupleDomain.all(),
table.getDistributionId().map(shardManager::getBucketAssignments),
false);
}

Expand Down Expand Up @@ -297,35 +299,52 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional<Set<ColumnHandle>> desiredColumns)
public boolean usesLegacyTableLayouts()
{
RaptorTableHandle handle = (RaptorTableHandle) table;
ConnectorTableLayout layout = getTableLayout(session, handle, constraint.getSummary());
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
return false;
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
RaptorTableLayoutHandle raptorHandle = (RaptorTableLayoutHandle) handle;
return getTableLayout(session, raptorHandle.getTable(), raptorHandle.getConstraint());
RaptorTableHandle table = (RaptorTableHandle) handle;
TupleDomain<RaptorColumnHandle> newDomain = constraint.getSummary().transform(RaptorColumnHandle.class::cast);

if (newDomain.equals(table.getConstraint())) {
return Optional.empty();
}

return Optional.of(new ConstraintApplicationResult(
new RaptorTableHandle(table.getSchemaName(),
Praveen2112 marked this conversation as resolved.
Show resolved Hide resolved
table.getTableName(),
table.getTableId(),
table.getDistributionId(),
table.getDistributionName(),
table.getBucketCount(),
table.isOrganized(),
table.getTransactionId(),
newDomain.intersect(table.getConstraint()),
table.getBucketAssignments(),
table.isDelete()),
constraint.getSummary()));
}

private ConnectorTableLayout getTableLayout(ConnectorSession session, RaptorTableHandle handle, TupleDomain<ColumnHandle> constraint)
@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle handle)
{
if (!handle.getDistributionId().isPresent()) {
return new ConnectorTableLayout(new RaptorTableLayoutHandle(handle, constraint, Optional.empty()));
RaptorTableHandle table = (RaptorTableHandle) handle;

if (!table.getPartitioningHandle().isPresent()) {
return new ConnectorTableProperties();
}

List<RaptorColumnHandle> bucketColumnHandles = getBucketColumnHandles(handle.getTableId());
List<RaptorColumnHandle> bucketColumnHandles = getBucketColumnHandles(table.getTableId());

RaptorPartitioningHandle partitioning = getPartitioningHandle(handle.getDistributionId().getAsLong());
RaptorPartitioningHandle partitioning = table.getPartitioningHandle().get();

boolean oneSplitPerBucket = handle.getBucketCount().getAsInt() >= getOneSplitPerBucketThreshold(session);
boolean oneSplitPerBucket = table.getBucketCount().getAsInt() >= getOneSplitPerBucketThreshold(session);

return new ConnectorTableLayout(
new RaptorTableLayoutHandle(handle, constraint, Optional.of(partitioning)),
Optional.empty(),
return new ConnectorTableProperties(
TupleDomain.all(),
Optional.of(new ConnectorTablePartitioning(
partitioning,
Expand Down Expand Up @@ -780,6 +799,8 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
handle.getBucketCount(),
handle.isOrganized(),
OptionalLong.of(transactionId),
TupleDomain.all(),
handle.getBucketAssignments(),
true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ public RaptorPageSourceProvider(StorageManager storageManager)
public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<ColumnHandle> columns)
{
RaptorSplit raptorSplit = (RaptorSplit) split;
RaptorTableHandle raptorTable = (RaptorTableHandle) table;

OptionalInt bucketNumber = raptorSplit.getBucketNumber();
TupleDomain<RaptorColumnHandle> predicate = raptorSplit.getEffectivePredicate();
TupleDomain<RaptorColumnHandle> predicate = raptorTable.getConstraint();
ReaderAttributes attributes = ReaderAttributes.from(session);
OptionalLong transactionId = raptorSplit.getTransactionId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableSet;
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.OptionalInt;
Expand All @@ -36,49 +35,43 @@ public class RaptorSplit
private final Set<UUID> shardUuids;
private final OptionalInt bucketNumber;
private final List<HostAddress> addresses;
private final TupleDomain<RaptorColumnHandle> effectivePredicate;
private final OptionalLong transactionId;

@JsonCreator
public RaptorSplit(
@JsonProperty("shardUuids") Set<UUID> shardUuids,
@JsonProperty("bucketNumber") OptionalInt bucketNumber,
@JsonProperty("effectivePredicate") TupleDomain<RaptorColumnHandle> effectivePredicate,
@JsonProperty("transactionId") OptionalLong transactionId)
{
this(shardUuids, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId);
this(shardUuids, bucketNumber, ImmutableList.of(), transactionId);
}

public RaptorSplit(
UUID shardUuid,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId)
{
this(ImmutableSet.of(shardUuid), OptionalInt.empty(), addresses, effectivePredicate, transactionId);
this(ImmutableSet.of(shardUuid), OptionalInt.empty(), addresses, transactionId);
}

public RaptorSplit(
Set<UUID> shardUuids,
int bucketNumber,
HostAddress address,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId)
{
this(shardUuids, OptionalInt.of(bucketNumber), ImmutableList.of(address), effectivePredicate, transactionId);
this(shardUuids, OptionalInt.of(bucketNumber), ImmutableList.of(address), transactionId);
}

private RaptorSplit(
Set<UUID> shardUuids,
OptionalInt bucketNumber,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId)
{
this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuid is null"));
this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
}

Expand Down Expand Up @@ -106,12 +99,6 @@ public OptionalInt getBucketNumber()
return bucketNumber;
}

@JsonProperty
// JSON-serialized accessor for the split's pushed-down predicate over Raptor
// columns. NOTE(review): removed by this PR — the predicate now travels on
// RaptorTableHandle (see getConstraint usage in RaptorPageSourceProvider).
public TupleDomain<RaptorColumnHandle> getEffectivePredicate()
{
    return effectivePredicate;
}

@JsonProperty
public OptionalLong getTransactionId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.Node;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.predicate.TupleDomain;
import org.skife.jdbi.v2.ResultIterator;
Expand Down Expand Up @@ -91,18 +90,16 @@ public void destroy()
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingStrategy splitSchedulingStrategy)
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle handle, SplitSchedulingStrategy splitSchedulingStrategy)
{
RaptorTableLayoutHandle handle = (RaptorTableLayoutHandle) layout;
RaptorTableHandle table = handle.getTable();
TupleDomain<RaptorColumnHandle> effectivePredicate = toRaptorTupleDomain(handle.getConstraint());
electrum marked this conversation as resolved.
Show resolved Hide resolved
RaptorTableHandle table = (RaptorTableHandle) handle;
long tableId = table.getTableId();
boolean bucketed = table.getBucketCount().isPresent();
boolean merged = bucketed && !table.isDelete() && (table.getBucketCount().getAsInt() >= getOneSplitPerBucketThreshold(session));
OptionalLong transactionId = table.getTransactionId();
Optional<List<String>> bucketToNode = handle.getPartitioning().map(RaptorPartitioningHandle::getBucketToNode);
Optional<List<String>> bucketToNode = table.getBucketAssignments();
verify(bucketed == bucketToNode.isPresent(), "mismatched bucketCount and bucketToNode presence");
return new RaptorSplitSource(tableId, merged, effectivePredicate, transactionId, bucketToNode);
return new RaptorSplitSource(tableId, merged, table.getConstraint(), transactionId, bucketToNode);
}

private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers)
Expand All @@ -117,12 +114,6 @@ private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap,
return nodes.build();
}

/**
 * Narrows a generic {@code TupleDomain<ColumnHandle>} to Raptor's concrete
 * column-handle type by casting each handle.
 *
 * Uses {@code Class::cast} (a checked cast) rather than an inline cast lambda,
 * so no {@code @SuppressWarnings("unchecked")} is needed.
 */
private static TupleDomain<RaptorColumnHandle> toRaptorTupleDomain(TupleDomain<ColumnHandle> tupleDomain)
{
    return tupleDomain.transform(RaptorColumnHandle.class::cast);
}

private static <T> T selectRandom(Iterable<T> elements)
{
List<T> list = ImmutableList.copyOf(elements);
Expand All @@ -134,10 +125,10 @@ private class RaptorSplitSource
{
private final Map<String, Node> nodesById = uniqueIndex(nodeSupplier.getWorkerNodes(), Node::getNodeIdentifier);
private final long tableId;
private final TupleDomain<RaptorColumnHandle> effectivePredicate;
private final OptionalLong transactionId;
private final Optional<List<String>> bucketToNode;
private final ResultIterator<BucketShards> iterator;
private final TupleDomain<RaptorColumnHandle> effectivePredicate;

@GuardedBy("this")
private CompletableFuture<ConnectorSplitBatch> future;
Expand Down Expand Up @@ -233,7 +224,7 @@ private ConnectorSplit createSplit(BucketShards bucketShards)
addresses = ImmutableList.of(node.getHostAndPort());
}

return new RaptorSplit(shardId, addresses, effectivePredicate, transactionId);
return new RaptorSplit(shardId, addresses, transactionId);
}

private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shards)
Expand All @@ -252,7 +243,7 @@ private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shard
.collect(toSet());
HostAddress address = node.getHostAndPort();

return new RaptorSplit(shardUuids, bucketNumber, address, effectivePredicate, transactionId);
return new RaptorSplit(shardUuids, bucketNumber, address, transactionId);
}
}
}
Loading