Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Cassandra Range pushdown #8629

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@
import com.datastax.driver.core.VersionNumber;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.cassandra.util.CassandraCqlUtils;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public class CassandraClusteringPredicatesExtractor
{
Expand All @@ -48,15 +49,14 @@ public String getClusteringKeyPredicates()

public TupleDomain<ColumnHandle> getUnenforcedConstraints()
{
Map<ColumnHandle, Domain> pushedDown = clusteringPushDownResult.getDomains();
return predicates.filter(((columnHandle, domain) -> !pushedDown.containsKey(columnHandle)));
return predicates.filter(((columnHandle, domain) -> !clusteringPushDownResult.hasBeenFullyPushed(columnHandle)));
}

private static ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates, VersionNumber cassandraVersion)
{
ImmutableMap.Builder<ColumnHandle, Domain> domainsBuilder = ImmutableMap.builder();
ImmutableSet.Builder<ColumnHandle> fullyPushedColumnPredicates = ImmutableSet.builder();
ImmutableList.Builder<String> clusteringColumnSql = ImmutableList.builder();
int currentClusteringColumn = 0;
int allProcessedClusteringColumns = 0;
for (CassandraColumnHandle columnHandle : clusteringColumns) {
Domain domain = predicates.getDomains().get().get(columnHandle);
if (domain == null) {
Expand All @@ -65,102 +65,130 @@ private static ClusteringPushDownResult getClusteringKeysSet(List<CassandraColum
if (domain.isNullAllowed()) {
break;
}

int currentlyProcessedClusteringColumn = allProcessedClusteringColumns;
String predicateString = domain.getValues().getValuesProcessor().transform(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just use something like

if (domain.isSingleValue()) {
   // construct equality predicate from domain.getSingleValue()
}
else if (domain.isNullableDiscreteSet()) {
   // construct IN predicate from domain.getNullableDiscreteSet()
}
else if (type.isOrderable()) {
   // construct range predicate from domain.getValues().getRanges().getSpan()
}

In first 2 cases it is full pushdown, in 3rd case it is not

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be equivalent but I'm not sure. If it is, let's make it a subsequent pr as it strides even further from the original implementation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be a safer and easier to understand approach but I'll leave it to @hashhar to decide if we should do it here or in a follow up.

Copy link
Member

@hashhar hashhar Aug 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks like a refactor and I agree it looks much simpler than the current code but let's keep it for a follow-up since the current code is a smaller diff that fixes the original issue.
Although I also don't know if it's logically equivalent or not.

ranges -> {
List<Object> singleValues = new ArrayList<>();
List<String> rangeConjuncts = new ArrayList<>();
String predicate = null;

for (Range range : ranges.getOrderedRanges()) {
if (range.isAll()) {
return null;
}
if (range.isSingleValue()) {
singleValues.add(columnHandle.getCassandraType().toCqlLiteral(range.getSingleValue()));
}
else {
if (!range.isLowUnbounded()) {
String lowBound = columnHandle.getCassandraType().toCqlLiteral(range.getLowBoundedValue());
rangeConjuncts.add(format(
"%s %s %s",
CassandraCqlUtils.validColumnName(columnHandle.getName()),
range.isLowInclusive() ? ">=" : ">",
lowBound));
}
if (!range.isHighUnbounded()) {
String highBound = columnHandle.getCassandraType().toCqlLiteral(range.getHighBoundedValue());
rangeConjuncts.add(format(
"%s %s %s",
CassandraCqlUtils.validColumnName(columnHandle.getName()),
range.isHighInclusive() ? "<=" : "<",
highBound));
}
}
}

if (!singleValues.isEmpty() && !rangeConjuncts.isEmpty()) {
return null;
if (ranges.getRangeCount() == 1) {
fullyPushedColumnPredicates.add(columnHandle);
return translateRangeIntoCql(columnHandle, getOnlyElement(ranges.getOrderedRanges()));
}
if (!singleValues.isEmpty()) {
if (singleValues.size() == 1) {
predicate = CassandraCqlUtils.validColumnName(columnHandle.getName()) + " = " + singleValues.get(0);
}
else {
predicate = CassandraCqlUtils.validColumnName(columnHandle.getName()) + " IN ("
+ Joiner.on(",").join(singleValues) + ")";
if (ranges.getOrderedRanges().stream().allMatch(Range::isSingleValue)) {
if (isInExpressionNotAllowed(clusteringColumns, cassandraVersion, currentlyProcessedClusteringColumn)) {
return translateRangeIntoCql(columnHandle, ranges.getSpan());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the cases in translateRangeIntoCql perform full pushdown so should the column handle be added to fullyPushedDomains too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the idea here is to have a single method that deals with the complexity of creating a Cassandra predicate to pushdown for Ranges, and the general flow that recognizes when the pushdown is full or only partial. Here it's partial, as for most common scenario we are changing WHERE x IN (1,3,4) into WHERE x >=1 AND x<=4 - which will still include x=2 that needs to be filtered out.

}

String inValues = ranges.getOrderedRanges().stream()
.map(range -> toCqlLiteral(columnHandle, range.getSingleValue()))
.collect(joining(","));
fullyPushedColumnPredicates.add(columnHandle);
return CassandraCqlUtils.validColumnName(columnHandle.getName()) + " IN (" + inValues + ")";
}
else if (!rangeConjuncts.isEmpty()) {
predicate = Joiner.on(" AND ").join(rangeConjuncts);
}
return predicate;
return translateRangeIntoCql(columnHandle, ranges.getSpan());
}, discreteValues -> {
if (discreteValues.isInclusive()) {
ImmutableList.Builder<Object> discreteValuesList = ImmutableList.builder();
for (Object discreteValue : discreteValues.getValues()) {
discreteValuesList.add(columnHandle.getCassandraType().toCqlLiteral(discreteValue));
if (discreteValues.getValuesCount() == 0) {
return null;
}
if (discreteValues.getValuesCount() == 1) {
fullyPushedColumnPredicates.add(columnHandle);
return format("%s = %s",
CassandraCqlUtils.validColumnName(columnHandle.getName()),
toCqlLiteral(columnHandle, getOnlyElement(discreteValues.getValues())));
}
String predicate = CassandraCqlUtils.validColumnName(columnHandle.getName()) + " IN ("
+ Joiner.on(",").join(discreteValuesList.build()) + ")";
return predicate;
if (isInExpressionNotAllowed(clusteringColumns, cassandraVersion, currentlyProcessedClusteringColumn)) {
return null;
}

String inValues = discreteValues.getValues().stream()
.map(columnHandle.getCassandraType()::toCqlLiteral)
.collect(joining(","));
fullyPushedColumnPredicates.add(columnHandle);
return CassandraCqlUtils.validColumnName(columnHandle.getName()) + " IN (" + inValues + " )";
}
return null;
}, allOrNone -> null);

if (predicateString == null) {
break;
}
// IN restriction only on last clustering column for Cassandra version = 2.1
if (predicateString.contains(" IN (") && cassandraVersion.compareTo(VersionNumber.parse("2.2.0")) < 0 && currentClusteringColumn != (clusteringColumns.size() - 1)) {
break;
}
clusteringColumnSql.add(predicateString);
domainsBuilder.put(columnHandle, domain);
// Check for last clustering column should only be restricted by range condition
if (predicateString.contains(">") || predicateString.contains("<")) {
break;
}
currentClusteringColumn++;
allProcessedClusteringColumns++;
}
List<String> clusteringColumnPredicates = clusteringColumnSql.build();

return new ClusteringPushDownResult(domainsBuilder.build(), Joiner.on(" AND ").join(clusteringColumnPredicates));
return new ClusteringPushDownResult(fullyPushedColumnPredicates.build(), Joiner.on(" AND ").join(clusteringColumnPredicates));
}

/**
* IN restriction allowed only on last clustering column for Cassandra version <= 2.2.0
*/
private static boolean isInExpressionNotAllowed(List<CassandraColumnHandle> clusteringColumns, VersionNumber cassandraVersion, int currentlyProcessedClusteringColumn)
{
return cassandraVersion.compareTo(VersionNumber.parse("2.2.0")) < 0 && currentlyProcessedClusteringColumn != (clusteringColumns.size() - 1);
}

private static String toCqlLiteral(CassandraColumnHandle columnHandle, Object value)
{
return columnHandle.getCassandraType().toCqlLiteral(value);
}

private static String translateRangeIntoCql(CassandraColumnHandle columnHandle, Range range)
{
if (range.isAll()) {
return null;
}
if (range.isSingleValue()) {
return format("%s = %s",
CassandraCqlUtils.validColumnName(columnHandle.getName()),
toCqlLiteral(columnHandle, range.getSingleValue()));
}

String lowerBoundPredicate = null;
String upperBoundPredicate = null;
if (!range.isLowUnbounded()) {
String lowBound = toCqlLiteral(columnHandle, range.getLowBoundedValue());
lowerBoundPredicate = format(
"%s %s %s",
CassandraCqlUtils.validColumnName(columnHandle.getName()),
range.isLowInclusive() ? ">=" : ">",
lowBound);
}
if (!range.isHighUnbounded()) {
String highBound = toCqlLiteral(columnHandle, range.getHighBoundedValue());
upperBoundPredicate = format(
"%s %s %s",
CassandraCqlUtils.validColumnName(columnHandle.getName()),
range.isHighInclusive() ? "<=" : "<",
highBound);
}
if (lowerBoundPredicate != null && upperBoundPredicate != null) {
return format("%s AND %s ", lowerBoundPredicate, upperBoundPredicate);
}
if (lowerBoundPredicate != null) {
return lowerBoundPredicate;
}
return upperBoundPredicate;
hashhar marked this conversation as resolved.
Show resolved Hide resolved
}

private static class ClusteringPushDownResult
{
private final Map<ColumnHandle, Domain> domains;
private final Set<ColumnHandle> fullyPushedColumnPredicates;
private final String domainQuery;

public ClusteringPushDownResult(Map<ColumnHandle, Domain> domains, String domainQuery)
public ClusteringPushDownResult(Set<ColumnHandle> fullyPushedColumnPredicates, String domainQuery)
{
this.domains = requireNonNull(ImmutableMap.copyOf(domains));
this.fullyPushedColumnPredicates = ImmutableSet.copyOf(requireNonNull(fullyPushedColumnPredicates, "fullyPushedColumnPredicates is null"));
this.domainQuery = requireNonNull(domainQuery);
}

public Map<ColumnHandle, Domain> getDomains()
public boolean hasBeenFullyPushed(ColumnHandle column)
{
return domains;
return fullyPushedColumnPredicates.contains(column);
}

public String getDomainQuery()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,19 @@ public void testClusteringKeyOnlyPushdown()
assertEquals(execute(sql).getRowCount(), 1);
}

@Test
public void testNotEqualPredicateOnClusteringColumn()
findepi marked this conversation as resolved.
Show resolved Hide resolved
{
String sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one != 'clust_one'";
assertEquals(execute(sql).getRowCount(), 0);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two != 2";
assertEquals(execute(sql).getRowCount(), 3);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two >= 2 AND clust_two != 3";
assertEquals(execute(sql).getRowCount(), 2);
sql = "SELECT * FROM " + TABLE_CLUSTERING_KEYS_INEQUALITY + " WHERE key='key_1' AND clust_one='clust_one' AND clust_two > 2 AND clust_two != 3";
assertEquals(execute(sql).getRowCount(), 1);
hashhar marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testClusteringKeyPushdownInequality()
{
Expand Down