Fix Cassandra Range pushdown #8629
if (rangesAreEquivalentToSingleValueNegation(orderedRanges)) {
    return null;
}
for (Range range : orderedRanges) {
the predicate = Joiner.on(" AND ").join(rangeConjuncts); below is incorrect, isn't it?
Should be " OR " i guess?
is it what the bug report is actually about?
Yes, the pointed code is the cause. However, Cassandra doesn't support OR.
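To illustrate the point being discussed, here is a minimal, self-contained sketch of why joining the ranges of a range set with AND changes the meaning. IntRange and the helper names are hypothetical stand-ins for illustration only; they are not Trino's Range or SortedRangeSet classes.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: IntRange is a simplified closed integer range, not Trino's Range.
final class RangeUnionSketch
{
    static final class IntRange
    {
        final int low;
        final int high;

        IntRange(int low, int high)
        {
            this.low = low;
            this.high = high;
        }

        boolean contains(int value)
        {
            return low <= value && value <= high;
        }

        String toCql(String column)
        {
            return column + " >= " + low + " AND " + column + " <= " + high;
        }
    }

    // A set of ranges means "the value lies in ANY of the ranges" (a union).
    static boolean matchesUnion(List<IntRange> ranges, int value)
    {
        return ranges.stream().anyMatch(range -> range.contains(value));
    }

    // Joining the per-range conjuncts with AND requires the value to lie in ALL ranges.
    static boolean matchesAndJoined(List<IntRange> ranges, int value)
    {
        return ranges.stream().allMatch(range -> range.contains(value));
    }

    static String andJoinedCql(String column, List<IntRange> ranges)
    {
        return ranges.stream()
                .map(range -> range.toCql(column))
                .collect(Collectors.joining(" AND "));
    }

    public static void main(String[] args)
    {
        List<IntRange> ranges = List.of(new IntRange(1, 3), new IntRange(7, 9));
        System.out.println(andJoinedCql("x", ranges));
        System.out.println("union matches 2: " + matchesUnion(ranges, 2));
        System.out.println("AND-joined matches 2: " + matchesAndJoined(ranges, 2));
    }
}
```

For two disjoint ranges the AND-joined predicate can never be satisfied, so rows that the original predicate accepts would be dropped.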
so we should remove that code -- first.
and limit pushdowns to single-value (or INs).
and this will implicitly solve the negation problem as well
OK, so this would fix the issue, but we would lose pushdown of a single multi-valued range, which is not that great.
I think that we need to detect when we should skip pushing down.
Ok so this would fix the issue, but we would lose pushdown of a single multivalued ranges, which is not that great.
i didn't say that
depends how the code is written
return first.intersect(second).isEmpty()
        && !first.isHighInclusive()
        && !first.isHighUnbounded()
        && !second.isLowInclusive()
        && !second.isLowUnbounded()
        && first.getHighBoundedValue().equals(second.getLowBoundedValue());
Is it equivalent to return domain.complement().isSingleValue()?
Yes that's what it should detect. I have a different solution however.
plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java
5d1211b to e9b3947
Cassandra seems to support
So:
BTW can you please give the PR an appropriate title after we found out the root cause?
cc1870e to bfd46c8
...assandra/src/main/java/io/trino/plugin/cassandra/CassandraClusteringPredicatesExtractor.java
return discreteValues.getValues().stream()
        .map(columnHandle.getCassandraType()::toCqlLiteral)
        .reduce((first, second) -> first + "," + second)
is this O(n²)?
use .collect([Collectors.]joining(",", "IN(", ")"))
Good catch, changing it to collect anyway.
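A minimal sketch of the suggested change, using only the JDK. The class and method names are hypothetical; only Collectors.joining(delimiter, prefix, suffix) is the real API. Repeated string concatenation inside reduce copies the accumulated string on every step, whereas joining accumulates into a single builder.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; the input is assumed to be already-formatted CQL literals.
final class InListSketch
{
    static String inList(List<String> cqlLiterals)
    {
        // joining(delimiter, prefix, suffix) builds the whole string in one pass
        return cqlLiterals.stream().collect(Collectors.joining(",", "IN(", ")"));
    }

    public static void main(String[] args)
    {
        System.out.println(inList(List.of("1", "2", "3"))); // prints IN(1,2,3)
    }
}
```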
}

/**
 * IN restriction allowed only on last clustering column for Cassandra version = 2.1
<= 2.1?
Code seems to indicate that. I've just moved it around. Will change the comment.
    return cassandraVersion.compareTo(VersionNumber.parse("2.2.0")) < 0 && currentlyProcessedClusteringColumn != (clusteringColumns.size() - 1);
}

private static String translateRangeIntoCQL(CassandraColumnHandle columnHandle, Range range)
CQL -> Cql
}

private static ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates, VersionNumber cassandraVersion)
{
    ImmutableMap.Builder<ColumnHandle, Domain> domainsBuilder = ImmutableMap.builder();
    ImmutableMap.Builder<ColumnHandle, Domain> fullyPushedDomains = ImmutableMap.builder();
fullyPushedDomains is a Map, but its values are apparently never read. did you mean to make it a Set?
So I've left it as a map, as that's how it was implemented before (although its values weren't used). I've just changed the names a bit to better reflect what's going on here. It could be a Set though. I can change it to one.
So I've left it as a map, as that's how it has been implemented before (although it's values weren't used)
could be nice prep commit
I've just changed the names a bit to better reflect what's going on here
that's a good change in general
as it is a Map, i thought we were comparing fully pushed down domains with some other domains, so i was a bit confused when i realized this isn't the case
bfd46c8 to 925ecd8
editorials
over to @hashhar
predicate = Joiner.on(" AND ").join(rangeConjuncts);
if (ranges.getOrderedRanges().stream().allMatch(Range::isSingleValue)) {
    if (isInStatementNotAllowed(clusteringColumns, cassandraVersion, currentlyProcessedClusteringColumn)) {
        return null;
You could still do return translateRangeIntoCql(columnHandle, ranges.getSpan()); (as below)
You are right, let's do that.
I will try to do the same for the discreteValues, as we are not pushing down at all when IN is not supported.
}

if (!singleValues.isEmpty() && !rangeConjuncts.isEmpty()) {
    if (ranges.getSpan().isAll()) {
        return null;
if (ranges.getSpan().isAll()) then you could still benefit from the ranges.getOrderedRanges().stream().allMatch(Range::isSingleValue) handling.
(we don't have logic like that in core yet, but technically {10, 5, MIN_VALUE, MAX_VALUE} is a set of 4 values, with the span being "all")
It seems that you can simply remove the
if (ranges.getSpan().isAll()) {
    return null;
}
block, since this case is nicely handled by the following code anyway
Hm we still need to do this check, to not be pushing WHERE x > MIN AND x < MAX, as it doesn't make sense. I can move it to the method responsible for handling range pushdown though.
}
if (ranges.getRangeCount() == 1) {
    fullyPushedDomains.add(columnHandle);
    return translateRangeIntoCql(columnHandle, ranges.getOrderedRanges().get(0));
nit: .get(0) -> [Iterables.]getOnlyElement (copy&paste fool-proof)
discreteValues.getValues().stream().findFirst()
        .orElseThrow());
getOnlyElement(discreteValues.getValues())
ok
/**
 * IN restriction allowed only on last clustering column for Cassandra version <= 2.1
 */
private static boolean isInStatementNotAllowed(List<CassandraColumnHandle> clusteringColumns, VersionNumber cassandraVersion, int currentlyProcessedClusteringColumn)
isIn...Allowed (without "Not") would read better.
intellij's suggestion Java | Data flow | Boolean method is always inverted is not something i would follow unconditionally
also "InStatement" -> "InExpression"
It's just so that a reader isn't forced to go through !true -> not isInStatementAllowed, which is less descriptive than the full sentence isInStatementNotAllowed. The other argument for leaving it as it is: we are only interested in the negative scenario.
}

public Map<ColumnHandle, Domain> getDomains()
public boolean hasNotBeenFullyPushed(ColumnHandle column)
i would rather let the calling code to apply negation, so that method semantics are clearer.
Sure, makes sense as thanks to that this method is more descriptive in context of its class.
925ecd8 to 5c90b90
Some general comments. I'll defer to someone else like @ebyhr or @raunaqmorarka for the correctness of the impl.
}

private static ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates, VersionNumber cassandraVersion)
{
    ImmutableMap.Builder<ColumnHandle, Domain> domainsBuilder = ImmutableMap.builder();
    ImmutableSet.Builder<ColumnHandle> fullyPushedDomains = ImmutableSet.builder();
nit: Extract the Map -> Set changes to a separate commit. Makes it easy to differentiate the fix from other changes.
}

/**
 * IN restriction allowed only on last clustering column for Cassandra version <= 2.1
nit: Adjust the comment since the condition checks for v < 2.2.0 (there are versions between 2.1 and 2.2.0).
yep you are right
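A small sketch of why "<= 2.1" and "< 2.2.0" differ. The dotted-version parser below is a hypothetical simplification for illustration and is not the driver's VersionNumber class; it compares numeric components lexicographically, with a shorter prefix ordering first.

```java
import java.util.Arrays;

// Hypothetical, simplified version comparison; not the Cassandra driver's VersionNumber.
final class VersionBoundSketch
{
    static int[] parse(String version)
    {
        return Arrays.stream(version.split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    static int compare(String left, String right)
    {
        // Lexicographic comparison of the numeric components
        return Arrays.compare(parse(left), parse(right));
    }

    // Mirrors the shape of the condition in the code under review: version < 2.2.0
    static boolean isBelow220(String version)
    {
        return compare(version, "2.2.0") < 0;
    }

    public static void main(String[] args)
    {
        // 2.1.5 is above 2.1 but still below 2.2.0
        System.out.println(compare("2.1.5", "2.1") > 0);
        System.out.println(isBelow220("2.1.5"));
    }
}
```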
}
if (ranges.getOrderedRanges().stream().allMatch(Range::isSingleValue)) {
    if (isInStatementNotAllowed(clusteringColumns, cassandraVersion, currentlyProcessedClusteringColumn)) {
        return translateRangeIntoCql(columnHandle, ranges.getSpan());
Some of the cases in translateRangeIntoCql perform full pushdown so should the column handle be added to fullyPushedDomains too?
So the idea here is to have a single method that deals with the complexity of creating a Cassandra predicate to push down for Ranges, and a general flow that recognizes when the pushdown is full or only partial. Here it's partial, as for the most common scenario we are changing WHERE x IN (1,3,4) into WHERE x >= 1 AND x <= 4, which will still include x=2 that needs to be filtered out.
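The partial pushdown described here can be sketched with plain integers. Everything below is a hypothetical illustration (the names spanPredicate, scanSpan, and residualFilter are not from the connector): the span predicate is what would be sent to Cassandra, and the residual filter is what the engine still has to apply.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of "push the span, filter the rest in the engine".
final class SpanPushdownSketch
{
    // Predicate pushed to the store: covers min..max of the requested values
    static String spanPredicate(String column, List<Integer> values)
    {
        return column + " >= " + Collections.min(values) + " AND " + column + " <= " + Collections.max(values);
    }

    // Simulates the rows the store returns for the span predicate
    static List<Integer> scanSpan(List<Integer> stored, List<Integer> values)
    {
        int low = Collections.min(values);
        int high = Collections.max(values);
        return stored.stream()
                .filter(value -> low <= value && value <= high)
                .collect(Collectors.toList());
    }

    // The engine re-applies the original IN predicate because the pushdown was only partial
    static List<Integer> residualFilter(List<Integer> scanned, List<Integer> values)
    {
        return scanned.stream()
                .filter(values::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        List<Integer> wanted = List.of(1, 3, 4);
        List<Integer> scanned = scanSpan(List.of(0, 1, 2, 3, 4, 5), wanted);
        System.out.println(spanPredicate("x", wanted));
        System.out.println(scanned);
        System.out.println(residualFilter(scanned, wanted));
    }
}
```

Because the scan returns x=2 as well, the column must not be reported as fully pushed down in this case.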
plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java
...assandra/src/main/java/io/trino/plugin/cassandra/CassandraClusteringPredicatesExtractor.java
List<Range> ranges = discreteValues.getValues().stream()
        .map(value -> Range.equal(domain.getType(), value))
        .collect(toImmutableList());
return translateRangeIntoCql(columnHandle, SortedRangeSet.copyOf(domain.getType(), ranges).getSpan());
We get here for EquatableValueSet, which is used only for types that are comparable but not orderable, so taking a span of these values is probably wrong. E.g. you could try this for ColorType; Range will probably fail to create a comparisonOperator from that.
Ok so I should probably have a check here for isOrderable rather than isComparable and this pushdown should be fine, isn't it?
You will never get here when isOrderable is true because EquatableValueSet is used only when isOrderable is false and isComparable is true.
@@ -65,102 +67,135 @@ private static ClusteringPushDownResult getClusteringKeysSet(List<CassandraColum
if (domain.isNullAllowed()) {
    break;
}

int currentlyProcessedClusteringColumn = allProcesedClusteringColumns;
String predicateString = domain.getValues().getValuesProcessor().transform(
Can we just use something like
if (domain.isSingleValue()) {
    // construct equality predicate from domain.getSingleValue()
}
else if (domain.isNullableDiscreteSet()) {
    // construct IN predicate from domain.getNullableDiscreteSet()
}
else if (type.isOrderable()) {
    // construct range predicate from domain.getValues().getRanges().getSpan()
}
In the first 2 cases it is a full pushdown; in the 3rd case it is not
It might be equivalent but I'm not sure. If it is, let's make it a subsequent PR, as it strays even further from the original implementation.
I think this would be a safer and easier to understand approach but I'll leave it to @hashhar to decide if we should do it here or in a follow up.
That looks like a refactor and I agree it looks much simpler than the current code but let's keep it for a follow-up since the current code is a smaller diff that fixes the original issue.
Although I also don't know if it's logically equivalent or not.
5c90b90 to 587f848
Let's also do #8629 (comment) in a follow-up since it helps a lot with making sense of what's going on.
Left only minor comments.
ImmutableList.Builder<String> clusteringColumnSql = ImmutableList.builder();
int currentClusteringColumn = 0;
int allProcesedClusteringColumns = 0;
Fix typo: allProcessedClusteringColumns
        .map(range -> toCqlLiteral(columnHandle, range.getSingleValue()))
        .collect(joining(","));
fullyPushedColumnPredicates.add(columnHandle);
return CassandraCqlUtils.validColumnName(columnHandle.getName()) + "IN (" + inValues + ")";
nit: Add a space before IN to avoid "column_name"IN (...
587f848 to 0138644
Cassandra supports only =, <, >, .... IN (....) ... AND ..., so:
When we have a single single-valued range, we use =.
When we have a single range, we use low bound < x AND x < high bound (or <= when appropriate).
When we have multiple single-valued ranges, we use IN (...).
In all other cases, including when IN is not supported in Cassandra, we push down the min/max bounds (domain.getValues().getRanges().getSpan()) using low bound < x AND x < high bound (or <= when appropriate).
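The rules in this commit message can be sketched as one decision function. This is a hedged illustration only: IntRange, toCql, and the inSupported flag are hypothetical, ranges are assumed to be closed, non-empty, sorted, and non-overlapping, and the second return value of the real code (whether the pushdown was full) is reduced to comments.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the pushdown rules; not the connector's actual code.
final class CqlRangeRulesSketch
{
    static final class IntRange
    {
        final int low;
        final int high;

        IntRange(int low, int high)
        {
            this.low = low;
            this.high = high;
        }

        boolean isSingleValue()
        {
            return low == high;
        }
    }

    static String span(String column, int low, int high)
    {
        return column + " >= " + low + " AND " + column + " <= " + high;
    }

    static String toCql(String column, List<IntRange> sortedRanges, boolean inSupported)
    {
        IntRange first = sortedRanges.get(0);
        IntRange last = sortedRanges.get(sortedRanges.size() - 1);
        if (sortedRanges.size() == 1) {
            // Full pushdown: a single value uses =, a single range uses its bounds
            return first.isSingleValue() ? column + " = " + first.low : span(column, first.low, first.high);
        }
        if (inSupported && sortedRanges.stream().allMatch(IntRange::isSingleValue)) {
            // Full pushdown: several single values use IN (...)
            String values = sortedRanges.stream()
                    .map(range -> String.valueOf(range.low))
                    .collect(Collectors.joining(","));
            return column + " IN (" + values + ")";
        }
        // Partial pushdown: only the overall min/max bounds are pushed
        return span(column, first.low, last.high);
    }

    public static void main(String[] args)
    {
        List<IntRange> values = List.of(new IntRange(1, 1), new IntRange(3, 3), new IntRange(4, 4));
        System.out.println(toCql("x", values, true));
        System.out.println(toCql("x", values, false));
    }
}
```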
0138644 to 33ef97b
This PR tries to fix issue #401, first by fixing correctness (that is, by disabling the pushdown altogether).
It seems to me, however, that the logic that drives range pushdown in CassandraClusteringPredicatesExtractor tries to deal with generic ranges and joins them with the AND operator, when in reality these ranges come from a SortedRangeSet and should be treated as a sum (union) of Ranges rather than an intersection. Since Cassandra doesn't support the OR operator, I think we should only ever push down a single range, so in the simplest scenario we should only handle pushdown for SortedRangeSets whose size equals 1.
I would like to quickly refactor CassandraClusteringPredicatesExtractor's range handling to reflect that directly.
OK, I've clearly forgotten about the IN statement, which results in a similar situation. I would still refactor this, though, just with a slightly more subtle approach.