[SPARK-11673][SQL] Remove the normal Project physical operator (and keep TungstenProject)

Also make full outer join able to produce UnsafeRows.

Author: Reynold Xin <[email protected]>

Closes #9643 from rxin/SPARK-11673.
rxin committed Nov 12, 2015
1 parent 14cf753 commit 30e7433
Showing 27 changed files with 80 additions and 287 deletions.
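
The substance of the change: the planner and the sort/aggregate/join operators no longer branch on UnsafeProjection.canSupport-style checks, because the Tungsten (UnsafeRow-producing) path is now the only path; the row-based Project operator and the canSupport/supportsSchema helpers are deleted outright. As a rough sketch of what the surviving path does, here is how an UnsafeProjection converts a row. This uses Spark's internal catalyst classes of this commit's vintage (not a stable public API), so treat it as illustrative:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    object UnsafeProjectionSketch {
      def main(args: Array[String]): Unit = {
        // Schema of the rows we want in the Tungsten (UnsafeRow) binary format.
        val schema = StructType(Seq(
          StructField("id", IntegerType),
          StructField("name", StringType)))

        // Code-generates a projection that serializes an InternalRow into an UnsafeRow.
        val toUnsafe = UnsafeProjection.create(schema)

        // Strings live as UTF8String in Spark's internal row representation.
        val row = InternalRow(1, UTF8String.fromString("spark"))

        // The returned UnsafeRow points into a buffer the projection reuses,
        // so callers that hold on to rows must copy() them.
        val unsafeRow = toUnsafe(row)
        println(s"${unsafeRow.getInt(0)} ${unsafeRow.getUTF8String(1)}")
      }
    }

This is also why the operators in the diff can declare outputsUnsafeRows = true unconditionally: every row they emit comes out of such a generated projection.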

@@ -170,13 +170,6 @@ public Iterator<UnsafeRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
     return sort();
   }
 
-  /**
-   * Return true if UnsafeExternalRowSorter can sort rows with the given schema, false otherwise.
-   */
-  public static boolean supportsSchema(StructType schema) {
-    return UnsafeProjection.canSupport(schema);
-  }
-
   private static final class RowComparator extends RecordComparator {
     private final Ordering<InternalRow> ordering;
     private final int numFields;

@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-import scala.reflect.ClassTag
-
 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
  *
@@ -123,9 +123,9 @@ object Encoders {
 
     new ExpressionEncoder[Any](
       schema,
-      false,
+      flat = false,
       extractExpressions,
       constructExpression,
-      ClassTag.apply(cls))
+      ClassTag(cls))
   }
 }

@@ -153,18 +153,18 @@ trait ScalaReflection {
    */
   def constructorFor[T : TypeTag]: Expression = constructorFor(typeOf[T], None)
 
-  protected def constructorFor(
+  private def constructorFor(
      tpe: `Type`,
      path: Option[Expression]): Expression = ScalaReflectionLock.synchronized {
 
    /** Returns the current path with a sub-field extracted. */
-    def addToPath(part: String) =
+    def addToPath(part: String): Expression =
      path
        .map(p => UnresolvedExtractValue(p, expressions.Literal(part)))
        .getOrElse(UnresolvedAttribute(part))
 
    /** Returns the current path with a field at ordinal extracted. */
-    def addToPathOrdinal(ordinal: Int, dataType: DataType) =
+    def addToPathOrdinal(ordinal: Int, dataType: DataType): Expression =
      path
        .map(p => GetStructField(p, StructField(s"_$ordinal", dataType), ordinal))
        .getOrElse(BoundReference(ordinal, dataType, false))

@@ -77,7 +77,7 @@ class EquivalentExpressions {
    * an empty collection if there are none.
    */
   def getEquivalentExprs(e: Expression): Seq[Expression] = {
-    equivalenceMap.get(Expr(e)).getOrElse(mutable.MutableList())
+    equivalenceMap.getOrElse(Expr(e), mutable.MutableList())
   }
 
   /**

@@ -102,16 +102,6 @@ abstract class UnsafeProjection extends Projection {
 
 object UnsafeProjection {
 
-  /*
-   * Returns whether UnsafeProjection can support given StructType, Array[DataType] or
-   * Seq[Expression].
-   */
-  def canSupport(schema: StructType): Boolean = canSupport(schema.fields.map(_.dataType))
-  def canSupport(exprs: Seq[Expression]): Boolean = canSupport(exprs.map(_.dataType).toArray)
-  private def canSupport(types: Array[DataType]): Boolean = {
-    types.forall(GenerateUnsafeProjection.canSupport)
-  }
-
   /**
    * Returns an UnsafeProjection for given StructType.
    */

@@ -57,10 +57,7 @@ case class Exchange(
   /**
    * Returns true iff we can support the data type, and we are not doing range partitioning.
    */
-  private lazy val tungstenMode: Boolean = {
-    GenerateUnsafeProjection.canSupport(child.schema) &&
-    !newPartitioning.isInstanceOf[RangePartitioning]
-  }
+  private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]
 
   override def outputPartitioning: Partitioning = newPartitioning
 

@@ -18,12 +18,10 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 
-@Experimental
 class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
   val sparkContext: SparkContext = sqlContext.sparkContext
 
@@ -64,7 +62,7 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
 
     val projectSet = AttributeSet(projectList.flatMap(_.references))
     val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
-    val filterCondition =
+    val filterCondition: Option[Expression] =
       prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
 
     // Right now we still use a projection even if the only evaluation is applying an alias
@@ -82,7 +80,7 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
       filterCondition.map(Filter(_, scan)).getOrElse(scan)
     } else {
       val scan = scanBuilder((projectSet ++ filterSet).toSeq)
-      Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+      TungstenProject(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
     }
   }
 }

@@ -309,11 +309,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      * if necessary.
      */
     def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
-      if (TungstenSort.supportsSchema(child.schema)) {
-        execution.TungstenSort(sortExprs, global, child)
-      } else {
-        execution.Sort(sortExprs, global, child)
-      }
+      execution.TungstenSort(sortExprs, global, child)
     }
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -347,13 +343,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Sort(sortExprs, global, child) =>
         getSortOperator(sortExprs, global, planLater(child)):: Nil
       case logical.Project(projectList, child) =>
-        // If unsafe mode is enabled and we support these data types in Unsafe, use the
-        // Tungsten project. Otherwise, use the normal project.
-        if (UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
-          execution.TungstenProject(projectList, planLater(child)) :: Nil
-        } else {
-          execution.Project(projectList, planLater(child)) :: Nil
-        }
+        execution.TungstenProject(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
         execution.Filter(condition, planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>

@@ -247,11 +247,7 @@ case class Window(
 
         // Get all relevant projections.
         val result = createResultProjection(unboundExpressions)
-        val grouping = if (child.outputsUnsafeRows) {
-          UnsafeProjection.create(partitionSpec, child.output)
-        } else {
-          newProjection(partitionSpec, child.output)
-        }
+        val grouping = UnsafeProjection.create(partitionSpec, child.output)
 
         // Manage the stream and the grouping.
         var nextRow: InternalRow = EmptyRow

@@ -78,11 +78,9 @@ case class SortBasedAggregate(
       // so return an empty iterator.
       Iterator[InternalRow]()
     } else {
-      val groupingKeyProjection = if (UnsafeProjection.canSupport(groupingExpressions)) {
-        UnsafeProjection.create(groupingExpressions, child.output)
-      } else {
-        newMutableProjection(groupingExpressions, child.output)()
-      }
+      val groupingKeyProjection =
+        UnsafeProjection.create(groupingExpressions, child.output)
+
       val outputIter = new SortBasedAggregationIterator(
         groupingKeyProjection,
         groupingExpressions.map(_.toAttribute),

@@ -139,7 +139,6 @@ object TungstenAggregate {
       groupingExpressions: Seq[Expression],
       aggregateBufferAttributes: Seq[Attribute]): Boolean = {
     val aggregationBufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
-    UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema) &&
-      UnsafeProjection.canSupport(groupingExpressions)
+    UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema)
   }
 }

@@ -30,32 +30,6 @@ import org.apache.spark.util.random.PoissonSampler
 import org.apache.spark.{HashPartitioner, SparkEnv}
 
 
-case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-
-  override private[sql] lazy val metrics = Map(
-    "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
-
-  @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val numRows = longMetric("numRows")
-    child.execute().mapPartitions { iter =>
-      val reusableProjection = buildProjection()
-      iter.map { row =>
-        numRows += 1
-        reusableProjection(row)
-      }
-    }
-  }
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-}
-
-
-/**
- * A variant of [[Project]] that returns [[UnsafeRow]]s.
- */
 case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
 
   override private[sql] lazy val metrics = Map(

@@ -343,7 +343,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation)
-      execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
+      execution.TungstenProject(
+        projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
   }
 

@@ -44,27 +44,15 @@ trait HashJoin {
 
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  protected[this] def isUnsafeMode: Boolean = {
-    UnsafeProjection.canSupport(buildKeys) && UnsafeProjection.canSupport(self.schema)
-  }
-
-  override def outputsUnsafeRows: Boolean = isUnsafeMode
-  override def canProcessUnsafeRows: Boolean = isUnsafeMode
-  override def canProcessSafeRows: Boolean = !isUnsafeMode
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = false
 
   protected def buildSideKeyGenerator: Projection =
-    if (isUnsafeMode) {
-      UnsafeProjection.create(buildKeys, buildPlan.output)
-    } else {
-      newMutableProjection(buildKeys, buildPlan.output)()
-    }
+    UnsafeProjection.create(buildKeys, buildPlan.output)
 
   protected def streamSideKeyGenerator: Projection =
-    if (isUnsafeMode) {
-      UnsafeProjection.create(streamedKeys, streamedPlan.output)
-    } else {
-      newMutableProjection(streamedKeys, streamedPlan.output)()
-    }
+    UnsafeProjection.create(streamedKeys, streamedPlan.output)
 
   protected def hashJoin(
       streamIter: Iterator[InternalRow],
@@ -79,13 +67,8 @@
 
      // Mutable per row objects.
      private[this] val joinRow = new JoinedRow
-      private[this] val resultProjection: (InternalRow) => InternalRow = {
-        if (isUnsafeMode) {
-          UnsafeProjection.create(self.schema)
-        } else {
-          identity[InternalRow]
-        }
-      }
+      private[this] val resultProjection: (InternalRow) => InternalRow =
+        UnsafeProjection.create(self.schema)
 
      private[this] val joinKeys = streamSideKeyGenerator
 

@@ -64,38 +64,18 @@ trait HashOuterJoin {
       s"HashOuterJoin should not take $x as the JoinType")
   }
 
-  protected[this] def isUnsafeMode: Boolean = {
-    joinType != FullOuter &&
-      UnsafeProjection.canSupport(buildKeys) &&
-      UnsafeProjection.canSupport(self.schema)
-  }
-
-  override def outputsUnsafeRows: Boolean = isUnsafeMode
-  override def canProcessUnsafeRows: Boolean = isUnsafeMode
-  override def canProcessSafeRows: Boolean = !isUnsafeMode
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = false
 
   protected def buildKeyGenerator: Projection =
-    if (isUnsafeMode) {
-      UnsafeProjection.create(buildKeys, buildPlan.output)
-    } else {
-      newMutableProjection(buildKeys, buildPlan.output)()
-    }
+    UnsafeProjection.create(buildKeys, buildPlan.output)
 
-  protected[this] def streamedKeyGenerator: Projection = {
-    if (isUnsafeMode) {
-      UnsafeProjection.create(streamedKeys, streamedPlan.output)
-    } else {
-      newProjection(streamedKeys, streamedPlan.output)
-    }
-  }
+  protected[this] def streamedKeyGenerator: Projection =
+    UnsafeProjection.create(streamedKeys, streamedPlan.output)
 
-  protected[this] def resultProjection: InternalRow => InternalRow = {
-    if (isUnsafeMode) {
-      UnsafeProjection.create(self.schema)
-    } else {
-      identity[InternalRow]
-    }
-  }
+  protected[this] def resultProjection: InternalRow => InternalRow =
+    UnsafeProjection.create(self.schema)
 
   @transient private[this] lazy val DUMMY_LIST = CompactBuffer[InternalRow](null)
   @transient protected[this] lazy val EMPTY_LIST = CompactBuffer[InternalRow]()
@@ -173,8 +153,12 @@
   }
 
   protected[this] def fullOuterIterator(
-      key: InternalRow, leftIter: Iterable[InternalRow], rightIter: Iterable[InternalRow],
-      joinedRow: JoinedRow, numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
+      key: InternalRow,
+      leftIter: Iterable[InternalRow],
+      rightIter: Iterable[InternalRow],
+      joinedRow: JoinedRow,
+      resultProjection: InternalRow => InternalRow,
+      numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
     if (!key.anyNull) {
       // Store the positions of records in right, if one of its associated row satisfy
      // the join condition.
@@ -191,7 +175,7 @@
            matched = true
            // if the row satisfy the join condition, add its index into the matched set
            rightMatchedSet.add(idx)
-            joinedRow.copy()
+            resultProjection(joinedRow)
 
        } ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
          // 2. For those unmatched records in left, append additional records with empty right.
@@ -201,7 +185,7 @@
          // of the records in right side.
          // If we didn't get any proper row, then append a single row with empty right.
          numOutputRows += 1
-          joinedRow.withRight(rightNullRow).copy()
+          resultProjection(joinedRow.withRight(rightNullRow))
        })
      } ++ rightIter.zipWithIndex.collect {
        // 3. For those unmatched records in right, append additional records with empty left.
@@ -210,15 +194,15 @@
        // in the matched set.
        case (r, idx) if !rightMatchedSet.contains(idx) =>
          numOutputRows += 1
-          joinedRow(leftNullRow, r).copy()
+          resultProjection(joinedRow(leftNullRow, r))
      }
    } else {
      leftIter.iterator.map[InternalRow] { l =>
        numOutputRows += 1
-        joinedRow(l, rightNullRow).copy()
+        resultProjection(joinedRow(l, rightNullRow))
      } ++ rightIter.iterator.map[InternalRow] { r =>
        numOutputRows += 1
-        joinedRow(leftNullRow, r).copy()
+        resultProjection(joinedRow(leftNullRow, r))
      }
    }
  }
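
The fullOuterIterator hunks above replace joinedRow.copy() with resultProjection(joinedRow): the full outer path used to emit defensive copies of one mutable JoinedRow (and therefore generic, non-Unsafe rows), while it now renders each output row through the UnsafeProjection passed in as resultProjection, which is what lets HashOuterJoin declare outputsUnsafeRows = true even for FullOuter. A tiny plain-Scala sketch (illustrative names, not Spark code) of the aliasing hazard that makes some per-row materialization necessary whenever a single mutable row is reused:

    object ReusedRowSketch {
      def main(args: Array[String]): Unit = {
        val shared = Array(0) // stands in for the reused, mutable JoinedRow

        // Wrong: every collected element aliases the same mutable object.
        val aliased = (1 to 3).map { i => shared(0) = i; shared }
        println(aliased.map(_(0)).mkString(","))      // prints 3,3,3

        // Materializing per element keeps the values distinct; before this
        // commit that was copy(), after it rows are written out via the projection.
        val materialized = (1 to 3).map { i => shared(0) = i; shared.clone() }
        println(materialized.map(_(0)).mkString(",")) // prints 1,2,3
      }
    }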