Skip to content

Commit

Permalink
Implement Distinct Count Pushdown in Pinot
Browse files Browse the repository at this point in the history
  • Loading branch information
elonazoulay committed Aug 22, 2021
1 parent a1feed2 commit 8723d41
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 6 deletions.
5 changes: 5 additions & 0 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
<version>1.15</version>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.annotation.PostConstruct;
import javax.validation.constraints.NotNull;

import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;

public class PinotConfig
Expand Down Expand Up @@ -54,6 +57,7 @@ public class PinotConfig
private int maxRowsPerSplitForSegmentQueries = 50_000;
private int maxRowsForBrokerQueries = 50_000;
private boolean aggregationPushdownEnabled = true;
private boolean countDistinctPushdownEnabled = true;

@NotNull
public List<URI> getControllerUrls()
Expand Down Expand Up @@ -295,4 +299,25 @@ public PinotConfig setAggregationPushdownEnabled(boolean aggregationPushdownEnab
this.aggregationPushdownEnabled = aggregationPushdownEnabled;
return this;
}

/**
 * Returns the configured value of the count distinct pushdown flag.
 *
 * @return {@code true} when count distinct pushdown is enabled in this configuration
 */
public boolean isCountDistinctPushdownEnabled()
{
    return this.countDistinctPushdownEnabled;
}

/**
 * Sets the count distinct pushdown flag, bound to the
 * {@code pinot.count-distinct-pushdown.enabled} configuration property.
 * <p>
 * The setter itself performs no cross-property check; the consistency rule
 * with aggregation pushdown is enforced separately in {@link #validate()}.
 *
 * @param countDistinctPushdownEnabled the new value of the flag
 * @return this configuration object, so that setters can be chained
 */
@Config("pinot.count-distinct-pushdown.enabled")
@ConfigDescription("Controls whether distinct count is pushed down to Pinot. Distinct count pushdown can cause Pinot to do a full scan. Aggregation pushdown must also be enabled in addition to this parameter otherwise no pushdowns will be enabled.")
public PinotConfig setCountDistinctPushdownEnabled(boolean countDistinctPushdownEnabled)
{
this.countDistinctPushdownEnabled = countDistinctPushdownEnabled;
return this;
}

/**
 * Checks that the pushdown flags are mutually consistent: count distinct
 * pushdown may only be enabled when aggregation pushdown is enabled too.
 *
 * @throws IllegalStateException if count distinct pushdown is enabled while
 *         aggregation pushdown is disabled
 */
@PostConstruct
public void validate()
{
    // Equivalent to "count distinct pushdown implies aggregation pushdown".
    boolean pushdownFlagsConsistent = aggregationPushdownEnabled || !countDistinctPushdownEnabled;
    checkState(
            pushdownFlagsConsistent,
            "Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.plugin.pinot.query.aggregation.ImplementApproxDistinct;
import io.trino.plugin.pinot.query.aggregation.ImplementAvg;
import io.trino.plugin.pinot.query.aggregation.ImplementCountAll;
import io.trino.plugin.pinot.query.aggregation.ImplementCountDistinct;
import io.trino.plugin.pinot.query.aggregation.ImplementMinMax;
import io.trino.plugin.pinot.query.aggregation.ImplementSum;
import io.trino.spi.connector.AggregateFunction;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
Expand All @@ -90,6 +92,7 @@ public class PinotMetadata
private final LoadingCache<Object, List<String>> allTablesCache;
private final int maxRowsPerBrokerQuery;
private final AggregateFunctionRewriter aggregateFunctionRewriter;
private final ImplementCountDistinct implementCountDistinct;

@Inject
public PinotMetadata(
Expand Down Expand Up @@ -118,12 +121,14 @@ public List<PinotColumn> load(String tableName)

executor.execute(() -> this.allTablesCache.refresh(ALL_TABLES_CACHE_KEY));
this.maxRowsPerBrokerQuery = pinotConfig.getMaxRowsForBrokerQueries();
this.implementCountDistinct = new ImplementCountDistinct();
this.aggregateFunctionRewriter = new AggregateFunctionRewriter(identity(), ImmutableSet.<AggregateFunctionRule>builder()
.add(new ImplementCountAll())
.add(new ImplementAvg())
.add(new ImplementMinMax())
.add(new ImplementSum())
.add(new ImplementApproxDistinct())
.add(implementCountDistinct)
.build());
}

Expand Down Expand Up @@ -344,11 +349,10 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
}

PinotTableHandle tableHandle = (PinotTableHandle) handle;
// If aggregate and grouping columns are present than no further aggregations
// If aggregates are present then no further aggregations
// can be pushed down: there are currently no subqueries in pinot
if (tableHandle.getQuery().isPresent() &&
(!tableHandle.getQuery().get().getAggregateColumns().isEmpty() ||
!tableHandle.getQuery().get().getGroupingColumns().isEmpty())) {
!tableHandle.getQuery().get().getAggregateColumns().isEmpty()) {
return Optional.empty();
}

Expand All @@ -358,6 +362,7 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega

for (AggregateFunction aggregate : aggregates) {
Optional<PinotColumnHandle> rewriteResult = aggregateFunctionRewriter.rewrite(session, aggregate, assignments);
rewriteResult = applyCountDistinct(session, aggregate, assignments, tableHandle, rewriteResult);
if (rewriteResult.isEmpty()) {
return Optional.empty();
}
Expand Down Expand Up @@ -393,6 +398,43 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
return Optional.of(new AggregationApplicationResult<>(tableHandle, projections.build(), resultAssignments.build(), ImmutableMap.of(), false));
}

/**
 * Post-filters the result of the aggregate rewrite for aggregates that match
 * the count distinct rule.
 * <p>
 * An aggregate matching {@code implementCountDistinct}'s pattern is only allowed
 * through when the table handle already carries a query whose grouping columns
 * contain the aggregate's input. Per the original author's note, that is the
 * state left by a first pass of applyAggregation for count distinct; without it
 * a plain {@code count(column_name)} would be pushed into Pinot, which is
 * unsupported because Pinot currently treats it as {@code count(*)}, i.e. it
 * counts nulls.
 *
 * @param session the connector session exposed to the rule's pattern matching
 * @param aggregate the aggregate function being considered for pushdown
 * @param assignments the column assignments exposed to the rule's pattern matching
 * @param tableHandle the table handle whose optional query is inspected
 * @param rewriteResult the result already produced by the aggregate function rewriter
 * @return {@code rewriteResult} unchanged when the aggregate does not match the
 *         count distinct pattern or its input is already a grouping column;
 *         otherwise an empty optional so that nothing is pushed down
 */
private Optional<PinotColumnHandle> applyCountDistinct(
        ConnectorSession session,
        AggregateFunction aggregate,
        Map<String, ColumnHandle> assignments,
        PinotTableHandle tableHandle,
        Optional<PinotColumnHandle> rewriteResult)
{
    // Minimal context that lets the rule's pattern be evaluated outside the rewriter.
    AggregateFunctionRule.RewriteContext matchContext = new AggregateFunctionRule.RewriteContext()
    {
        @Override
        public Map<String, ColumnHandle> getAssignments()
        {
            return assignments;
        }

        @Override
        public Function<String, String> getIdentifierQuote()
        {
            return identity();
        }

        @Override
        public ConnectorSession getSession()
        {
            return session;
        }
    };

    // Aggregates the count distinct rule does not recognise are left untouched.
    if (!implementCountDistinct.getPattern().matches(aggregate, matchContext)) {
        return rewriteResult;
    }

    Variable distinctInput = (Variable) getOnlyElement(aggregate.getInputs());
    // No query at all, or a query that does not group by the input, means this
    // is not the second pass for count distinct: refuse the pushdown.
    boolean groupedByInput = tableHandle.getQuery()
            .map(query -> query.getGroupingColumns().contains(distinctInput.getName()))
            .orElse(false);
    if (!groupedByInput) {
        return Optional.empty();
    }
    return rewriteResult;
}

@Override
public boolean usesLegacyTableLayouts()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
Expand All @@ -35,6 +36,7 @@ public class PinotSessionProperties
private static final String RETRY_COUNT = "retry_count";
private static final String NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES = "non_aggregate_limit_for_broker_queries";
private static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled";
private static final String COUNT_DISTINCT_PUSHDOWN_ENABLED = "count_distinct_pushdown_enabled";

@VisibleForTesting
public static final String FORBID_SEGMENT_QUERIES = "forbid_segment_queries";
Expand Down Expand Up @@ -80,6 +82,13 @@ public static boolean isAggregationPushdownEnabled(ConnectorSession session)
return session.getProperty(AGGREGATION_PUSHDOWN_ENABLED, Boolean.class);
}

/**
 * Reads the {@code count_distinct_pushdown_enabled} session property.
 * <p>
 * Before reading it, verifies that the aggregation pushdown session property
 * is enabled; the verification is expected never to fail because this method
 * is only meant to be reached when aggregation pushdown is on.
 *
 * @param session the connector session to read the properties from
 * @return the value of the count distinct pushdown session property
 */
public static boolean isCountDistinctPushdownEnabled(ConnectorSession session)
{
    boolean aggregationPushdownEnabled = isAggregationPushdownEnabled(session);
    // Sanity check only: callers should not get here with aggregation pushdown disabled.
    verify(
            aggregationPushdownEnabled,
            "%s must be enabled when %s is enabled",
            AGGREGATION_PUSHDOWN_ENABLED,
            COUNT_DISTINCT_PUSHDOWN_ENABLED);
    return session.getProperty(COUNT_DISTINCT_PUSHDOWN_ENABLED, Boolean.class);
}

@Inject
public PinotSessionProperties(PinotConfig pinotConfig)
{
Expand Down Expand Up @@ -119,6 +128,11 @@ public PinotSessionProperties(PinotConfig pinotConfig)
AGGREGATION_PUSHDOWN_ENABLED,
"Enable aggregation pushdown",
pinotConfig.isAggregationPushdownEnabled(),
false),
booleanProperty(
COUNT_DISTINCT_PUSHDOWN_ENABLED,
"Enable count distinct pushdown",
pinotConfig.isCountDistinctPushdownEnabled(),
false));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.query.aggregation;

import io.trino.matching.Capture;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.plugin.base.expression.AggregateFunctionRule;
import io.trino.plugin.pinot.PinotColumnHandle;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.expression.Variable;

import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static io.trino.matching.Capture.newCapture;
import static io.trino.plugin.base.expression.AggregateFunctionPatterns.basicAggregation;
import static io.trino.plugin.base.expression.AggregateFunctionPatterns.functionName;
import static io.trino.plugin.base.expression.AggregateFunctionPatterns.outputType;
import static io.trino.plugin.base.expression.AggregateFunctionPatterns.singleInput;
import static io.trino.plugin.base.expression.AggregateFunctionPatterns.variable;
import static io.trino.plugin.pinot.PinotSessionProperties.isCountDistinctPushdownEnabled;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;

/**
 * Aggregate function rule that rewrites a {@code count} aggregation over a
 * single variable input into a Pinot {@code distinctcount(...)} column.
 * <p>
 * The rewrite is only produced when the count distinct pushdown session
 * property is enabled.
 */
public class ImplementCountDistinct
        implements AggregateFunctionRule
{
    // Capture for the single variable argument of the matched aggregate.
    private static final Capture<Variable> INPUT = newCapture();

    /**
     * Builds the pattern this rule applies to: a basic aggregation named
     * {@code count}, producing BIGINT, with exactly one input that is a variable.
     *
     * @return the pattern, with the input variable captured for {@code rewrite}
     */
    @Override
    public Pattern<AggregateFunction> getPattern()
    {
        Pattern<AggregateFunction> bigintCount = basicAggregation()
                .with(functionName().equalTo("count"))
                .with(outputType().equalTo(BIGINT));
        return bigintCount.with(singleInput().matching(variable().capturedAs(INPUT)));
    }

    /**
     * Produces the {@code distinctcount} column handle for a matched aggregate.
     *
     * @param aggregateFunction the matched aggregate; its output type must be BIGINT
     * @param captures the captures of the pattern match, holding the input variable
     * @param context supplies the session and the identifier quoting function
     * @return the rewritten column handle, or an empty optional when count
     *         distinct pushdown is disabled for the session
     */
    @Override
    public Optional<PinotColumnHandle> rewrite(AggregateFunction aggregateFunction, Captures captures, RewriteContext context)
    {
        boolean pushdownEnabled = isCountDistinctPushdownEnabled(context.getSession());
        if (!pushdownEnabled) {
            return Optional.empty();
        }

        Variable capturedInput = captures.get(INPUT);
        // The pattern already restricts the output type; this guards against drift.
        verify(aggregateFunction.getOutputType() == BIGINT);

        String quotedColumn = context.getIdentifierQuote().apply(capturedInput.getName());
        String pinotExpression = format("distinctcount(%s)", quotedColumn);
        PinotColumnHandle distinctCountColumn = new PinotColumnHandle(pinotExpression, aggregateFunction.getOutputType(), false);
        return Optional.of(distinctCountColumn);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestPinotConfig
{
@Test
Expand All @@ -45,7 +47,8 @@ public void testDefaults()
.setRequestTimeout(new Duration(30, TimeUnit.SECONDS))
.setMaxRowsPerSplitForSegmentQueries(50_000)
.setMaxRowsForBrokerQueries(50_000)
.setAggregationPushdownEnabled(true));
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true));
}

@Test
Expand All @@ -70,6 +73,7 @@ public void testExplicitPropertyMappings()
.put("pinot.max-rows-per-split-for-segment-queries", "10")
.put("pinot.max-rows-for-broker-queries", "5000")
.put("pinot.aggregation-pushdown.enabled", "false")
.put("pinot.count-distinct-pushdown.enabled", "false")
.build();

PinotConfig expected = new PinotConfig()
Expand All @@ -90,8 +94,20 @@ public void testExplicitPropertyMappings()
.setRequestTimeout(new Duration(1, TimeUnit.MINUTES))
.setMaxRowsPerSplitForSegmentQueries(10)
.setMaxRowsForBrokerQueries(5000)
.setAggregationPushdownEnabled(false);
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false);

ConfigAssertions.assertFullMapping(properties, expected);
}

/**
 * Enabling count distinct pushdown while aggregation pushdown is disabled
 * must be rejected by {@code validate()} with the documented message.
 */
@Test
public void testInvalidCountDistinctPushdown()
{
    PinotConfig inconsistentConfig = new PinotConfig()
            .setAggregationPushdownEnabled(false)
            .setCountDistinctPushdownEnabled(true);

    assertThatThrownBy(inconsistentConfig::validate)
            .isInstanceOf(IllegalStateException.class)
            .hasMessage("Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled");
}
}
Loading

0 comments on commit 8723d41

Please sign in to comment.