
Support pushing dereferences within lambdas into table scan #23148

Open
wants to merge 1 commit into master

Conversation

zhaner08 (Contributor)

Description

original PR: #21957

This extends the enhancement discussed in #3925, and depends on/extends the original PR #4270, which is currently being rebased by @Desmeister.

Since the issue and discussion have been idle for years, and this kind of optimization can be critical for anyone with a highly nested schema who uses UNNEST, I would like to use this PR to formally restart the discussion on how the community wants to eventually support this, and on whether this is the right direction. (I have a working version locally, not this one, that speeds up queries while reducing the actual data processed.)

From my understanding of the previous discussions, this should be done through the following steps:

  • Convert dereferences of non-replicate symbols involved in UNNEST into lambda functions with subscript expressions, one for each of the UNNESTs
  • Push the lambda function down
    • Type 1: the lambda function is already above the TableScan. In this case, this rule pushes the dereference down further, while for connectors that don't support dereferencing, the rule preserves the lambda expression so that it still removes columns.
    • Type 2: the lambda functions are not at the ~leaf. This is handled by PushDownDereferenceThroughUnnest and many other expression-specific rules. PushDownDereferenceThroughUnnest currently handles only replicated symbols, not unnest symbols. To support unnest symbols, I believe at least a new expression has to be created, or the subscript expression has to be extended; otherwise I don't see an easy way to represent the dereferences so that they can be pushed further down through other unnests. I need more guidance on how this could be done, or whether it is possible with what we have now; that is why this PR in particular does not handle complex cases like nested UNNEST and only pushes lambdas down through projects and filters in a limited way.
  • Push dereferences into the TableScan
    • This is partially implemented by this PR (see the sketch after this list). I extended the existing visitFunctionCall in ConnectorExpressionTranslator to create a new connector expression (which could be merged with the existing FieldDereference expression if possible), then passed it into the existing applyProjection method to let connectors decide how to handle it. In this PR, only HiveMetadata implements handling for it; other connectors simply ignore it. applyProjection creates the new projections and, for Hive, a HiveColumnHandle with an extended HiveColumnProjectionInfo.
  • Push dereferences into file readers
    • This is done by this PR. We need a representation of dereferences into arrays (or potentially maps). Currently everything is represented simply as arrays of strings (names) or arrays of integers (indexes), and with just that we cannot pass down any dereferences that are more complex. I cherry-picked the Subfield classes from Presto since they are already established and have similar methods already implemented for the Parquet reader. Depending on how the community wants to represent this, we can swap in another representation, as long as it supports anything more complex than simple structs.
  • Readers skip column reads
    • This is done by this PR. For Parquet, the file schema is pruned to contain only the needed columns, and the other columns are simply returned as empty blocks, reducing the actual data scanned as well as the data going through local and remote exchanges.
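
To make the TableScan step concrete, here is a condensed sketch of the shape of the visitFunctionCall extension, assembled from the diff hunks quoted later in this conversation: it recognizes transform(<array reference>, x -> Row(...)) and translates it into the new ArrayFieldDereference connector expression. The wiring is abbreviated, and names reflect this PR rather than Trino master.

// Sketch only: recognize transform(<reference>, x -> Row(x.f1, x.f2, ...))
// and translate it into the new ArrayFieldDereference connector expression.
if (translateArrayFieldReference && functionName.equals(builtinFunctionName(ARRAY_TRANSFORM_NAME))) {
    List<Expression> arguments = node.arguments();
    Optional<ConnectorExpression> target = process(arguments.get(0)); // the array Reference
    if (target.isPresent() && arguments.get(1) instanceof Lambda lambda && lambda.body() instanceof Row row) {
        // translate each row item (e.g. x.f1) into a connector expression
        List<ConnectorExpression> translatedRowFields = row.items().stream()
                .map(this::process)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toImmutableList());
        // only translate when every field dereference is translatable
        if (translatedRowFields.size() == row.items().size()) {
            return Optional.of(new ArrayFieldDereference(node.type(), target.get(), translatedRowFields));
        }
    }
}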

This PR is written to reduce the impact on existing features so that I can fully validate the performance impact while gathering feedback and direction from the community. Implementations are therefore typically wrapped in an if, as sketched below, instead of fully refactoring the existing methods.
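
A minimal sketch of that gating pattern, assuming the flag added to FeaturesConfig in this PR (the getter name here is hypothetical; only the field and @Config annotation appear in the diff):

// Hypothetical: the new pushdown path only runs when the experimental
// flag is enabled, leaving the existing code path untouched.
if (featuresConfig.isPushFieldDereferenceLambdaIntoScanEnabled()) {
    // new lambda-dereference pushdown logic
}
else {
    // existing behavior, unchanged
}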

I believe that, if this is the right direction, the changes can be contributed in the following phases:

  1. Replace the existing Array<dereferences> within HiveColumnProjectionInfo with Subfield, or anything similar to it (see the sketch after this list), and make sure all methods that used to depend on Array<dereferences> now depend on the new representation
  2. Fully integrate the newly added optimization rule with the existing applyProjection method (or not? It can simply stay a non-iterative visitor at the very end, as it is now.)
  3. Instead of just pruning schemas, also prune the output symbols/types of the TableScan (currently it keeps the original symbols and just returns empty blocks, to minimize changes)
  4. Remove the lambda expression when the translation is supported by the connector. The current overhead should be small, but the risk of wrongly removing the lambda expression while a connector is not correctly pruning nested columns is large, so this PR still keeps the lambda expression after the pushdown
  5. Support dereference pushdown of unnest symbols through ~all kinds of expressions. I have added two rules to support pushing down through project and filter, and we can probably live with those in the short term, but eventually we have to address things like pushing down through unnest and other complex expressions
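
For phase 1, the Subfield representation cherry-picked from Presto models a nested path as a root column name plus typed path elements, which is expressive enough for arrays and maps. A usage sketch, assuming the API of Presto's com.facebook.presto.common.Subfield (the exact method and factory names here are assumptions based on that class):

// Path for array1[*].struct1.data4: every array element ([*]), then the
// struct1 row field, then its data4 field.
Subfield subfield = new Subfield("array1[*].struct1.data4");
String root = subfield.getRootName();                 // "array1"
List<Subfield.PathElement> path = subfield.getPath(); // [allSubscripts, nestedField("struct1"), nestedField("data4")]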

The change has been fully validated, except for rebasing onto the latest Trino release, which could involve a lot of conflicts due to the AST/IR refactoring.

BEFORE:

trino:default> with tmp as (
            -> SELECT
            ->     a1.data2 as d2,
            ->     a1.array11 as nestedarray
            -> FROM 
            ->     default.test_unnest_unnest_prunning_parquet
            ->     CROSS JOIN UNNEST(default.test_unnest_unnest_prunning_parquet.array1) t (a1)
            ->     where id>0
            -> )
            ->  SELECT
            ->      d2,
            ->     array2.struct1.data4,
            ->     array2.struct1.data5
            -> FROM 
            ->     tmp
            ->     CROSS JOIN UNNEST(tmp.nestedarray) t (array2);
  d2  | data4 | data5 
------+-------+-------
 -10- |   100 | -100- 
 -10- |   101 | -101- 
 -11- |   110 | -110- 
 -11- |   111 | -111- 
 -20- |   200 | -200- 
 -20- |   201 | -201- 
 -21- |   210 | -210- 
 -21- |   211 | -211- 
(8 rows)

Query 20240518_032355_00008_qhz93, FINISHED, 1 node
http://localhost:8080/ui/query.html?20240518_032355_00008_qhz93
Splits: 1 total, 1 done (100.00%)
CPU Time: 0.0s total,    80 rows/s, 16.5KB/s, 10% active
Per Node: 0.0 parallelism,     1 rows/s,   413B/s
Parallelism: 0.0
Peak Memory: 542B
1.02 [2 rows, 423B] [1 rows/s, 413B/s]


AFTER:

trino:default> with tmp as (
            -> SELECT
            ->     a1.data2 as d2,
            ->     a1.array11 as nestedarray
            -> FROM 
            ->     default.test_unnest_unnest_prunning_parquet
            ->     CROSS JOIN UNNEST(default.test_unnest_unnest_prunning_parquet.array1) t (a1)
            ->     where id>0
            -> )
            ->  SELECT
            ->      d2,
            ->     array2.struct1.data4,
            ->     array2.struct1.data5
            -> FROM 
            ->     tmp
            ->     CROSS JOIN UNNEST(tmp.nestedarray) t (array2);
  d2  | data4 | data5 
------+-------+-------
 -10- |   100 | -100- 
 -10- |   101 | -101- 
 -11- |   110 | -110- 
 -11- |   111 | -111- 
 -20- |   200 | -200- 
 -20- |   201 | -201- 
 -21- |   210 | -210- 
 -21- |   211 | -211- 
(8 rows)

Query 20240518_032332_00007_qhz93, FINISHED, 1 node
http://localhost:8080/ui/query.html?20240518_032332_00007_qhz93
Splits: 1 total, 1 done (100.00%)
CPU Time: 0.0s total,    80 rows/s,   14KB/s, 9% active
Per Node: 0.0 parallelism,     1 rows/s,   344B/s
Parallelism: 0.0
Peak Memory: 542B
1.04 [2 rows, 359B] [1 rows/s, 344B/s]

Bytes scanned decreased from 423B to 359B for the sample query; we have seen large performance improvements in production queries.

Additional context and related issues

I would really appreciate any comments or feedback, as without clear direction I can't extend this further without risking throwing everything away later. Any of the components should be easy to plug in once we have a clear idea of how we want to do it.

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(X) Release notes are required, with the following suggested text:

# Section
* Improve query performance for dereferences on unnest symbols

@cla-bot cla-bot bot added the cla-signed label Aug 28, 2024
@github-actions github-actions bot added delta-lake Delta Lake connector hive Hive connector labels Aug 28, 2024
@zhaner08 zhaner08 self-assigned this Aug 29, 2024
Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Sep 19, 2024
@github-actions github-actions bot removed the stale label Oct 2, 2024
@@ -122,6 +122,8 @@ public class FeaturesConfig

private boolean faultTolerantExecutionExchangeEncryptionEnabled = true;

private boolean pushFieldDereferenceLambdaIntoScanEnabled;
Member

This should be with the other optimizer properties in OptimizerConfig.

TBH, I think it should be a connector property. We already have io.trino.plugin.iceberg.IcebergConfig#projectionPushdownEnabled, so it might not be needed at all.
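
For reference, the Iceberg connector property this comment points to is wired roughly like this (a sketch patterned on IcebergConfig#projectionPushdownEnabled; treat the exact annotation and default as assumptions):

// Sketch of the existing per-connector toggle in IcebergConfig.
private boolean projectionPushdownEnabled = true;

@Config("iceberg.projection-pushdown-enabled")
public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnabled)
{
    this.projectionPushdownEnabled = projectionPushdownEnabled;
    return this;
}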

return pushFieldDereferenceLambdaIntoScanEnabled;
}

@Config("experimental.enable-push-field-dereference-lambda-into-scan.enabled")
Member

I don't think it should really be experimental.

TBH, I think it should be a connector property.

@@ -694,6 +706,37 @@ else if (functionName.equals(builtinFunctionName(MODULUS))) {
new io.trino.spi.expression.Call(node.type(), MODULUS_FUNCTION_NAME, ImmutableList.of(left, right))));
}

// Very narrow case that only tries to extract a particular type of lambda expression
// TODO: Expand the scope
Member

Please create a Trino issue to track the TODO and reference it here.

// Very narrow case that only tries to extract a particular type of lambda expression
// TODO: Expand the scope
if (translateArrayFieldReference && functionName.equals(builtinFunctionName(ARRAY_TRANSFORM_NAME))) {
List<Expression> allNodeArgument = node.arguments();
Member

nit: just arguments

List<Expression> allNodeArgument = node.arguments();
// at this point, SubscriptExpression should already have been pushed down by PushProjectionIntoTableScan;
// if not, it means it is referenced by other expressions. We only care about SymbolReference at this moment
List<Expression> inputExpressions = allNodeArgument.stream().filter(Reference.class::isInstance)
Member

Please check arguments.getFirst() explicitly, see SpecializeTransformWithJsonParse

if (lambdaExpressions.get(0).body() instanceof Row row) {
List<Expression> rowFields = row.items();
List<ConnectorExpression> translatedRowFields =
rowFields.stream().map(e -> process(e)).filter(Optional::isPresent).map(Optional::get).collect(toImmutableList());
Member

nit:

List<ConnectorExpression> translatedRowFields = rowFields.stream()
  .map(this::process)
  .filter(Optional::isPresent)
  .map(Optional::get)
  .collect(toImmutableList());


// This class is used to represent expression with dereferences into Array
// Target is the actual reference to the array. elementFieldDereferences are the field dereferences
public class ArrayFieldDereference
Member

This is a bit confusing for me.

Let's assume that the table schema is:

field1 <- bigint
field2 <- array(row(x:bigint y:bigint, z:bigint))

What does
ArrayFieldDereference(field2, [x]) produce?
Is it array(row(x)) or array(x)?

what does
ArrayFieldDereference(field2, [x, y]) produce?
Is it array(row(x, y))?

public ArrayFieldDereference(Type type, ConnectorExpression target, List<ConnectorExpression> elementFieldDereference)
{
super(type);
checkArgument(type instanceof ArrayType, "wrong input type for ArrayFieldDereference");
Member

Please add a check:

checkArgument(!elementFieldDereference.isEmpty(), ...)
checkArgument(elementFieldDereference.size() == 1 || ((ArrayType) type).getElementType() instanceof RowType, ...)

@@ -999,6 +1005,15 @@ public PlanOptimizers(
new PushPartialAggregationThroughExchange(plannerContext),
new PruneJoinColumns(),
new PruneJoinChildrenColumns())));
// This rule does not touch query plans, but only adds subfields if necessary. Triggered near the end
Member

Are there queries that actually fail to get pushdown if we don't have this rule here? Maybe it's not needed?

@@ -132,4 +143,113 @@ private static boolean prefixExists(Expression expression, Set<Expression> expre
verify(current instanceof Reference);
return false;
}

Member

nit: public methods above private methods

*/
public static Optional<Reference> getSubscriptLambdaInputExpression(Expression expression)
{
if (expression instanceof Call functionCall) {
Member

nit: if you invert it:

if (!(expression instanceof Call functionCall) ||
  !functionCall.function().name().equals(builtinFunctionName(ARRAY_TRANSFORM_NAME))) {
  return Optional.empty();
}

// rest of code

then the happy path does not have extra indents.

CatalogSchemaFunctionName functionName = functionCall.function().name();

if (functionName.equals(builtinFunctionName(ARRAY_TRANSFORM_NAME))) {
List<Expression> allNodeArgument = functionCall.arguments();
Member

A similar pattern occurs in ConnectorExpressionTranslator. Maybe we can extract something like

record ArrayDereferenceExpression(Expression argument, Lambda lambda)

and then have a method Optional<ArrayDereferenceExpression> getArrayDereferenceExpression(Expression expression)
and use it in the rules and in ConnectorExpressionTranslator?

/**
* Extract the sub-expressions of type {@link Reference} from the {@param expression}
*/
public static List<Reference> getReferences(Expression expression)
Member

I feel like you should use io.trino.sql.planner.SymbolsExtractor#extractUnique(io.trino.sql.ir.Expression) instead, because lambdas nested within an expression can reference lambda arguments; see io.trino.sql.planner.SymbolsExtractor.SymbolBuilderVisitor.

/**
* Extract the sub-expressions of type {@link Reference} and subscript lambda {@link FunctionCall} from the {@param expression}
*/
private static Map<Expression, Reference> getSymbolReferencesAndSubscriptLambdas(Expression expression)
Member

I don't think it does what it's meant to. See io.trino.sql.planner.SymbolsExtractor.SymbolBuilderVisitor. Symbols within lambdas need to be handled carefully.

Member

See also #23649

/**
* Transforms:
* <pre>
* Project(c := f(a, x -> x[1]), d := g(b))
Member

I think we should limit it to RowType dereference, e.g:

            if (!(node.base().type() instanceof RowType)) {
                return Optional.empty();
            }

because support for array dereference is generally harder

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

// This class is used to represent expression with dereferences into Array
@sopel39 (Member) commented Oct 2, 2024

I think this could generally be simpler; e.g., we don't really need private final List<ConnectorExpression> elementFieldDereferences;. We could just use a combination of io.trino.spi.expression.Call and a singular io.trino.spi.expression.FieldDereference.

Consider example:

foo:=transform(x, row(x.fieldA, x.fieldB))

it can generally be transformed into

foo:=$arrayOfRows(arrayFieldA, arrayFieldB) -> array<row<fieldA, fieldB>>
|
arrayFieldA:=transform(x, x.fieldA) -> array<fieldA>
arrayFieldB:=transform(x, x.fieldB) -> array<fieldB>

then each transform(x, x.fieldA) and transform(x, x.fieldB) could be pushed separately. The engine would then optimize the method $arrayOfRows, which would call io.trino.spi.block.RowBlock#fromFieldBlocks.

This way connectors would be much less complex and we could keep using

private final List<Integer> path;

from IcebergColumnHandle (and similar from HiveColumnProjectionInfo). Essentially, a dereference path expressed as an index array would be sufficient. I would guess we could also avoid Subfield and SubfieldTokenizer.

TBH, I don't think we need to support the transform(x, row(x.fieldA, x.fieldB)) case initially. We could add support for it later on.
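
To illustrate the index-path idea with the schema from the earlier comment (field2 <- array(row(x, y, z))): a pushed-down transform(field2, e -> e.y) could be described by the ordinal of y within the element row, matching what IcebergColumnHandle already stores. A hypothetical sketch:

// Hypothetical: the dereference path as 0-based field ordinals, as in
// IcebergColumnHandle's List<Integer> path.
// field2 : array(row(x, y, z)) -> e.y is ordinal 1.
List<Integer> path = ImmutableList.of(1);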


This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@mosabua (Member) commented Nov 5, 2024

Fyi @martint and @electrum - added performance label.

@github-actions github-actions bot removed the stale label Nov 5, 2024
Labels
cla-signed, delta-lake (Delta Lake connector), hive (Hive connector), performance