Address comments
imback82 committed Apr 13, 2020
1 parent a15af6b commit 40b0707
Showing 10 changed files with 147 additions and 123 deletions.
@@ -159,14 +159,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) }
}

- /**
- * Tests whether a predicate holds for all nodes.
- * @param p the predicate function to be applied to each node in the tree.
- */
- def forall(p: BaseType => Boolean): Boolean = {
- p(this) && children.forall(_.forall(p))
- }

/**
* Runs the given function on this node and then recursively on [[children]].
* @param f the function to be applied to each node in the tree.
@@ -2574,8 +2574,8 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

- val BUCKETING_COALESCE_ENABLED =
- buildConf("spark.sql.bucketing.coalesce")
+ val COALESCE_BUCKET_IN_JOIN_ENABLED =
+ buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled")
.internal()
.doc("When true, if two bucketed tables with a different number of buckets are joined, " +
"the side with a bigger number of buckets will be coalesced to have the same number " +
@@ -2585,6 +2585,16 @@
.booleanConf
.createWithDefault(false)

+ val COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF =
+ buildConf("spark.sql.bucketing.coalesceBucketInJoin.maxNumBucketsDiff")
+ .doc("The difference in the number of buckets of the two bucketed tables being joined " +
+ "should be less than or equal to this value for bucket coalescing to be applied. This " +
+ s"configuration only has an effect when '${COALESCE_BUCKET_IN_JOIN_ENABLED.key}' is set to true.")
+ .version("3.1.0")
+ .intConf
+ .checkValue(_ > 0, "The maximum difference in the number of buckets must be positive.")
+ .createWithDefault(256)

/**
* Holds information about keys that have been deprecated.
*
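For context, a minimal usage sketch of the renamed configuration (the DataFrames, table names, and bucket counts below are hypothetical illustrations, not part of this commit):

// Enable bucket coalescing for joins; maxNumBucketsDiff keeps its default of 256.
spark.conf.set("spark.sql.bucketing.coalesceBucketInJoin.enabled", "true")

// Two hypothetical tables bucketed on the join column with 8 and 4 buckets.
df1.write.bucketBy(8, "key").saveAsTable("t8")
df2.write.bucketBy(4, "key").saveAsTable("t4")

// With the flag on, the 8-bucket side is expected to be coalesced to 4 buckets,
// so the sort-merge join can avoid shuffling either side.
val joined = spark.table("t8").join(spark.table("t4"), "key")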
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
- import org.apache.spark.sql.execution.bucketing.InjectBucketHint
+ import org.apache.spark.sql.execution.bucketing.CoalesceBucketInJoin
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -154,7 +154,8 @@
* @param output Output attributes of the scan, including data attributes and partition attributes.
* @param requiredSchema Required schema of the underlying relation, excluding partition columns.
* @param partitionFilters Predicates to use for partition pruning.
- * @param optionalBucketSet Bucket ids for bucket pruning
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalCoalescedNumBuckets Coalesced number of buckets.
* @param dataFilters Filters on non-partition columns.
* @param tableIdentifier identifier for the table in the metastore.
*/
@@ -164,6 +165,7 @@
requiredSchema: StructType,
partitionFilters: Seq[Expression],
optionalBucketSet: Option[BitSet],
+ optionalCoalescedNumBuckets: Option[Int],
dataFilters: Seq[Expression],
tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec {
@@ -268,23 +270,6 @@
}
}

- /**
- * A bucket can be coalesced if the number of buckets for this relation is divisible
- * by the number of buckets on the other side of table for join.
- */
- private lazy val coalescedNumBuckets: Option[Int] = {
- val joinHintNumBuckets = relation.options.get(InjectBucketHint.JOIN_HINT_NUM_BUCKETS)
- if (relation.bucketSpec.isDefined &&
- SQLConf.get.getConf(SQLConf.BUCKETING_COALESCE_ENABLED) &&
- joinHintNumBuckets.isDefined &&
- joinHintNumBuckets.get.toInt < relation.bucketSpec.get.numBuckets &&
- relation.bucketSpec.get.numBuckets % joinHintNumBuckets.get.toInt == 0) {
- Some(joinHintNumBuckets.get.toInt)
- } else {
- None
- }
- }

override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
if (bucketedScan) {
// For bucketed columns:
@@ -307,7 +292,7 @@
// above
val spec = relation.bucketSpec.get
val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
- val numPartitions = coalescedNumBuckets.getOrElse(spec.numBuckets)
+ val numPartitions = optionalCoalescedNumBuckets.getOrElse(spec.numBuckets)
val partitioning = HashPartitioning(bucketColumns, numPartitions)
val sortColumns =
spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
@@ -329,7 +314,7 @@
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

// TODO Sort order is currently ignored if buckets are coalesced.
- if (singleFilePartitions && coalescedNumBuckets.isEmpty) {
+ if (singleFilePartitions && optionalCoalescedNumBuckets.isEmpty) {
// TODO Currently Spark does not support writing columns sorting in descending order
// so using Ascending order. This can be fixed in future
sortColumns.map(attribute => SortOrder(attribute, Ascending))
@@ -558,8 +543,8 @@
filesGroupedToBuckets
}

- val filePartitions = if (coalescedNumBuckets.isDefined) {
- val newNumBuckets = coalescedNumBuckets.get
+ val filePartitions = if (optionalCoalescedNumBuckets.isDefined) {
+ val newNumBuckets = optionalCoalescedNumBuckets.get
logInfo(s"Coalescing to ${newNumBuckets} buckets")
val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets)
Seq.tabulate(newNumBuckets) { bucketId =>
@@ -625,6 +610,7 @@
requiredSchema,
QueryPlan.normalizePredicates(partitionFilters, output),
optionalBucketSet,
+ optionalCoalescedNumBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None)
}
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
- import org.apache.spark.sql.execution.bucketing.InjectBucketHint
+ import org.apache.spark.sql.execution.bucketing.CoalesceBucketInJoin
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.SchemaPruning
import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
@@ -45,7 +45,7 @@
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
Batch("Bucketing", Once, InjectBucketHint) :+
Batch("Bucketing", Once, CoalesceBucketInJoin) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
PushDownPredicates) :+
Batch("Cleanup filters that cannot be pushed down", Once,
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.bucketing

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf

/**
* This rule injects a hint into the side of an inner join whose table has the larger number
* of buckets, so that its buckets can be coalesced to match the number on the other side.
*/
object CoalesceBucketInJoin extends Rule[LogicalPlan] {
val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"

private val sqlConf = SQLConf.get

private def isPlanEligible(plan: LogicalPlan): Boolean = {
def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
p(plan) && plan.children.forall(forall(_)(p))
}

forall(plan) {
case _: Filter | _: Project | _: LogicalRelation => true
case _ => false
}
}

private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
if (isPlanEligible(plan)) {
plan.collectFirst {
case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
r.bucketSpec.get
}
} else {
None
}
}

private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
assert(numBuckets1 != numBuckets2)
val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
// Buckets can be coalesced only if the larger number of buckets is divisible by the smaller
// number, because a bucket id is assigned by modding the hash value by the number of buckets.
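// For example, when 8 buckets are coalesced into 4, buckets (0, 4), (1, 5), (2, 6) and
// (3, 7) map to coalesced buckets 0, 1, 2 and 3 respectively, since 8 % 4 == 0.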
if ((large % small == 0) &&
(large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
Some(small)
} else {
None
}
}

private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
plan.transformUp {
case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))
}
}

def apply(plan: LogicalPlan): LogicalPlan = {
if (!sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_ENABLED)) {
return plan
}

plan transform {
case join: Join if join.joinType == Inner =>
val leftBucket = getBucketSpec(join.left)
val rightBucket = getBucketSpec(join.right)
if (leftBucket.isEmpty || rightBucket.isEmpty) {
return plan
}

val leftBucketNumber = leftBucket.get.numBuckets
val rightBucketNumber = rightBucket.get.numBuckets
if (leftBucketNumber == rightBucketNumber) {
return plan
}

mayCoalesce(leftBucketNumber, rightBucketNumber).map { coalescedNumBuckets =>
val hint = JOIN_HINT_COALESCED_NUM_BUCKETS -> coalescedNumBuckets.toString
if (coalescedNumBuckets != leftBucketNumber) {
join.copy(left = addBucketHint(join.left, hint))
} else {
join.copy(right = addBucketHint(join.right, hint))
}
}.getOrElse(join)

case other => other
}
}
}
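A hedged sketch of how the rule's effect could be observed on the physical plan, reusing the hypothetical t8/t4 tables from the earlier sketch (not part of this commit):

import org.apache.spark.sql.execution.FileSourceScanExec

// Collect the coalesced bucket counts reported by the file scans of the joined plan.
val plan = spark.table("t8").join(spark.table("t4"), "key").queryExecution.executedPlan
val coalescedCounts = plan.collect {
  case scan: FileSourceScanExec if scan.optionalCoalescedNumBuckets.isDefined =>
    scan.optionalCoalescedNumBuckets.get
}
// Under these assumptions, the scan of the 8-bucket table is expected to report 4,
// i.e. coalescedCounts == Seq(4).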

This file was deleted.

@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+ import org.apache.spark.sql.execution.bucketing.CoalesceBucketInJoin
import org.apache.spark.util.collection.BitSet

/**
@@ -205,6 +206,7 @@
outputSchema,
partitionKeyFilters.toSeq,
bucketSet,
+ fsRelation.options.get(CoalesceBucketInJoin.JOIN_HINT_COALESCED_NUM_BUCKETS).map(_.toInt),
dataFilters,
table.map(_.identifier))

@@ -345,7 +345,7 @@
}
assert(broadcastExchanges.size == 1)
val tables = broadcastExchanges.head.collect {
- case FileSourceScanExec(_, _, _, _, _, _, Some(tableIdent)) => tableIdent
+ case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent)) => tableIdent
}
assert(tables.size == 1)
assert(tables.head === TableIdentifier(table1Name, Some(dbName)))
@@ -1311,7 +1311,7 @@
// need to execute the query before we can examine fs.inputRDDs()
assert(stripAQEPlan(df.queryExecution.executedPlan) match {
case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
- fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)))) =>
+ fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _)))) =>
partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(