SPI and engine changes for aggregation pushdown #3697

Merged
merged 1 commit into from
Jun 11, 2020

Parth-Brahmbhatt
Member

…connectors.

The commit also includes the top level optimizer rule and feature config to control
the experimental behavior.

@cla-bot cla-bot bot added the cla-signed label May 11, 2020
@martint martint self-requested a review May 11, 2020 17:57
@Parth-Brahmbhatt
Member Author

@Parth-Brahmbhatt does https://github.com/prestosql/presto/blob/master/presto-base-jdbc/src/main/java/io/prestosql/plugin/jdbc/QueryBuilder.java#L104-L122 need to change too ?

These are just the SPI changes and the optimizer rule that unblock any connector that can support aggregation pushdown. I will follow up with another PR that does this for the JDBC connector so we can leverage it in the Druid connector.

Comment on lines 569 to 573
booleanProperty(
ENABLE_AGG_PUSHDOWN,
"Enable aggregate pushdown",
featuresConfig.isEnableAggPushdown(),
false));
Member

I don't think we need a global toggle for this. Connectors can choose whether to support aggregation pushdown by not implementing the new API (or doing it selectively, possibly based on a connector-specific setting)

Member Author

Removed.
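
The opt-in model described in this thread (no global toggle; a connector supports pushdown only by overriding the new method) can be sketched roughly as below. This is a simplified illustration, not the actual SPI: `Metadata`, `PlainMetadata`, `PushdownMetadata`, and the `pushdownEnabled` setting are hypothetical, and table handles are modeled as plain strings.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins; table handles are modeled as plain strings.
final class DefaultOptOutSketch
{
    interface Metadata
    {
        // Default behavior: no aggregation pushdown, so the engine keeps the aggregation node.
        default Optional<String> applyAggregation(String handle, List<String> aggregates)
        {
            return Optional.empty();
        }
    }

    // Does not override the method, so it never accepts a pushdown.
    static final class PlainMetadata
            implements Metadata {}

    // Opts in selectively, gated on a connector-specific setting.
    static final class PushdownMetadata
            implements Metadata
    {
        private final boolean pushdownEnabled;

        PushdownMetadata(boolean pushdownEnabled)
        {
            this.pushdownEnabled = pushdownEnabled;
        }

        @Override
        public Optional<String> applyAggregation(String handle, List<String> aggregates)
        {
            if (!pushdownEnabled || aggregates.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(handle + "[" + String.join(",", aggregates) + "]");
        }
    }
}
```

With this shape, a connector that never overrides the method behaves exactly as if the feature did not exist, which is why an engine-level flag adds little.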

* @param groupBy
* @return
*/
default Optional<AggregatePushdownResult<ConnectorTableHandle>> applyAggregation(ConnectorSession session, ConnectorTableHandle handle, List<AggregateFunction> aggregates,
Member

Why is groupBy a list of lists? For grouping sets?

Member Author

yes. let me know if you think we should remove that right now and add it when we do review for grouping sets rule.

private final List<ConnectorExpression> inputs;
private final Type outputType;
private final Optional<List<ConnectorExpression>> sortBy;
private final Optional<Map<ConnectorExpression, SortOrder>> sortOrder;
Member

Ordering of the sorting clauses matters, so this should be a list of tuples containing an expression and a sort order.

Member Author

I was using sortBy to preserve the ordering and sortOrder to lookup the actual order. I like just having a list of tuples so I have updated the code with a single list of entries.
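
A minimal sketch of the "list of tuples" shape agreed on here, assuming expressions are represented by plain strings. `SortItemsSketch` and its two-value `SortOrder` enum are hypothetical stand-ins, not the SPI types.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SortItemsSketch
{
    // Hypothetical stand-in for the SPI's SortOrder enum.
    enum SortOrder { ASC_NULLS_FIRST, DESC_NULLS_LAST }

    // Models ORDER BY b DESC, a ASC: the position in the list is the sort precedence.
    static List<Map.Entry<String, SortOrder>> orderByBThenA()
    {
        return List.of(
                Map.entry("b", SortOrder.DESC_NULLS_LAST),
                Map.entry("a", SortOrder.ASC_NULLS_FIRST));
    }

    // Renders the sort items in list order, which is the only order the consumer can rely on.
    static String render(List<Map.Entry<String, SortOrder>> sortItems)
    {
        return sortItems.stream()
                .map(item -> item.getKey() + " " + item.getValue())
                .collect(Collectors.joining(", "));
    }
}
```

A single ordered list keeps precedence and direction together, so a consumer cannot read one without the other, which a separate list plus lookup map does not guarantee.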

// Something to check with Martin what he thinks is the best course here, for now just exposing the fields that the connector can use to derive if
// this is an agg function it can handle and information they need to push it down.
private final String projectionName; // may be call it assignmentId? the purpose for this is to uniquely identify an agg but we are just using the projected expression's output symbol as the name here
private final String aggregateFunctionName;
Member

Rename to functionName

Member Author

Done.

}
}

private ImmutableList<AggregateFunction> toAggFunction(Map<Symbol, AggregationNode.Aggregation> aggregations)
Member

Use interface type for collection: List<AggregateFunction>. Also, don't abbreviate names: toAggregateFunction would be better.

Member Author

that was my bad, should have noticed the type difference. name changed.

AggregationNode.Aggregation aggregation = entry.getValue();
Signature signature = aggregation.getResolvedFunction().getSignature();
Type returnType = typeManager.getType(signature.getReturnType());
String aggFunctionName = signature.getName();
Member

Suggested change
- String aggFunctionName = signature.getName();
+ String name = signature.getName();

Member Author

done.

private final boolean isDistinct;
// TODO It is unclear if filtering and mask from the aggregate needs to be forwarded to connectors so skipping them for now.

public AggregateFunction(String projectionName, String name, List<ConnectorExpression> inputs, Type outputType)
Member

This constructor is unnecessary at this point. Let's rely on the one below until it becomes clear that we need it.

Member Author

Removed.

this(projectionName, name, inputs, outputType, Optional.empty(), Optional.empty(), false);
}

public AggregateFunction(String projectionName, String aggregateFunctionName, List<ConnectorExpression> inputs, Type outputType, Optional<List<ConnectorExpression>> sortBy, Optional<Map<ConnectorExpression, SortOrder>> sortOrder, boolean isDistinct)
Member

Move outputType before inputs. That way, if we ever have to add other constructors, it's easier to support varargs for the inputs if needed and keep things consistent.

Member Author

Done.
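
The parameter-order argument rests on a Java rule: a varargs parameter must come last. A rough sketch of what the suggested order would allow, using a hypothetical `VarargsOrderSketch` class with string-typed fields in place of the real `AggregateFunction`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class VarargsOrderSketch
{
    private final String name;
    private final String outputType;
    private final List<String> inputs;

    // Primary constructor: outputType precedes inputs, as suggested in the review.
    VarargsOrderSketch(String name, String outputType, List<String> inputs)
    {
        this.name = name;
        this.outputType = outputType;
        this.inputs = Collections.unmodifiableList(new ArrayList<>(inputs));
    }

    // Hypothetical convenience constructor; only possible because inputs is the last parameter.
    VarargsOrderSketch(String name, String outputType, String... inputs)
    {
        this(name, outputType, Arrays.asList(inputs));
    }

    String getName()
    {
        return name;
    }

    String getOutputType()
    {
        return outputType;
    }

    List<String> getInputs()
    {
        return inputs;
    }
}
```

Both `new VarargsOrderSketch("agg_fn2", "bigint", "b", "c")` and the list-based form then share the same leading arguments, which is the consistency the reviewer asks for.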

public PushAggregatesIntoTableScanRuleSet(Metadata metadata)
{
this.metadata = metadata;
this.typeManager = new InternalTypeManager(metadata);
Member

InternalTypeManager is not meant to be used by anything other than the metadata APIs. Use Metadata.getType() in the calls below instead.

Member Author

Done.

* Attempt to push down the aggregates into the table.
* <p>
* Connectors can indicate whether they don't support aggregate pushdown or that the action had no effect
* by returning {@link Optional#empty()}. Connectors should expect this method to be called multiple times
Member

Suggested change
- * by returning {@link Optional#empty()}. Connectors should expect this method to be called multiple times
+ * by returning {@link Optional#empty()}. Connectors should expect this method may be called multiple times

-- connector cannot assume a method will be called multiple times (it may or may not)

Member Author

Done.

* by returning {@link Optional#empty()}. Connectors should expect this method to be called multiple times
* during the optimization of a given query.
* <p>
* <b>Note</b>: it's critical for connectors to return Optional.empty() if calling this method has no effect for that
Member

Optional.empty() -> wrap it in {@code} or {@link}

Member Author

Done.
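
The Javadoc's warning about returning an empty `Optional` when nothing changes can be illustrated with a generic fixpoint loop. This is only a model of the idea, not the engine's optimizer: `FixpointSketch`, both rule methods, and the `maxIterations` guard are assumptions made for the example.

```java
import java.util.Optional;
import java.util.function.Function;

final class FixpointSketch
{
    // Re-applies the rule until it reports "no change"; returns how many times it fired.
    // maxIterations exists only so that this sketch terminates for a misbehaving rule.
    static <T> int applyUntilStable(T initial, Function<T, Optional<T>> rule, int maxIterations)
    {
        T current = initial;
        int applications = 0;
        while (applications < maxIterations) {
            Optional<T> next = rule.apply(current);
            if (!next.isPresent()) {
                break;
            }
            current = next.get();
            applications++;
        }
        return applications;
    }

    // Well-behaved: reports a change once, then reports "no effect".
    static Optional<String> wellBehaved(String handle)
    {
        return handle.endsWith("+agg") ? Optional.empty() : Optional.of(handle + "+agg");
    }

    // Misbehaving: claims a change even though the handle is identical.
    static Optional<String> misbehaving(String handle)
    {
        return Optional.of(handle);
    }
}
```

In this model the well-behaved rule fires once, while the misbehaving one fires until the guard stops it; without such a guard the loop would not end.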

* to loop indefinitely.
* </p>
* <p>
* If the method returns a result, the list of assignment in the result will replace the existing assignments given
Member

Suggested change
- * If the method returns a result, the list of assignment in the result will replace the existing assignments given
+ * If the method returns a result, the list of assignments in the result will replace the existing assignments given

Member Author

Done.

* to loop indefinitely.
* </p>
* <p>
* If the method returns a result, the list of assignment in the result will replace the existing assignments given
Member

I understand the "given" part. It seems it's OK to return partial assignments (they will just be ignored).

(I think it's ok to drop this part of the sentence)

Member Author

Dropped and modified it now that we don't want to really support this behavior.

* - aggregate
* v0 = agg_fn1(a)
* v1 = agg_fn2(b, 2)
* v2 = group_by c
Member

what does v2 mean here?

Member Author

those are just variable names, I could make it explicit and I tried to do that by adding {@link Variable} at some places in javadoc.

Comment on lines 32 to 33
// Something to check with Martin what he thinks is the best course here, for now just exposing the fields that the connector can use to derive if
// this is an agg function it can handle and information they need to push it down.
Member

Make sure to reword (or remove) before the PR is merged.

Member Author

removed.

this(projectionName, name, inputs, outputType, Optional.empty(), Optional.empty(), false);
}

public AggregateFunction(String projectionName, String aggregateFunctionName, List<ConnectorExpression> inputs, Type outputType, Optional<List<ConnectorExpression>> sortBy, Optional<Map<ConnectorExpression, SortOrder>> sortOrder, boolean isDistinct)
Member

nit: put each arg on separate line

Member Author

Done.

Comment on lines 52 to 55
this.inputs = requireNonNull(inputs, "inputs is null");
this.outputType = requireNonNull(outputType, "outputType is null");
this.sortBy = requireNonNull(sortBy, "sortBy is null");
this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
Member

Make defensive copies of Lists and Maps (e.g. unmodifiableList(new ArrayList<>(...)))

Member Author

Done.
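
A small sketch of the defensive-copy idiom requested in this thread, using only JDK collections. `DefensiveCopySketch` is a hypothetical class with a single string-list field, not the reviewed `AggregateFunction`.

```java
import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.unmodifiableList;
import static java.util.Objects.requireNonNull;

final class DefensiveCopySketch
{
    private final List<String> inputs;

    DefensiveCopySketch(List<String> inputs)
    {
        requireNonNull(inputs, "inputs is null");
        // Copy first so later changes to the caller's list are not visible here,
        // then wrap so callers of getInputs() cannot modify the copy.
        this.inputs = unmodifiableList(new ArrayList<>(inputs));
    }

    List<String> getInputs()
    {
        return inputs;
    }
}
```

The copy protects against the caller mutating its list after construction; the unmodifiable wrapper protects against mutation through the getter.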


ConnectorSession connectorSession = session.toConnectorSession(catalogName);
return metadata.applyAggregation(connectorSession, table.getConnectorHandle(), aggregates, assignments, groupBy)
.map(result -> new AggregatePushdownResult<>(
Member

Here we need to verify the return data is sane & safe.
See #3324

Member Author

Added. Given the results are identical I extracted a common verification method.
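
One kind of sanity check on a connector's result, modeled on the "Unbound variable" stream shown later in this review, might look like the sketch below. It assumes each projection has already been reduced to the set of variable names it references; `ResultVerificationSketch` and `verifyVariablesAreBound` are hypothetical names.

```java
import java.util.List;
import java.util.Set;

final class ResultVerificationSketch
{
    // Fails if any projection references a variable that the result's assignments do not define.
    static void verifyVariablesAreBound(List<Set<String>> projectionVariables, Set<String> assignedVariables)
    {
        projectionVariables.stream()
                .flatMap(Set::stream)
                .filter(variableName -> !assignedVariables.contains(variableName))
                .findAny()
                .ifPresent(variableName -> { throw new IllegalStateException("Unbound variable: " + variableName); });
    }
}
```

Failing fast at the boundary keeps a misbehaving connector from producing a plan that references symbols nothing produces.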

@@ -1100,6 +1102,26 @@ public boolean usesLegacyTableLayouts(Session session, TableHandle table)
Optional.empty()));
}

@Override
public Optional<AggregatePushdownResult<TableHandle>> applyAggregation(Session session, TableHandle table, List<AggregateFunction> aggregates, Map<String, ColumnHandle> assignments, Optional<List<List<ColumnHandle>>> groupBy)
Member

put each arg on separate line

Member Author

Done.

@@ -352,6 +354,12 @@

Optional<TableHandle> applySample(Session session, TableHandle table, SampleType sampleType, double sampleRatio);

Optional<AggregationPushdownResult<TableHandle>> applyAggregation(Session session,
Member

Place the first argument on the next line

Member Author

Done

import static java.util.Collections.unmodifiableList;
import static java.util.Objects.requireNonNull;

public class AggregationPushdownResult<T>
Member

Rename to AggregationApplicationResult for consistency with ProjectionApplicationResult

Member Author

Done

Comment on lines 1123 to 1124
.map(
result -> {
Member

Format as:

Suggested change
- .map(
- result -> {
+ .map(result -> {

Member Author

Done

.flatMap(connectorExpression -> ConnectorExpressions.extractVariables(connectorExpression).stream())
.map(Variable::getName)
.filter(variableName -> !assignedVariables.contains(variableName))
.findAny().ifPresent(variableName -> { throw new IllegalStateException("Unbound variable: " + variableName); });
Member

Format as:

Suggested change
- .findAny().ifPresent(variableName -> { throw new IllegalStateException("Unbound variable: " + variableName); });
+ .findAny()
+ .ifPresent(variableName -> { throw new IllegalStateException("Unbound variable: " + variableName); });

Member Author

Done

@@ -447,6 +449,16 @@ public boolean usesLegacyTableLayouts(Session session, TableHandle table)
return Optional.empty();
}

@Override
public Optional<AggregationPushdownResult<TableHandle>> applyAggregation(Session session,
Member

Put the first argument on the next line

Member Author

Done

String name = signature.getName();
List<Expression> arguments = aggregation.getArguments();
List<Symbol> symbols = arguments.stream()
.filter(expression -> expression instanceof SymbolReference)
Member

This should be unnecessary once you add the filter to the pattern as described above.

Member Author

Done.

Comment on lines 183 to 184
.map(SymbolsExtractor::extractUnique)
.flatMap(Collection::stream)
Member

This can only be a SymbolReference, so it'd be more appropriate to do:

Suggested change
- .map(SymbolsExtractor::extractUnique)
- .flatMap(Collection::stream)
+ .map(Symbol::from)

Member Author

Done.

Comment on lines 187 to 189
if (arguments.size() != symbols.size()) {
return ImmutableList.of();
}
Member

This should be unnecessary once you add the filter to the pattern as described above.

Member Author

removed.

assignmentBuilder.build()));
}

private List<AggregateFunction> toAggregateFunction(List<AggregationNode.Aggregation> aggregations)
Member

Once you apply all the suggestions above and below, this method can simplify to:

private AggregateFunction toAggregateFunction(AggregationNode.Aggregation aggregation)
{
    Signature signature = aggregation.getResolvedFunction().getSignature();

    ImmutableList.Builder<ConnectorExpression> arguments = new ImmutableList.Builder<>();
    for (int i = 0; i < aggregation.getArguments().size(); i++) {
        SymbolReference argument = (SymbolReference) aggregation.getArguments().get(i);
        arguments.add(new Variable(argument.getName(), metadata.getType(signature.getArgumentTypes().get(i))));
    }

    Optional<OrderingScheme> orderingScheme = aggregation.getOrderingScheme();
    Optional<List<Map.Entry<String, SortOrder>>> sortBy = orderingScheme.map(orderings ->
            orderings.getOrderBy().stream()
                    .map(orderBy -> new AbstractMap.SimpleEntry<>(
                            orderBy.getName(),
                            SortOrder.valueOf(orderings.getOrderings().get(orderBy).name())))
                    .collect(toImmutableList()));

    return new AggregateFunction(
            signature.getName(),
            metadata.getType(signature.getReturnType()),
            arguments.build(),
            sortBy.orElse(ImmutableList.of()),
            aggregation.isDistinct(),
            aggregation.getFilter().map(Symbol::getName));
}

Member

what about aggregation.getMask()? we should verify aggregation.getMask() is not present (yet).

Member Author

Done, added check for masks in pattern.

Comment on lines 133 to 144
// The new scan outputs and assignments are union of existing and the ones returned by connector pushdown.
ImmutableList.Builder<Symbol> newScanOutputs = new ImmutableList.Builder<>();
newScanOutputs.addAll(tableScan.getOutputSymbols());
Member

This is incorrect. The new scan outputs should be the symbols associated with grouping columns plus the symbols associated with aggregations.

Member Author

comment removed.

@findepi
Member

findepi commented May 25, 2020

Specially to support the future use case of sum(x+y) -> being converted by the connector to sum(x) + sum(y)

Can't reply inline. This is not a correct rewrite unless the data contains no NULLs.

presto> SELECT sum(x), sum(y), sum(x+y), sum(x)+sum(y)
     -> FROM (VALUES (1,1), (NULL, 2), (3, NULL)) t(x, y);
 _col0 | _col1 | _col2 | _col3
-------+-------+-------+-------
     4 |     3 |     2 |     7
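
The same counterexample can be reproduced outside SQL with a small model of NULL handling. This sketch assumes the standard SQL rules (`sum` skips NULL inputs; `+` yields NULL if either operand is NULL); `NullSumSketch` and its methods are hypothetical helpers, with Java `null` standing in for SQL NULL.

```java
import java.util.ArrayList;
import java.util.List;

final class NullSumSketch
{
    // Models SQL sum(): NULL inputs are skipped; the result is NULL only if every input is NULL.
    static Integer sum(List<Integer> values)
    {
        Integer total = null;
        for (Integer value : values) {
            if (value != null) {
                total = (total == null) ? value : total + value;
            }
        }
        return total;
    }

    // Models SQL "+": the result is NULL if either operand is NULL.
    static Integer add(Integer left, Integer right)
    {
        return (left == null || right == null) ? null : left + right;
    }

    // Models sum(x + y): add row by row first, then aggregate.
    static Integer sumOfRowwiseAdd(List<Integer> xs, List<Integer> ys)
    {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < xs.size(); i++) {
            rows.add(add(xs.get(i), ys.get(i)));
        }
        return sum(rows);
    }
}
```

For the rows `(1,1), (NULL,2), (3,NULL)`, `sumOfRowwiseAdd` yields 2 while `add(sum(x), sum(y))` yields 7, matching `_col2` and `_col3` above.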

assignments,
groupByColumns.isEmpty() ? ImmutableList.of() : ImmutableList.of(groupByColumns));

if (!aggregationPushdownResult.isPresent()) {
Member

Member Author

Done.

tableScan.getTable(),
aggregateFunctions,
assignments,
groupByColumns.isEmpty() ? ImmutableList.of() : ImmutableList.of(groupByColumns));
Member

Why not ImmutableList.of(groupByColumns) always?

[ [] ] would correctly describe the fact that there is 1 grouping set and that it is a global aggregation.

Member Author

Done.
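
The distinction between `[]` and `[ [] ]` raised here can be made concrete with a short sketch. Column handles are modeled as strings, and `GroupingSetsSketch` with its `describe` helper is hypothetical, not engine code.

```java
import java.util.List;

final class GroupingSetsSketch
{
    // Always wraps the GROUP BY columns in exactly one grouping set, even when there are none.
    static List<List<String>> toGroupingSets(List<String> groupByColumns)
    {
        return List.of(groupByColumns);
    }

    static String describe(List<List<String>> groupingSets)
    {
        if (groupingSets.isEmpty()) {
            return "no grouping sets";
        }
        if (groupingSets.size() == 1 && groupingSets.get(0).isEmpty()) {
            return "1 grouping set: global aggregation";
        }
        return groupingSets.size() + " grouping set(s): " + groupingSets;
    }
}
```

Wrapping unconditionally means a connector always sees one grouping set and can tell a global aggregation apart from "no grouping information at all".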

* to loop indefinitely.
* </p>
* <p>
* If the method returns a result, the list of assignments in the result will be unioned with existing assignments. The projections
Member

i think #3697 (comment) is still waiting for final resolution. @martint ?

(nit: if this stays, would be good to avoid "unioned" as intellij spellchecker highlights this)

Member Author

unioned removed. I have assumed @martint 's last comment is what we want the API to do and changed both the contract and docs to reflect that. Let me know if there is a gap in my understanding vs what is expected.

* As an example, given the following plan:
*
* <pre>
*
Member

Suggested change
- *

Member Author

Done.

* <pre>
* handle = TH0
* aggregates = [
* { functionName=agg_fn1, outputType = <some presto type> inputs = [{@link Variable} a]} ,
Member

Suggested change
- * { functionName=agg_fn1, outputType = <some presto type> inputs = [{@link Variable} a]} ,
+ * { functionName=agg_fn1, outputType = «some presto type» inputs = [{@link Variable} a]} ,

otherwise you would need to use &lt; &gt;

Member Author

Done.

variableMappings.put(assignment.getVariable(), symbol);
}

List<Expression> newPartialProjections = result.getProjections().stream()
Member

what does "partial project" mean here? (can this be newProjections?)

Member Author

renamed.

Comment on lines 159 to 175
groupingSets.getGroupingKeys().stream()
.forEach(groupBySymbol -> assignmentBuilder.put(groupBySymbol, groupBySymbol.toSymbolReference()));
Member

Suggested change
- groupingSets.getGroupingKeys().stream()
- .forEach(groupBySymbol -> assignmentBuilder.put(groupBySymbol, groupBySymbol.toSymbolReference()));
+ groupingSets.getGroupingKeys()
+ .forEach(groupBySymbol -> assignmentBuilder.put(groupBySymbol, groupBySymbol.toSymbolReference()));

-- use Collection#forEach without going thru Stream

Member Author

Done.

Comment on lines 155 to 156
IntStream.range(0, aggregationOutputSymbols.size())
.forEach(index -> assignmentBuilder.put(aggregationOutputSymbols.get(index), newPartialProjections.get(index)));
Member

add verify(aggregationOutputSymbols.size() == newPartialProjections.size()); just before this line as this is where you depend on this

no need for good exc msg, since it's in fact already validated in MetadataManager, so it's more for documenting assumptions/dependencies of the code here

Member Author

Done.
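
A sketch of the index-based pairing with the size check placed directly before it, as suggested. It uses a plain `IllegalStateException` as a stand-in for Guava's `verify`, and strings in place of symbols and expressions; `ZipAssignmentsSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

final class ZipAssignmentsSketch
{
    static Map<String, String> zip(List<String> outputSymbols, List<String> projections)
    {
        // Plain-Java stand-in for Guava's verify(...): states the assumption right where it is relied on.
        if (outputSymbols.size() != projections.size()) {
            throw new IllegalStateException("symbol and projection counts differ");
        }
        Map<String, String> assignments = new LinkedHashMap<>();
        IntStream.range(0, outputSymbols.size())
                .forEach(index -> assignments.put(outputSymbols.get(index), projections.get(index)));
        return assignments;
    }
}
```

Without the check, a shorter projection list would surface as an `IndexOutOfBoundsException` from inside the lambda, and a longer one would be silently truncated.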

private final List<ConnectorExpression> inputs;
private final List<Map.Entry<String, SortOrder>> sortOrder;
private final boolean isDistinct;
private final Optional<String> filter;
Member

Optional<Variable>

Member Author

Not sure about this, Why would this need to be a variable given the filter on the agg side is just a Symbol?

Member

@findepi findepi May 29, 2020

Symbols (engine things) are mapped to Variables in ConnectorExpression.
Using Variable here would give semantics to this field -- connector would know it needs to check assignments to know which column handle this is.

Member

BTW why isn't filter a Optional<ConnectorExpression> (or ConnectorExpression without Optional, using new Constant(true, BOOLEAN) instead of empty).
@Parth-Brahmbhatt @martint did you discuss this bit already?

Member Author

We haven't discussed this, originally I did not have mask or filter as I couldn't quite understand how those were used. I will figure out how to get the corresponding connector expression from the symbol and update the PR today.

Member Author

We haven't discussed this. I originally had no filters or masks because I did not completely understand what masks are for. @martint asked to add the filter in the first pass of this PR so that connectors have to explicitly decide not to handle them. The current rule only matches when the aggregation is followed by a scan node, so the only possible place to look for the filter symbol is the table scan assignments; I think it makes sense to just map it to a Variable for now. I can't think of a scenario where this rule matches and there is a filter whose symbol is not in the table scan assignments, so I have not added a check for that. If you think we should add a defensive check, I can add a condition.

I personally prefer Optional so I have kept it as is, let me know if you would like me to remove Optional.

@findepi findepi changed the title SPI changes to support nonpartial and complete aggregate pushdown to … SPI and engine changes for aggregation pushdown May 29, 2020
@Parth-Brahmbhatt Parth-Brahmbhatt force-pushed the agg-pushdown branch 2 times, most recently from 3325220 to b6a4432 Compare June 8, 2020 22:53
@@ -119,7 +120,7 @@ public Result apply(AggregationNode node, Captures captures, Context context)
.map(Entry::getKey)
.collect(toImmutableList());

- if (aggregateFunctions.isEmpty()) {
+ if (aggregateFunctions.isEmpty() && !aggregations.isEmpty()) {
Member

Apparently aggregateFunctions.size() == aggregations.size(), so you don't need the && part.

Member Author

I forgot that I no longer return empty list when an aggregate can not be transformed as we already check for only simple symbol references. This check makes no sense any more.

@@ -62,7 +62,8 @@
{
private static final Capture<TableScanNode> TABLE_SCAN = newCapture();

- private static final Pattern<AggregationNode> PATTERN = aggregation()
+ private static final Pattern<AggregationNode> PATTERN =
+ aggregation()
Member

update indent of the following lines

Member Author

updated.

.forEach(groupBySymbol -> {
// if the connector returned a new mapping from oldColumnHandle to newColumnHandle, groupBy needs to point to
// new columnHandle's symbol reference, otherwise it will continue pointing at oldColumnHandle.
ColumnHandle originalColumnHandle = assignments.get(groupBySymbol);
Member

Intellij warns me

'Map<String, ColumnHandle>' may not contain keys of type 'Symbol'

and indeed i get NPE in the next line, as originalColumnHandle comes out as null

Member Author

updated the code so it now does assignments.get(groupBySymbol.getName()); so there is no warning anymore. I think it is a valid assumption though that the groupBy Symbol name must be present in original scan assignment. If you think that is a wrong assumption I can add a defensive check and return an empty plan if the defensive check fails.
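
The bug behind the IntelliJ warning comes from `Map.get` accepting `Object`: a lookup with the wrong key type compiles and simply returns `null`. A minimal reproduction, where `WrongKeyTypeSketch` and its nested `Symbol` are hypothetical stand-ins for the engine classes and column handles are strings:

```java
import java.util.Map;

final class WrongKeyTypeSketch
{
    // Hypothetical stand-in for the engine's Symbol class.
    static final class Symbol
    {
        private final String name;

        Symbol(String name)
        {
            this.name = name;
        }

        String getName()
        {
            return name;
        }
    }

    // Compiles because Map.get takes Object, but a Symbol never equals a String key, so this returns null.
    static String lookupBySymbol(Map<String, String> assignments, Symbol symbol)
    {
        return assignments.get(symbol);
    }

    // The fix described in the reply: look up by the symbol's name.
    static String lookupByName(Map<String, String> assignments, Symbol symbol)
    {
        return assignments.get(symbol.getName());
    }
}
```

Dereferencing the `null` from the first lookup is what produces the NPE reported above.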

Member

@martint martint left a comment

Just one minor comment.

Also:

  • Squash the review commits into the main one
  • Rename the main commit to "Add support for aggregation pushdown", since the current title is too long.

Once you apply those changes, I'll go ahead and merge it. Thanks for working on this!

return assignments;
}

public Map<ColumnHandle, ColumnHandle> getGroupingSetMapping()
Member

Let's call this groupingColumnMapping. Grouping sets are made of sets of columns, but this method is not specifically about the grouping sets themselves.

Member Author

Done.

@Parth-Brahmbhatt @martint what is groupingColumnMapping used for ? Thanks.

@Parth-Brahmbhatt
Member Author

Just one minor comment.

Also:

  • Squash the review commits into the main one
  • Rename the main commit to "Add support for aggregation pushdown", since the current title is too long.

Once you apply those changes, I'll go ahead and merge it. Thanks for working on this!

Done

@martint
Member

martint commented Jun 11, 2020

CI failures are unrelated

@martint martint merged commit 4052f33 into trinodb:master Jun 11, 2020
@martint
Member

martint commented Jun 11, 2020

Merged. Thanks for working on this @Parth-Brahmbhatt!

@martint martint added this to the 335 milestone Jun 11, 2020
@findepi
Member

findepi commented Jun 12, 2020

🎉 🚀 thank you @Parth-Brahmbhatt!

@findepi findepi mentioned this pull request Jan 15, 2021