-
Notifications
You must be signed in to change notification settings - Fork 246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify aggregation push-down #153
Conversation
originalAggExpr.isDistinct, | ||
originalAggExpr.resultId | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This local function is replaced by inlined .copy()
calls.
val toAlias: AggregateExpression => Alias = { | ||
lazy val deterministicAggAliases = aggregateExpressions.collect { | ||
case e if e.deterministic => e -> Alias(e.canonicalized, e.toString())() | ||
}.toMap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, no need to be lazy
, updating...
@@ -35,7 +34,6 @@ class TiContext(val session: SparkSession) extends Serializable with Logging { | |||
|
|||
TiUtils.sessionInitialize(session, tiSession) | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accidental whitespace changes introduced by scalafmt.
request.addOrderByItem( | ||
TiByItem.create( | ||
BasicExpression.convertToTiExpr(order.child).get, | ||
order.direction.sql.equalsIgnoreCase("DESC") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensured below that sortOrder
will never be null.
(aggregateExpressions ++ extraAggregateExpressions).distinct, | ||
rewrittenResultExpressions, | ||
child | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made TiAggregation
always return results w/o any occurrences of Average
. I think this is reasonable since the purpose of TiAggregation
is to extract aggregate functions that can be pushed down to TiKV, while Average
is not directly pushable.
Extracting the Average
rewriting code path into a separate method also LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning the original aggregateExpressions
which may still contain an average
expression, passing it directly to method groupAggregateProjection
(line 388) will cause an IllegalStateException
.
I suggest remove the average
expression in the final aggregateExpressions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, nice catch!
@Novemser This is reviewable now. |
// If sortOrder is not null, limit must be greater than 0 | ||
if (limit < 0 || (sortOrder == null && limit == 0)) { | ||
// If sortOrder is empty, limit must be greater than 0 | ||
if (limit < 0 || (sortOrder.isEmpty && limit == 0)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did I miss something here? I don't think SortOrder
can be null in Spark.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry it's my fault, originally takeOrderedAndProject
is called in collectLimit
, and an explicit null sortOrder will be passed, I removed that call in collectLimit
but forgot to remove these checks here, thanks for your dedicated review!
case e: Sum => aggExpr.copy(aggregateFunction = e.copy(child = partialResultRef)) | ||
case e: First => aggExpr.copy(aggregateFunction = e.copy(child = partialResultRef)) | ||
case _: Count => aggExpr.copy(aggregateFunction = Sum(partialResultRef)) | ||
case _: Average => throw new IllegalStateException("All AVGs should have been rewritten.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems rewritten logic of Average
to sum
/count
doesn't go as expected.
Such querys like:
select avg(any_col) from any_table
will trigger IllegalStateException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fixed now (but I cannot test it...).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the inconvenience, I shall do tests for you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the other hand, I'm happy to code w/o writing any tests, LOL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to check average rewrite logic according to comment above.
@Novemser Updated according to the comments. Thanks! |
} | ||
def aliasPushedPartialResult(e: AggregateExpression): Alias = { | ||
deterministicAggAliases.getOrElse(e, Alias(e, e.toString())()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Novemser @ilovesoup The only non-deterministic aggregate function handled by TiStrategy
is First
, which doesn't seem to be necessary to be special-cased here. Any counter examples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems true, currently I cannot find any counter example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to remove this logic? Was trying to add comments to explain why determinism is a concern here and then realized that it's not...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There seems to be some issue with deterministicAggAliases
, originally aliasMap
was designed to eliminate duplicated aggregations, e.g.
SQL:
select count(col+1),count(1+col) from any_table
count(col + 1)
and count(1 + col)
will be reduced to only one canonicalized expression like count(col + 1)
, and push this canonicalized expression down to TiKV, but current implementation doesn't serve the same as the original design(could be verified by executing the above sql).
Similar issue has been discussed here #45
@liancheng We may need some further discussion on this topic. : )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, my bad, it should be:
deterministicAggAliases.getOrElse(e.canonicalized, Alias(e, e.toString())())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very pleased to receive your response so fast, BTW I think determinism logic could be simplified as you suggested : )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can open a separate PR for this change. This one is already too big to review and track.
Test report: DAG Result is as expected |
This PR LGTM. @zhexuany @ilovesoup PTAL |
projects | ||
.map { _.toAttribute.name } | ||
.map { TiColumnRef.create } | ||
.foreach { dagReq.addRequiredColumn } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I missed the addRequiredColumn
call whiles cleaning up the original code path, thanks for fixing it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not your problem, this code snippet was just introduced into master
branch few hours ago via this PR #143, and I merged that change into this branch.
@Novemser I made a minor refactoring in my last commit, which is arguably better (in the sense that eliminating mutable collections), depending on the coding style PingCAP prefers. In Spark (or more specifically, Catalyst), we tend not to use mutable states (var or mutable collections) whenever possible for non-critical code paths to minimize side effects. Part of the reason why I neglected the If Again, this is quite subjective and feel free to disagree and revert :) |
Use JavaConverters instead for easier tracking Scala/Java collections conversion.
@liancheng I totally agree with you on the thought of not to use mutable states here and pretty appreciate your elegant solution of using immutable states. As for Thanks again for your dedicated work! |
I reran the test, results are follows Test report: DAG All results are as expected |
BTW it's interesting and glad to serve like a manual only judge here, LOL |
This PR still need further review.
Please let me know if there's anything that I'm expected to do to get this moving forward. One of the things I can think of is to split this single refactoring PR into smaller ones for easier tracking and reducing potential conflicts with other important outstanding PRs. |
filters: Seq[Expression], | ||
source: TiDBRelation, | ||
dagRequest: TiDAGRequest = new TiDAGRequest(pushDownType(), timeZoneOffset()) | ||
): TiDAGRequest = { | ||
val tiFilters: Seq[TiExpr] = filters.collect { case BasicExpression(expr) => expr } | ||
val tiFilters = filters.collect { case BasicExpression(expr) => expr }.asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add type information back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed. Thanks!
order.direction.sql.equalsIgnoreCase("DESC") | ||
) | ||
private def addSortOrder(request: TiDAGRequest, sortOrder: Seq[SortOrder]): Unit = | ||
sortOrder.foreach { order: SortOrder => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we delete null
value check here? If sortOrder is null
, foreach
could throw an NPE here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just saw you change null to nil when you call this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, please see this comment thread for more details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. change to nil means empty list. Null value check is unnecessary.
@liancheng We appreciate for your excellent work! : ) |
This PR tries to simplify/refactor the aggregation push-down code path in
TiStrategy
to improve readability and maintainability by:Use more Scala idiomatic coding style
In some cases, it may introduce extra expression traversals. It shouldn't be an issue since the number of expressions is quite limited but feel free to disagree.
Unify
Average
push-downInstead of special-casing
Average
ingroupAggregateProjection
, we may want to convertAverage
s intoCount
s andSum
s earlier in the form of Catalyst expressions so that we can remove the clumsy code paths involvingavgPushdownRewriteMap
,avgFinalRewriteMap
, and building partial/final aggregate expressions.Opening this PR early for discussion and to make sure it makes Travis happy. If people would like to split this work into a few separate PRs for easier code review and future Git blaming, I'd be happy to do that.
This change is