Remove deprecated split manager methods from Hive
electrum committed Jul 29, 2020
1 parent a519b32 commit cd8f939
Showing 3 changed files with 20 additions and 26 deletions.
presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSplitManager.java
@@ -29,16 +29,15 @@
 import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.VersionEmbedder;
-import io.prestosql.spi.connector.ColumnHandle;
 import io.prestosql.spi.connector.ConnectorSession;
 import io.prestosql.spi.connector.ConnectorSplitManager;
 import io.prestosql.spi.connector.ConnectorSplitSource;
 import io.prestosql.spi.connector.ConnectorTableHandle;
 import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.connector.DynamicFilter;
 import io.prestosql.spi.connector.FixedSplitSource;
 import io.prestosql.spi.connector.SchemaTableName;
 import io.prestosql.spi.connector.TableNotFoundException;
-import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.spi.type.TypeManager;
 import org.weakref.jmx.Managed;
 import org.weakref.jmx.Nested;
@@ -54,7 +53,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.function.Function;
-import java.util.function.Supplier;

 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -173,23 +171,13 @@ public HiveSplitManager(
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
     }

-    @Override
-    public ConnectorSplitSource getSplits(
-            ConnectorTransactionHandle transaction,
-            ConnectorSession session,
-            ConnectorTableHandle tableHandle,
-            SplitSchedulingStrategy splitSchedulingStrategy)
-    {
-        return getSplits(transaction, session, tableHandle, splitSchedulingStrategy, TupleDomain::all);
-    }
-
     @Override
     public ConnectorSplitSource getSplits(
             ConnectorTransactionHandle transaction,
             ConnectorSession session,
             ConnectorTableHandle tableHandle,
             SplitSchedulingStrategy splitSchedulingStrategy,
-            Supplier<TupleDomain<ColumnHandle>> dynamicFilter)
+            DynamicFilter dynamicFilter)
     {
         HiveTableHandle hiveTable = (HiveTableHandle) tableHandle;
         SchemaTableName tableName = hiveTable.getSchemaTableName();
@@ -231,7 +219,7 @@ public ConnectorSplitSource getSplits(
                 table,
                 hivePartitions,
                 hiveTable.getCompactEffectivePredicate(),
-                dynamicFilter,
+                dynamicFilter::getCurrentPredicate,
                 typeManager,
                 createBucketSplitInfo(bucketHandle, bucketFilter),
                 session,
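The only getSplits overload left on HiveSplitManager is the DynamicFilter one; the Supplier<TupleDomain<ColumnHandle>> that the split loader consumed before is now derived from the filter via the dynamicFilter::getCurrentPredicate method reference above. A minimal caller sketch, using only names that appear in this commit plus illustrative handle variables:

    // Hypothetical invocation of the surviving SPI method (handles and variable
    // names are illustrative, not from this commit). DynamicFilter.EMPTY disables
    // dynamic filtering; with a live filter, getCurrentPredicate() is polled for
    // the latest pushed-down TupleDomain while splits are loaded.
    ConnectorSplitSource splitSource = splitManager.getSplits(
            transactionHandle,
            session,
            tableHandle,
            SplitSchedulingStrategy.UNGROUPED_SCHEDULING,
            DynamicFilter.EMPTY);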
presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHive.java
@@ -84,6 +84,7 @@
 import io.prestosql.spi.connector.Constraint;
 import io.prestosql.spi.connector.ConstraintApplicationResult;
 import io.prestosql.spi.connector.DiscretePredicates;
+import io.prestosql.spi.connector.DynamicFilter;
 import io.prestosql.spi.connector.ProjectionApplicationResult;
 import io.prestosql.spi.connector.RecordCursor;
 import io.prestosql.spi.connector.RecordPageSource;
@@ -1371,7 +1372,7 @@ public void testGetPartitionSplitsBatch()
             metadata.beginQuery(session);

             ConnectorTableHandle tableHandle = getTableHandle(metadata, tablePartitionFormat);
-            ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING);
+            ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, tableHandle);

             assertEquals(getSplitCount(splitSource), tablePartitionFormatPartitions.size());
         }
@@ -1386,7 +1387,7 @@ public void testGetPartitionSplitsBatchUnpartitioned()
             metadata.beginQuery(session);

             ConnectorTableHandle tableHandle = getTableHandle(metadata, tableUnpartitioned);
-            ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING);
+            ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, tableHandle);

             assertEquals(getSplitCount(splitSource), 1);
         }
@@ -1396,7 +1397,7 @@ public void testGetPartitionSplitsBatchUnpartitioned()
     public void testGetPartitionSplitsBatchInvalidTable()
     {
         try (Transaction transaction = newTransaction()) {
-            splitManager.getSplits(transaction.getTransactionHandle(), newSession(), invalidTableHandle, UNGROUPED_SCHEDULING);
+            getSplits(splitManager, transaction, newSession(), invalidTableHandle);
         }
     }

@@ -1434,7 +1435,7 @@ public void testGetPartitionSplitsTableOfflinePartition()
             tableHandle = applyFilter(metadata, tableHandle, new Constraint(tupleDomain));

             try {
-                getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING));
+                getSplitCount(getSplits(splitManager, transaction, session, tableHandle));
                 fail("Expected PartitionOfflineException");
             }
             catch (PartitionOfflineException e) {
@@ -1455,7 +1456,7 @@ public void testGetPartitionSplitsTableNotReadablePartition()
             assertNotNull(tableHandle);

             try {
-                getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING));
+                getSplitCount(getSplits(splitManager, transaction, session, tableHandle));
                 fail("Expected HiveNotReadableException");
             }
             catch (HiveNotReadableException e) {
@@ -1942,7 +1943,7 @@ public void testPartitionSchemaNonCanonical()
             HivePartition partition = getOnlyElement(((HiveTableHandle) table).getPartitions().orElseThrow(AssertionError::new));
             assertEquals(getPartitionId(partition), "t_boolean=0");

-            ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, table, UNGROUPED_SCHEDULING);
+            ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table);
             ConnectorSplit split = getOnlyElement(getAllSplits(splitSource));

             ImmutableList<ColumnHandle> columnHandles = ImmutableList.of(column);
@@ -4388,7 +4389,7 @@ private MaterializedResult readTable(
             throws Exception
     {
         tableHandle = applyFilter(transaction.getMetadata(), tableHandle, new Constraint(tupleDomain));
-        List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING));
+        List<ConnectorSplit> splits = getAllSplits(getSplits(splitManager, transaction, session, tableHandle));
         if (expectedSplitCount.isPresent()) {
             assertEquals(splits.size(), expectedSplitCount.getAsInt());
         }
@@ -4425,7 +4426,7 @@ protected static int getSplitCount(ConnectorSplitSource splitSource)

     private List<ConnectorSplit> getAllSplits(ConnectorTableHandle tableHandle, Transaction transaction, ConnectorSession session)
     {
-        return getAllSplits(splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING));
+        return getAllSplits(getSplits(splitManager, transaction, session, tableHandle));
     }

     protected static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
@@ -4437,6 +4438,11 @@ protected static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
         return splits.build();
     }

+    protected static ConnectorSplitSource getSplits(ConnectorSplitManager splitManager, Transaction transaction, ConnectorSession session, ConnectorTableHandle tableHandle)
+    {
+        return splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY);
+    }
+
     protected String getPartitionId(Object partition)
     {
         return ((HivePartition) partition).getPartitionId();
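With the helper above, every test reaches the split manager through one code path that supplies UNGROUPED_SCHEDULING and DynamicFilter.EMPTY explicitly. A small sketch, assuming only the types visible in this diff, of the adapter trick HiveSplitManager uses internally:

    // DynamicFilter.getCurrentPredicate() yields the current TupleDomain<ColumnHandle>,
    // so a method reference satisfies the Supplier-shaped API that background split
    // loading still uses. For DynamicFilter.EMPTY this is expected to stay
    // TupleDomain.all(), mirroring the TupleDomain::all default of the removed overload.
    DynamicFilter dynamicFilter = DynamicFilter.EMPTY;
    Supplier<TupleDomain<ColumnHandle>> predicate = dynamicFilter::getCurrentPredicate;
    TupleDomain<ColumnHandle> current = predicate.get();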
presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileSystem.java
@@ -90,6 +90,7 @@
 import static io.prestosql.plugin.hive.AbstractTestHive.filterNonHiddenColumnHandles;
 import static io.prestosql.plugin.hive.AbstractTestHive.filterNonHiddenColumnMetadata;
 import static io.prestosql.plugin.hive.AbstractTestHive.getAllSplits;
+import static io.prestosql.plugin.hive.AbstractTestHive.getSplits;
 import static io.prestosql.plugin.hive.HiveTestUtils.PAGE_SORTER;
 import static io.prestosql.plugin.hive.HiveTestUtils.TYPE_MANAGER;
 import static io.prestosql.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
@@ -98,7 +99,6 @@
 import static io.prestosql.plugin.hive.HiveTestUtils.getHiveSession;
 import static io.prestosql.plugin.hive.HiveTestUtils.getHiveSessionProperties;
 import static io.prestosql.plugin.hive.HiveTestUtils.getTypes;
-import static io.prestosql.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
 import static io.prestosql.spi.type.BigintType.BIGINT;
 import static io.prestosql.testing.MaterializedResult.materializeSourceDataStream;
 import static io.prestosql.testing.QueryAssertions.assertEqualsIgnoreOrder;
@@ -445,7 +445,7 @@ private void createTable(SchemaTableName tableName, HiveStorageFormat storageFormat)

             // verify the data
             metadata.beginQuery(session);
-            ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle, UNGROUPED_SCHEDULING);
+            ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, tableHandle);
             ConnectorSplit split = getOnlyElement(getAllSplits(splitSource));

             try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle, columnHandles, TupleDomain.all())) {
@@ -476,7 +476,7 @@ protected MaterializedResult readTable(SchemaTableName tableName)
             List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());

             metadata.beginQuery(session);
-            ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, table, UNGROUPED_SCHEDULING);
+            ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table);

             List<Type> allTypes = getTypes(columnHandles);
             List<Type> dataTypes = getTypes(columnHandles.stream()
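As in AbstractTestHive, the filesystem tests drop their direct import of UNGROUPED_SCHEDULING and statically import the shared getSplits helper instead, so a future change to the split manager SPI needs to touch only that single helper rather than every test call site.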
