[SPARK-37949][SQL] Improve Rebalance statistics estimation
### What changes were proposed in this pull request?

Match `RebalancePartitions` in `SizeInBytesOnlyStatsPlanVisitor` and `BasicStatsPlanVisitor`.

### Why are the changes needed?

The default statistics estimation only considers the size in bytes, which may lose the row count and column statistics.

`RebalancePartitions` does not actually change the statistics of the plan, so we can use the statistics of its child for a more accurate estimate.
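
To illustrate the reasoning above, here is a minimal, self-contained sketch. It is a toy model and not Spark's code: `Stats`, `Relation`, `Rebalance`, `Join` and `estimate` are hypothetical stand-ins, and the size-only fallback is only loosely modelled on a default that keeps a size estimate and drops everything else.

```scala
// Toy model only: none of these types are Spark classes.
object RebalanceStatsSketch {
  // Simplified statistics: a size estimate plus an optional row count.
  final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

  sealed trait Plan { def children: Seq[Plan] }
  final case class Relation(stats: Stats) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Rebalance(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // When matchRebalance is true, a Rebalance node passes its child's
  // statistics through unchanged. Otherwise it falls into the generic
  // size-only branch, which discards the row count.
  def estimate(p: Plan, matchRebalance: Boolean): Stats = p match {
    case Relation(stats) => stats
    case Rebalance(child) if matchRebalance => estimate(child, matchRebalance)
    case other =>
      // Assumed fallback: product of the children's sizes, no row count.
      Stats(other.children.map(c => estimate(c, matchRebalance).sizeInBytes).product)
  }

  def main(args: Array[String]): Unit = {
    val table = Relation(Stats(BigInt(1024), Some(BigInt(10))))
    val plan = Rebalance(table)
    println(estimate(plan, matchRebalance = false)) // row count is lost
    println(estimate(plan, matchRebalance = true))  // child's stats are kept
  }
}
```

In this sketch the size estimate is the same in both modes for a single `Rebalance` node; what differs is whether the row count survives, which is the kind of information a cost-based estimator can use further up the plan.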

### Does this PR introduce _any_ user-facing change?

No, it only affects the statistics of the plan.

### How was this patch tested?

Unified the existing repartition test in `BasicStatsEstimationSuite` so that it also covers rebalance.

Closes apache#35235 from ulysses-you/SPARK-37949.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
ulysses-you authored and cloud-fan committed Jan 18, 2022
1 parent 54f91d3 commit 1f496fb
Showing 4 changed files with 13 additions and 2 deletions.
@@ -37,6 +37,7 @@ trait LogicalPlanVisitor[T] {
case p: Project => visitProject(p)
case p: Repartition => visitRepartition(p)
case p: RepartitionByExpression => visitRepartitionByExpr(p)
+ case p: RebalancePartitions => visitRebalancePartitions(p)
case p: Sample => visitSample(p)
case p: ScriptTransformation => visitScriptTransform(p)
case p: Union => visitUnion(p)
@@ -77,6 +78,8 @@ trait LogicalPlanVisitor[T] {

def visitRepartitionByExpr(p: RepartitionByExpression): T

+ def visitRebalancePartitions(p: RebalancePartitions): T

def visitSample(p: Sample): T

def visitScriptTransform(p: ScriptTransformation): T
@@ -88,6 +88,8 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitRepartitionByExpr(p: RepartitionByExpression): Statistics = fallback(p)

+ override def visitRebalancePartitions(p: RebalancePartitions): Statistics = fallback(p)

override def visitSample(p: Sample): Statistics = fallback(p)

override def visitScriptTransform(p: ScriptTransformation): Statistics = default(p)
@@ -132,6 +132,8 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitRepartitionByExpr(p: RepartitionByExpression): Statistics = p.child.stats

+ override def visitRebalancePartitions(p: RebalancePartitions): Statistics = p.child.stats

override def visitSample(p: Sample): Statistics = {
val ratio = p.upperBound - p.lowerBound
var sizeInBytes = EstimationUtils.ceil(BigDecimal(p.child.stats.sizeInBytes) * ratio)
@@ -259,12 +259,16 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
expectedStatsCboOff = Statistics.DUMMY)
}

- test("SPARK-35203: Improve Repartition statistics estimation") {
+ test("Improve Repartition statistics estimation") {
+ // SPARK-35203 for repartition and repartitionByExpr
+ // SPARK-37949 for rebalance
Seq(
RepartitionByExpression(plan.output, plan, 10),
RepartitionByExpression(Nil, plan, None),
plan.repartition(2),
- plan.coalesce(3)).foreach { rep =>
+ plan.coalesce(3),
+ plan.rebalance(),
+ plan.rebalance(plan.output: _*)).foreach { rep =>
val expectedStats = Statistics(plan.size.get, Some(plan.rowCount), plan.attributeStats)
checkStats(
rep,