Allow Phoenix to replace partial topn with limit. #8171

Merged (2 commits) on Jul 16, 2021
@@ -341,6 +341,21 @@ public QueryAssert matches(MaterializedResult expected)
});
}

public final QueryAssert matches(PlanMatchPattern expectedPlan)
{
transaction(runner.getTransactionManager(), runner.getAccessControl())
.execute(session, session -> {
Plan plan = runner.createPlan(session, query, WarningCollector.NOOP);
assertPlan(
session,
runner.getMetadata(),
noopStatsCalculator(),
plan,
expectedPlan);
});
return this;
}

public QueryAssert containsAll(@Language("SQL") String query)
{
MaterializedResult expected = runner.execute(session, query);
@@ -41,7 +41,6 @@
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.Plan;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.LimitNode;
import io.trino.sql.planner.planprinter.IoPlanPrinter.ColumnConstraint;
import io.trino.sql.planner.planprinter.IoPlanPrinter.EstimatedStatsAndCost;
import io.trino.sql.planner.planprinter.IoPlanPrinter.FormattedDomain;
@@ -7355,25 +7354,6 @@ public void testUseSortedProperties()
assertUpdate("DROP TABLE " + tableName);
}

private Consumer<Plan> assertPartialLimitWithPreSortedInputsCount(Session session, int expectedCount)
{
return plan -> {
int actualCount = searchFrom(plan.getRoot())
.where(node -> node instanceof LimitNode && ((LimitNode) node).isPartial() && ((LimitNode) node).requiresPreSortedInputs())
.findAll()
.size();
if (actualCount != expectedCount) {
Metadata metadata = getDistributedQueryRunner().getCoordinator().getMetadata();
String formattedPlan = textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata, StatsAndCosts.empty(), session, 0, false);
throw new AssertionError(format(
"Expected [\n%s\n] partial limit but found [\n%s\n] partial limit. Actual plan is [\n\n%s\n]",
expectedCount,
actualCount,
formattedPlan));
}
};
}

@Test
public void testSelectWithNoColumns()
{
13 changes: 13 additions & 0 deletions plugin/trino-phoenix5/pom.xml
@@ -161,6 +161,19 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parser</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing</artifactId>
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.phoenix5;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.plugin.jdbc.DefaultJdbcMetadata;
import io.trino.plugin.jdbc.JdbcColumnHandle;
@@ -32,8 +33,12 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableSchema;
import io.trino.spi.connector.LocalProperty;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortingProperty;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;

@@ -86,6 +91,22 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName
.orElse(null);
}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
List<LocalProperty<ColumnHandle>> sortingProperties = tableHandle.getSortOrder()
Member:

I think we could just do this in DefaultJdbcMetadata#getTableProperties. I don't see anything here unique to Phoenix. Any other connector that can do TopN pushdown should automatically benefit in the same way.

Member Author:

One case where this would not be true is the Phoenix (4) connector. The topN is pushed down in the plan, but the connector can still decide not to honor it. The same is true for some of the other connectors (Postgres, MySQL), which may also decide not to perform the topN after the planner has decided to push it down.

Member:

OK, doing it in DefaultJdbcMetadata just requires the assumption that the sortOrder in JdbcTableHandle will be guaranteed when the data is read from it; it's not required that the limit part of topN be guaranteed.
If that assumption also does not hold, then we can't move this to DefaultJdbcMetadata.
I'm not sure why we wouldn't drop the sortOrder from JdbcTableHandle if we weren't going to actually use it in forming the JDBC query.

Member:

Yes, in DefaultJdbcMetadata it would need to depend on io.trino.plugin.jdbc.JdbcClient#isTopNLimitGuaranteed.

Member:

@findepi Is isTopNLimitGuaranteed just about whether the limit part of topN is guaranteed by the connector, or does it affect the sorting part as well?
If it's just about the limit, then I don't think we need to worry about it; for this optimization the main concern is whether the sorting part of TopN is guaranteed, since we apply the limit part ourselves anyway.

Member Author (@lhofhansl, Jun 9, 2021):

I think for now let's leave it in the Phoenix connector.

A special case is the Phoenix (4) connector, which overrides the topN function in some cases. That scenario would not work in general, right? It would not even push the ORDER BY down.

Btw, ORDER BY pushdown (without a limit) would be a useful feature anyway. Also see #8093; it seems that I am missing something.

Member Author (@lhofhansl, Jun 9, 2021):

As for isTopNGuaranteed... Phoenix will silently not enforce a limit when the limit > 2^31. Does that mean topN is guaranteed or not?

Wait... not true, it will throw an exception. In that case we can change both isLimitGuaranteed and isTopNGuaranteed to return true!

Arrgghh. So here's why isLimitGuaranteed is false: Trino will issue multiple SQL statements to Phoenix, each of which has the limit. So it's guaranteed only per "partition".

Sorry for the back-and-forth.

Member:

I'm wondering what it would take to get this optimization into DefaultJdbcMetadata for cases where, after TopN pushdown into the table scan, the result size is not strictly guaranteed (isTopNGuaranteed is false) but the ordering is.

@raunaqmorarka That could perhaps be useful in some cases (but currently JdbcMetadata lacks the information to distinguish such cases).

OTOH, it's useful for large data sets only (right?), which limits its applicability, as JDBC connectors are not superfast anyway.

I think we should first make it work when isTopNGuaranteed is true (a simpler case with fewer changes) and log further improvement as a separate ticket.

Member Author:

With a special case for Phoenix (5)...? (Where isTopNGuaranteed is false, but we still want this optimization.)

Member:

@lhofhansl Good point. If Phoenix 5 is most interesting to you and a special case anyway, let's proceed with Phoenix, but let's define what we want to achieve in general in an issue, so that it's easier to follow up.
.map(properties -> properties
.stream()
.map(item -> (LocalProperty<ColumnHandle>) new SortingProperty<ColumnHandle>(
item.getColumn(),
item.getSortOrder()))
.collect(toImmutableList()))
.orElse(ImmutableList.of());

return new ConnectorTableProperties(TupleDomain.all(), Optional.empty(), Optional.empty(), Optional.empty(), sortingProperties);
}

@Override
public ConnectorTableSchema getTableSchema(ConnectorSession session, ConnectorTableHandle table)
{
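The `getTableProperties` override above maps the pushed-down sort order (if any) onto connector-level sorting properties, which is what lets the engine later replace a partial topN with a plain limit. The shape of that mapping can be shown with a minimal, self-contained sketch; `SortItem` and `SortingProperty` below are hypothetical simplified stand-ins for the Trino SPI types (`JdbcSortItem`, `io.trino.spi.connector.SortingProperty`), not the real classes:

```java
import java.util.List;
import java.util.Optional;
import static java.util.stream.Collectors.toList;

public class SortingPropertiesSketch
{
    // Hypothetical stand-ins for the Trino SPI types used in the real code.
    record SortItem(String column, String sortOrder) {}

    record SortingProperty(String column, String sortOrder) {}

    // Mirrors the pattern in PhoenixMetadata#getTableProperties: an absent sort
    // order yields no sorting properties; a present one is mapped item-by-item.
    static List<SortingProperty> sortingProperties(Optional<List<SortItem>> sortOrder)
    {
        return sortOrder
                .map(items -> items.stream()
                        .map(item -> new SortingProperty(item.column(), item.sortOrder()))
                        .collect(toList()))
                .orElse(List.of());
    }

    public static void main(String[] args)
    {
        // No pushed-down sort order: the table reports no sorting properties.
        System.out.println(sortingProperties(Optional.empty())); // prints []
        // A pushed-down sort order is exposed column by column.
        System.out.println(sortingProperties(
                Optional.of(List.of(new SortItem("orderkey", "ASC_NULLS_FIRST")))));
    }
}
```

The key design point debated in the thread above is visible here: the mapping only asserts the *ordering* of the data the connector will produce, not the limit, which is why the guarantee question centers on the sort order rather than on `isTopNGuaranteed`.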
@@ -13,10 +13,12 @@
*/
package io.trino.plugin.phoenix5;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.plugin.jdbc.BaseJdbcConnectorTest;
import io.trino.plugin.jdbc.UnsupportedTypeHandling;
import io.trino.sql.planner.assertions.PlanMatchPattern;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.sql.SqlExecutor;
@@ -31,9 +33,26 @@
import java.sql.Statement;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.UNSUPPORTED_TYPE_HANDLING;
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR;
import static io.trino.plugin.phoenix5.PhoenixQueryRunner.createPhoenixQueryRunner;
import static io.trino.sql.planner.assertions.PlanMatchPattern.exchange;
import static io.trino.sql.planner.assertions.PlanMatchPattern.limit;
import static io.trino.sql.planner.assertions.PlanMatchPattern.output;
import static io.trino.sql.planner.assertions.PlanMatchPattern.project;
import static io.trino.sql.planner.assertions.PlanMatchPattern.sort;
import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.trino.sql.planner.assertions.PlanMatchPattern.topN;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE;
import static io.trino.sql.planner.plan.ExchangeNode.Type.GATHER;
import static io.trino.sql.planner.plan.TopNNode.Step.FINAL;
import static io.trino.sql.tree.SortItem.NullOrdering.FIRST;
import static io.trino.sql.tree.SortItem.NullOrdering.LAST;
import static io.trino.sql.tree.SortItem.Ordering.ASCENDING;
import static io.trino.sql.tree.SortItem.Ordering.DESCENDING;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertTrue;
@@ -307,6 +326,118 @@ public void testMissingColumnsOnInsert()
assertQuery("SELECT * FROM test_col_insert", "SELECT 1, 'val1', 'val2'");
}

@Override
public void testTopNPushdown()
{
throw new SkipException("Phoenix does not support topN push down, but instead replaces partial topN with partial Limit.");
}

@Test
public void testReplacePartialTopNWithLimit()
{
List<PlanMatchPattern.Ordering> orderBy = ImmutableList.of(sort("orderkey", ASCENDING, LAST));

assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey LIMIT 10"))
.matches(output(
topN(10, orderBy, FINAL,
exchange(LOCAL, GATHER, ImmutableList.of(),
exchange(REMOTE, GATHER, ImmutableList.of(),
limit(
10,
ImmutableList.of(),
true,
orderBy.stream()
.map(PlanMatchPattern.Ordering::getField)
.collect(toImmutableList()),
tableScan("orders", ImmutableMap.of("orderkey", "orderkey"))))))));

orderBy = ImmutableList.of(sort("orderkey", ASCENDING, FIRST));

assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey NULLS FIRST LIMIT 10"))
.matches(output(
topN(10, orderBy, FINAL,
exchange(LOCAL, GATHER, ImmutableList.of(),
exchange(REMOTE, GATHER, ImmutableList.of(),
limit(
10,
ImmutableList.of(),
true,
orderBy.stream()
.map(PlanMatchPattern.Ordering::getField)
.collect(toImmutableList()),
tableScan("orders", ImmutableMap.of("orderkey", "orderkey"))))))));

orderBy = ImmutableList.of(sort("orderkey", DESCENDING, LAST));

assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey DESC LIMIT 10"))
.matches(output(
topN(10, orderBy, FINAL,
exchange(LOCAL, GATHER, ImmutableList.of(),
exchange(REMOTE, GATHER, ImmutableList.of(),
limit(
10,
ImmutableList.of(),
true,
orderBy.stream()
.map(PlanMatchPattern.Ordering::getField)
.collect(toImmutableList()),
tableScan("orders", ImmutableMap.of("orderkey", "orderkey"))))))));

orderBy = ImmutableList.of(sort("orderkey", ASCENDING, LAST), sort("custkey", ASCENDING, LAST));

assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey, custkey LIMIT 10"))
.matches(output(
project(
topN(10, orderBy, FINAL,
exchange(LOCAL, GATHER, ImmutableList.of(),
exchange(REMOTE, GATHER, ImmutableList.of(),
limit(
10,
ImmutableList.of(),
true,
orderBy.stream()
.map(PlanMatchPattern.Ordering::getField)
.collect(toImmutableList()),
tableScan("orders", ImmutableMap.of("orderkey", "orderkey", "custkey", "custkey")))))))));

orderBy = ImmutableList.of(sort("orderkey", ASCENDING, LAST), sort("custkey", DESCENDING, LAST));

assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey, custkey DESC LIMIT 10"))
.matches(output(
project(
topN(10, orderBy, FINAL,
exchange(LOCAL, GATHER, ImmutableList.of(),
exchange(REMOTE, GATHER, ImmutableList.of(),
limit(
10,
ImmutableList.of(),
true,
orderBy.stream()
.map(PlanMatchPattern.Ordering::getField)
.collect(toImmutableList()),
tableScan("orders", ImmutableMap.of("orderkey", "orderkey", "custkey", "custkey")))))))));
}

/*
* Make sure that partial topN is replaced with a partial limit when the input is presorted.
*/
@Test
public void testUseSortedPropertiesForPartialTopNElimination()
{
String tableName = "test_propagate_table_scan_sorting_properties";
// salting ensures multiple splits
String createTableSql = format("" +
"CREATE TABLE %s WITH (salt_buckets = 5) AS " +
"SELECT * FROM tpch.tiny.customer",
tableName);
assertUpdate(createTableSql, 1500L);

String expected = "SELECT custkey FROM customer ORDER BY 1 NULLS FIRST LIMIT 100";
Member:

One doubt here: assuming that Phoenix will honor whatever nulls sorting order Trino passes down to it in the JDBC query, we can expect this to work even without the NULLS FIRST part, right?
The Hive case is a bit different, as there the sort order of NULLS FIRST is fixed when the sorted files are written.

Member Author:

Yes. It will just be slower on the Phoenix side if there happens to be an index on the sort column(s); the index can only be used with NULLS FIRST. So for correctness this is not needed.

String actual = format("SELECT custkey FROM %s ORDER BY 1 NULLS FIRST LIMIT 100", tableName);
assertQuery(getSession(), actual, expected, assertPartialLimitWithPreSortedInputsCount(getSession(), 1));
assertUpdate("DROP TABLE " + tableName);
}

@Override
protected TestTable createTableWithDoubleAndRealColumns(String name, List<String> rows)
{
@@ -14,17 +14,24 @@
package io.trino.testing;

import io.trino.Session;
import io.trino.cost.StatsAndCosts;
import io.trino.metadata.Metadata;
import io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType;
import io.trino.sql.planner.Plan;
import io.trino.sql.planner.plan.LimitNode;
import io.trino.testing.sql.TestTable;
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.function.Consumer;
import java.util.stream.Stream;

import static io.trino.SystemSessionProperties.IGNORE_STATS_CALCULATOR_FAILURES;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static io.trino.sql.planner.planprinter.PlanPrinter.textLogicalPlan;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.QueryAssertions.assertContains;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ARRAY;
@@ -42,6 +49,7 @@
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.util.Collections.nCopies;
import static org.assertj.core.api.Assertions.assertThat;
@@ -776,4 +784,23 @@ public void testRowLevelDelete()
assertQuery("SELECT count(*) FROM " + table.getName(), "VALUES 4");
}
}

protected Consumer<Plan> assertPartialLimitWithPreSortedInputsCount(Session session, int expectedCount)
{
return plan -> {
int actualCount = searchFrom(plan.getRoot())
.where(node -> node instanceof LimitNode && ((LimitNode) node).isPartial() && ((LimitNode) node).requiresPreSortedInputs())
.findAll()
.size();
if (actualCount != expectedCount) {
Metadata metadata = getDistributedQueryRunner().getCoordinator().getMetadata();
String formattedPlan = textLogicalPlan(plan.getRoot(), plan.getTypes(), metadata, StatsAndCosts.empty(), session, 0, false);
throw new AssertionError(format(
"Expected [\n%s\n] partial limit but found [\n%s\n] partial limit. Actual plan is [\n\n%s\n]",
expectedCount,
actualCount,
formattedPlan));
}
};
}
}
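The helper moved into `BaseJdbcConnectorTest` above counts plan nodes matching a predicate via `PlanNodeSearcher.searchFrom(...).where(...).findAll()`. That search is essentially a recursive walk over the plan tree; a minimal self-contained sketch follows, where `PlanNode` is a hypothetical simplified stand-in for Trino's plan node type (real `LimitNode`s carry `isPartial()` and `requiresPreSortedInputs()` flags rather than a single boolean):

```java
import java.util.List;
import java.util.function.Predicate;

public class PlanSearchSketch
{
    // Hypothetical stand-in for a plan node: a name, a flag standing in for
    // "partial limit with pre-sorted inputs", and child nodes.
    record PlanNode(String name, boolean partialLimit, List<PlanNode> children) {}

    // Mirrors searchFrom(root).where(predicate).findAll().size(): count the
    // matching nodes in a pre-order traversal of the tree.
    static int count(PlanNode root, Predicate<PlanNode> predicate)
    {
        int matches = predicate.test(root) ? 1 : 0;
        for (PlanNode child : root.children()) {
            matches += count(child, predicate);
        }
        return matches;
    }

    public static void main(String[] args)
    {
        PlanNode scan = new PlanNode("TableScan", false, List.of());
        PlanNode limit = new PlanNode("Limit", true, List.of(scan));
        PlanNode output = new PlanNode("Output", false, List.of(limit));
        System.out.println(count(output, PlanNode::partialLimit)); // prints 1
    }
}
```

In the test itself, an expected count of 1 asserts that exactly one partial `LimitNode` requiring pre-sorted inputs appears in the plan, i.e. that the partial topN was in fact replaced.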