
Allow Phoenix to replace partial topn with limit. #8171

Merged · 2 commits · Jul 16, 2021

Conversation

@lhofhansl (Member)

Adds support for #6634 to the Phoenix connector.

This requires Phoenix 4.16.1 and Phoenix 5.1.2 on the server side to avoid memory overestimation.
4.16.1 is out; Phoenix 5.1.2 will be released soon, so this PR should wait until that release.

No Phoenix client updates are needed.

@cla-bot added the cla-signed label Jun 1, 2021
@lhofhansl (Member Author)

Shame on me for not running the Phoenix tests.

@lhofhansl (Member Author) commented Jun 2, 2021

Turns out that for the tests I do need to update the client. (I had only tested with real server instances.)

That means that I can't make this change for the Phoenix 4.x code, since I cannot change the client to 4.16.1; it is not guaranteed to be compatible with older 4.x versions. The connector currently uses 4.14.1, which is forward compatible with later 4.x versions.

So, new proposal: the Phoenix 4.x connector does not get this change. The Phoenix 5 connector gets this change once 5.1.2 is released, and I will change the Phoenix client version in the connector to 5.1.2 in a separate PR first. (5.1.2 is backward compatible with 5.1.1 and 5.1.0.)

I'll wait until 5.1.2 is out, update the client to it in a separate PR, then rebase this one and remove the change from the 4.x connector.

@lhofhansl (Member Author)

@findepi Is there a way to leave the Phoenix dependency at 4.14.1, but for tests use 4.16.1 for the server part? (likely not worth the effort, but thought I'd check)

@findepi (Member) commented Jun 7, 2021

Can we run Phoenix in a docker container?

@lhofhansl (Member Author)

Rebased after the change to Phoenix 5.1.2.

@findepi Re: Docker. Probably. Lemme see if I can make something for that. Until then we do this optimization (it is only an optimization, not a correctness issue) for Phoenix 5 only.

@@ -293,16 +292,7 @@ public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, Li
    @Override
    protected Optional<TopNFunction> topNFunction()
    {
        return Optional.of((query, sortItems, limit) -> {
Member

This looks like it belongs in a different commit

Member Author

This is needed so that topN is always handled by the connector. If the connector only sometimes does it, and that is not apparent from the plan, I cannot do the pushdown.
That said, happy to break this into a separate commit, if that is better.

Member

If https://issues.apache.org/jira/browse/PHOENIX-6436 mentioned below is resolved (and shipped), we can address the TODO. It's fine to have this as a prep commit.

Member

That said, happy to break this into a separate commit, if that is better.

please do
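
For context: a minimal sketch of the kind of TopNFunction this hunk deals with, i.e. a BaseJdbcClient subclass rendering TopN into the generated SQL. The helper names (quoted, the JdbcSortItem accessors) are assumptions for illustration, not taken from this diff:

    import static java.lang.String.format;
    import static java.util.stream.Collectors.joining;

    @Override
    protected Optional<TopNFunction> topNFunction()
    {
        // Append "ORDER BY col1 ASC, col2 DESC, ... LIMIT n" to the generated query.
        return Optional.of((query, sortItems, limit) -> {
            String orderBy = sortItems.stream()
                    .map(sortItem -> format("%s %s",
                            quoted(sortItem.getColumn().getColumnName()),
                            sortItem.getSortOrder().isAscending() ? "ASC" : "DESC"))
                    .collect(joining(", "));
            return format("%s ORDER BY %s LIMIT %s", query, orderBy, limit);
        });
    }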

@lhofhansl (Member Author)

Enabled the SUPPORTS_TOPN_PUSHDOWN behavior in the test. Let's wait for a test run.

@lhofhansl (Member Author)

Forgot that SUPPORTS_TOPN_PUSHDOWN is different.

@lhofhansl (Member Author)

What I don't understand is why this test is now failing:

[ERROR] Failures:
[ERROR]   TestPhoenixConnectorTest>BaseJdbcConnectorTest.testTopNPushdown:629 Plan does not match, expected [

- anyTree
    - node(TopNNode)
        - node(TableScanNode)

] but found [

Output[orderkey]
│   Layout: [orderkey:bigint]
└─ TopN[10 by (orderkey ASC NULLS LAST)]
   │   Layout: [orderkey:bigint]
   └─ LocalExchange[SINGLE] ()
      │   Layout: [orderkey:bigint]
      └─ RemoteExchange[GATHER]
         │   Layout: [orderkey:bigint]
         └─ LimitPartial[10, input pre-sorted by (orderkey)]
            │   Layout: [orderkey:bigint]
            └─ TableScan[phoenix:tpch.orders TPCH.ORDERS sortOrder=[ORDERKEY:bigint:BIGINT ASC NULLS LAST] limit=10 columns=[ORDERKEY:bigint:BIGINT]]
                   Layout: [orderkey:bigint]
                   orderkey := ORDERKEY:bigint:BIGINT

This test passed when I had the if(...isEmpty()) instead of the Optional.map change, which does not make any sense since it's functionally the same.

@raunaqmorarka (Member)

This test passed when I had the if(...isEmpty()) instead of the Optional.map change, which does not make any sense since it's functionally the same.

The plan generated above is the right one; it looks like we'll need to override that test for Phoenix, as the behaviour of (topN pushdown + no topN limit guarantee + using the sorting property) is a unique one.
Not sure what was wrong with the previous version; if it's reproducible you could go back to it and try debugging.

public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
{
    JdbcTableHandle tableHandle = (JdbcTableHandle) table;
    List<LocalProperty<ColumnHandle>> sortingProperties = tableHandle.getSortOrder()
Member

I think we could just do this in DefaultJdbcMetadata#getTableProperties.
I don't see anything here unique to Phoenix.
Any other connector which can do TopN pushdown should automatically benefit in the same way.

Member Author

One case where this would not be true is the Phoenix (4) connector. The topN is pushed down in the plan, but the connector can still decide not to honor it. The same is true for some of the other connectors (Postgres, MySQL), which may also decide not to do the topN after the planner has decided to push it down.

Member

Ok, doing it in DefaultJdbcMetadata just requires the assumption that the sortOrder in JdbcTableHandle will be guaranteed when the data is read from it; it's not required that the limit part of topN be guaranteed.
If that assumption also does not hold, then we can't move this to DefaultJdbcMetadata.
I'm not sure why we wouldn't drop the sortOrder from JdbcTableHandle if we weren't going to actually use it in forming the JDBC query.

Member

Yes, in DefaultJdbcMetadata it would need to depend on io.trino.plugin.jdbc.JdbcClient#isTopNLimitGuaranteed.

Member

@findepi Is isTopNLimitGuaranteed just about whether the limit part of topN is guaranteed by the connector, or does it impact the sorting part as well?
If it's just about the limit, then I think we don't need to worry about that; for this optimisation the main concern is whether the sorting part of TopN is guaranteed or not, and we'll apply the limit part ourselves anyway.

@lhofhansl (Member Author) commented Jun 9, 2021

I think for now let's leave it in the Phoenix connector.

A special case is the Phoenix (4) connector, which overrides the topN function in some cases. That scenario would not work in general, right? As it would not even push the order by down.

Btw. order by pushdown (without limit) would be a useful feature anyway. Also see #8093, and it seems that I am missing something.

@lhofhansl (Member Author) commented Jun 9, 2021

As for isTopNGuaranteed... Phoenix will silently not enforce a limit when the limit > 2^31. Does that mean now that topN is guaranteed or not?

Wait... Not true, it will throw an exception. In that case we can change both isLimitGuaranteed and isTopNGuaranteed to return true!

Arrgghh. So here's why isLimitGuaranteed is false: Trino will issue multiple SQL statements to Phoenix, each of which has the limit. So it's guaranteed only per "partition".

Sorry for the back-and-forth.
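
To make that per-"partition" point concrete, a small hypothetical calculation (the numbers are made up):

    // Each Phoenix split runs its own statement carrying the same LIMIT, e.g.
    // "SELECT orderkey FROM ORDERS ORDER BY orderkey LIMIT 10" per split.
    long limit = 10;
    int splits = 3;
    long maxRowsReachingTrino = splits * limit; // up to 30 rows, not 10
    // Ordering and limit hold within each split, but not globally, which is
    // why the limit is not guaranteed and Trino keeps a final TopN above the scan.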

Member

I'm wondering what it would take to get this optimization into DefaultJdbcMetadata for cases where after TopN pushdown into TS, the result size is not strictly guaranteed (isTopNGuaranteed is false) but the ordering is.

@raunaqmorarka that could perhaps be useful in some cases (but currently JdbcMetadata lacks information to distinguish such cases)

OTOH, it's useful for large data sets only (right?), which limits applicability of this, as JDBC connectors are not superfast anyway.

I think we should first make it work when isTopNGuaranteed is true (simpler case, less changes) and log further improvement as a separate ticket

Member Author

With a special case for Phoenix (5)...? (Where isTopNGuaranteed is false, but we still want this optimization.)

Member

@lhofhansl good point. If Phoenix 5 is most interesting to you & a special case anyway, let's proceed with Phoenix, but let's define what we want to achieve in general, in an issue, so that it's easier to follow up.
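
Putting this thread together, a minimal sketch of the Phoenix-side override under discussion, completing the truncated snippet at the top of the thread; the ConnectorTableProperties constructor arguments and SPI details here are from memory and should be treated as assumptions:

    import static com.google.common.collect.ImmutableList.toImmutableList;

    @Override
    public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
    {
        JdbcTableHandle tableHandle = (JdbcTableHandle) table;
        // Expose the pushed-down sort order as local properties, so the engine
        // rule from #6634 can replace the partial TopN above the scan with a Limit.
        List<LocalProperty<ColumnHandle>> sortingProperties = tableHandle.getSortOrder()
                .map(sortItems -> sortItems.stream()
                        .map(sortItem -> (LocalProperty<ColumnHandle>) new SortingProperty<ColumnHandle>(
                                sortItem.getColumn(),
                                sortItem.getSortOrder()))
                        .collect(toImmutableList()))
                .orElse(ImmutableList.of());
        return new ConnectorTableProperties(
                TupleDomain.all(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                sortingProperties);
    }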

{
    assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey LIMIT 10"))
            .ordered()
            .isNotFullyPushedDown(TopNNode.class, ExchangeNode.class, ExchangeNode.class, LimitNode.class);
Member Author

This passes the test. Not entirely sure whether it is still a useful test, though.

Member

I think we can include a few more tests from BCT with modifications here, like order by DESC, multiple different orderings, etc.

Member

How is this different from testing that TopN pushdown doesn't happen? (That is already tested in the superclass if SUPPORTS_TOPN_PUSHDOWN is false.) Or is the difference that the plan looks different? (The retained nodes look the same to me.)

i.e. would it be better to add the new cases to the superclass itself?

if (!hasBehavior(SUPPORTS_TOPN_PUSHDOWN)) {
    assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey LIMIT 10"))
            .ordered()
            .isNotFullyPushedDown(TopNNode.class);
    return;
}

Member Author

This is the original plan:

Output[orderkey]
│   Layout: [orderkey:bigint]
└─ TopN[10 by (orderkey ASC NULLS LAST)]
   │   Layout: [orderkey:bigint]
   └─ LocalExchange[SINGLE] ()
      │   Layout: [orderkey:bigint]
      └─ RemoteExchange[GATHER]
         │   Layout: [orderkey:bigint]
         └─ TopNPartial[10 by (orderkey ASC NULLS LAST)]
            │   Layout: [orderkey:bigint]
            └─ TableScan[phoenix:tpch.orders TPCH.ORDERS sortOrder=[ORDERKEY:bigint:BIGINT ASC NULLS LAST] limit=10 columns=[ORDERKEY:bigint:BIGINT]]
                   Layout: [orderkey:bigint]
                   orderkey := ORDERKEY:bigint:BIGINT

So this indeed verifies that the TopNPartial was replaced by a limit.

Member

Thanks @lhofhansl . Missed that the partial got removed.

Member Author

That would work.

Not sure what we are trying to test. The basic plan rewrite is done in #6634; here I just want to make sure that Phoenix is triggering that query rewrite. Assuming the changes for #6634 add tests, I would add the most generic version to the connectors, just making sure that the replacement actually happens.

As usual, I do not feel strongly :) Happy to add a more explicit test as you suggest.

Member

#6634 did add tests with a mock connector, Hive, and later the TPCH connector.
Here we want to test that after partial pushdown of topN into the table scan takes place, we detect that through the sort ordering in the JDBC table handle and expose the right ConnectorTableProperties for it. This part is unique to this connector. We can assume that if the right properties are set, then the planner will do the expected query rewrite. The plan assertions are an indirect but convenient way to test that the right sorting properties were set.
PlanMatchPattern would allow us to explicitly verify the preSortedInputs in LimitNode.

Member Author

This is the only way I can get this to work:

        assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey LIMIT 10"))
                .ordered()
                .matches(
                        output(
                                node(TopNNode.class,
                                        anyTree(
                                                node(LimitNode.class,
                                                        node(TableScanNode.class))))));

Not sure that is much better.

Member

We can make that slightly better by building more specific plan node patterns rather than just using Node.class, e.g. see the plan patterns used in assertions in TestPartialTopNWithPresortedInput#testWithSortedTable.
However, to me this is nice to have, not a blocker to merging the PR.
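
A sketch of what that more specific pattern could look like, loosely modeled on TestPartialTopNWithPresortedInput; the exact PlanMatchPattern helper signatures (topN, limit with pre-sorted inputs, tableScan) are assumptions here:

    // static imports from PlanMatchPattern and TopNNode.Step assumed
    List<PlanMatchPattern.Ordering> orderBy = ImmutableList.of(sort("orderkey", ASCENDING, LAST));

    assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey LIMIT 10"))
            .matches(
                    output(
                            topN(10, orderBy, FINAL,
                                    anyTree(
                                            // partial limit with pre-sorted input: this is the
                                            // node that replaced the partial TopN
                                            limit(10, ImmutableList.of(), true, ImmutableList.of("orderkey"),
                                                    tableScan("orders"))))));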

Member Author

Thanks! I had tried the more specific pattern, but in the time I had I couldn't get the plan to match. TestPartialTopNWithPresortedInput is a great example, though. I'll try that.

plugin/trino-phoenix5/pom.xml (outdated; resolved)

@lhofhansl (Member Author) commented Jun 11, 2021

I think this is good to go (I added more tests and addressed the review comments).

Let's file a general issue for JDBC. I do agree with @findepi that for most JDBC connectors this will not be a substantive improvement. Phoenix is special here as it actually generates splits and executes these against many nodes in the Phoenix/HBase cluster, so the extra, unnecessary topNPartial can have a significant impact.

(Next: Push down ORDER BY).

@lhofhansl (Member Author)

Hi... Just making sure this is not waiting on anything from me.

@lhofhansl (Member Author)

Ping ... :)

@lhofhansl (Member Author) commented Jun 28, 2021

Thinking about how I'd be using BasePlanTest.

@hashhar (Member) left a comment

a few comments

{
    assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey LIMIT 10"))
            .ordered()
            .isNotFullyPushedDown(TopNNode.class, ExchangeNode.class, ExchangeNode.class, LimitNode.class);
Member

@lhofhansl Using isNotFullyPushedDown for asserting plan shape when it has nothing to do with pushdown seems a bit dirty.

Using something similar to what Piotr shows above is more explicit about the intent of the test.

@lhofhansl (Member Author) commented Jun 28, 2021

@hashhar I agree on isNotFullyPushedDown; I felt the same way.
The trick is figuring out how to create a local query runner for Phoenix, which is required for subclasses of BasePlanTest.

Thinking more about it...
This is what we want to test, right? That (1) topN was not pushed down, and (2) that the plan was modified to include LimitNode instead of a partial topN.

Perhaps I can add more functions with better names to QueryAssertions?

What do you think @hashhar ?
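
One possible shape for such a helper; per the commit note further down, the PR ends up adding QueryAssertions.matches in a separate commit, and this body is only a guess at what it does:

    // In QueryAssertions.QueryAssert: plan the query and assert the plan shape
    // explicitly, instead of bending isNotFullyPushedDown to do it.
    public QueryAssert matches(PlanMatchPattern expectedPlan)
    {
        transaction(runner.getTransactionManager(), runner.getAccessControl())
                .execute(session, transactionSession -> {
                    Plan plan = runner.createPlan(transactionSession, query, WarningCollector.NOOP);
                    assertPlan(transactionSession, runner.getMetadata(), noopStatsCalculator(), plan, expectedPlan);
                });
        return this;
    }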

@lhofhansl (Member Author)

I added SkipException to the test.

As for BasePlanTests... I am still looking into that. Looks like all existing such tests are for Hive...?

@lhofhansl (Member Author)

Can we merge with the current tests? (as I mentioned above, they do test what we want: That topN is actually not pushed down, but that instead the plan is modified)

@lhofhansl (Member Author)

Rebased

@hashhar (Member) left a comment

LGTM % comments.

Sorry to keep you waiting @lhofhansl .

@lhofhansl (Member Author)

Hey @hashhar , no problem at all. I'll address your comments tomorrow.

@hashhar (Member) left a comment

Looks good to me. Thanks @lhofhansl.

@raunaqmorarka @sopel39 PTAL if you want to.

@raunaqmorarka (Member)

Minor comment about looking into https://github.com/trinodb/trino/pull/8171/files#r665088423
Otherwise lgtm

tableName);
assertUpdate(createTableSql, 1500L);

String expected = "SELECT custkey FROM customer ORDER BY 1 NULLS FIRST LIMIT 100";
Member

One doubt here: assuming that Phoenix will honour whatever nulls sorting order Trino passes down in the JDBC query, we can expect this to work even without the NULLS FIRST part, right?
The Hive case is a bit different, as there the sort order of NULLS FIRST is fixed when the sorted files are written.

Member Author

Yes. It will just be slower on the Phoenix side if there happens to be an index on the sort column(s); the index can only be used with NULLS FIRST. So this is not needed for correctness.
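
In other words (a hypothetical pair of generated queries; the claim about index usability is from the comment above):

    // Both orderings are correct; only the first can be served by a Phoenix
    // index, since the index can only be used with NULLS FIRST.
    String indexFriendly = "SELECT custkey FROM customer ORDER BY custkey NULLS FIRST LIMIT 100";
    String needsFullSort = "SELECT custkey FROM customer ORDER BY custkey NULLS LAST LIMIT 100";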

@lhofhansl (Member Author)

This one works with explicit plan matching.

@raunaqmorarka (Member) left a comment

the updated tests lgtm

@@ -320,21 +335,92 @@ public void testTopNPushdown()
    @Test
    public void testReplacePartialTopNWithLimit()
    {
        List<PlanMatchPattern.Ordering> orderBy = ImmutableList.of(sort("orderkey", ASCENDING, LAST));

        assertThat(query("SELECT orderkey FROM orders ORDER BY orderkey LIMIT 10"))
                .ordered()
Member

You don't need .ordered() now.

@lhofhansl (Member Author)

Restructured commits to add QueryAssertions.matches as separate commit.

@hashhar (Member) left a comment

LGTM.

@hashhar merged commit fe3ba1b into trinodb:master Jul 16, 2021
@hashhar added this to the 360 milestone Jul 16, 2021
@lhofhansl (Member Author)

Thank you.
