Finish writing EnsureRowFormats planner rule
JoshRosen committed Jul 17, 2015
1 parent b5df19b commit d5f9005
Showing 3 changed files with 69 additions and 47 deletions.
9 changes: 6 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -921,12 +921,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
 
   /**
-   * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed.
+   * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal
+   * row format conversions as needed.
    */
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
-    val batches =
-      Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil
+    val batches = Seq(
+      Batch("Add exchange", Once, EnsureRequirements(self)),
+      Batch("Add row converters", Once, EnsureRowFormats)
+    )
   }
 
   protected[sql] def openSession(): SQLSession = {
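
For context on how these batches run: a RuleExecutor applies its batches in order, and a Once batch makes a single pass of each rule over the plan, so the exchange-insertion rule always runs before the row-format rule. A minimal standalone sketch of that control flow (illustrative names only, not Spark's actual RuleExecutor internals):

object MiniRuleExecutor {
  type Rule[T] = T => T
  final case class Batch[T](name: String, rules: Seq[Rule[T]])

  // Thread the plan through every rule of every batch, one pass each,
  // mirroring how "Add exchange" runs before "Add row converters".
  def execute[T](batches: Seq[Batch[T]])(plan: T): T =
    batches.foldLeft(plan) { (current, batch) =>
      batch.rules.foldLeft(current)((p, rule) => rule(p))
    }
}

The ordering matters: EnsureRequirements may introduce Exchange operators, and running EnsureRowFormats afterwards lets it insert converters around those operators too.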
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -80,16 +80,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
+  /** Specifies whether this operator outputs UnsafeRows */
+  def outputsUnsafeRows: Boolean = false
+
+  /** Specifies whether this operator is capable of processing UnsafeRows */
+  def canProcessUnsafeRows: Boolean = false
+
   /**
-   * The format of this operator's output rows as a function of its input rows' format. This assumes
-   * that all this operator's children have the same row output format; this property is guaranteed
-   * to hold since the physical planner includes a rule to automatically convert all of an
-   * operator's inputs to the same row format.
+   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
+   * that are not UnsafeRows).
    */
-  def outputRowFormat(inputFormat: RowFormat): RowFormat = SafeRowFormat
-
-  /** The set of row formats that this operator can process. */
-  def supportedInputRowFormats: Set[RowFormat] = Set(SafeRowFormat)
+  def canProcessSafeRows: Boolean = true
 
   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
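
To illustrate the new contract, a hypothetical operator that both consumes and produces only UnsafeRows would override all three flags. The operator name below is invented for this sketch, and it assumes the surrounding execution package (SparkPlan, UnaryNode, and the catalyst types) is in scope:

// Hypothetical UnsafeRow-only operator. EnsureRowFormats (below) will wrap any
// child that emits safe rows in ConvertToUnsafe before this operator executes.
case class UnsafeOnlyPassThrough(child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def outputsUnsafeRows: Boolean = true
  override def canProcessUnsafeRows: Boolean = true
  override def canProcessSafeRows: Boolean = false
  // Pass rows through unchanged; a real operator would do its work here.
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}

The defaults (safe rows in, safe rows out) mean existing operators are untouched by this change until they opt in.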
@@ -20,43 +20,23 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow, UnsafeRowConverter}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.unsafe.PlatformDependent
 
 
-sealed trait RowFormat
-case object UnsafeRowFormat extends RowFormat
-case object SafeRowFormat extends RowFormat
-
 /**
  * :: DeveloperApi ::
  * Converts Java-object-based rows into [[UnsafeRow]]s.
  */
 @DeveloperApi
-case class ToUnsafeRow(child: SparkPlan) extends UnaryNode {
+case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
-  override protected def doExecute(): RDD[UnsafeRow] = {
-    // TODO: this will change after SPARK-9022 / #7437
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = false
+  override def canProcessSafeRows: Boolean = true
+  override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitions { iter =>
-      val toUnsafeConverter = new UnsafeRowConverter(child.output.map(_.dataType).toArray)
-      val unsafeRow = new UnsafeRow()
-      var buffer = new Array[Byte](64)
-      val numFields = child.output.size
-      def convert(row: InternalRow): UnsafeRow = {
-        val sizeRequirement = toUnsafeConverter.getSizeRequirement(row)
-        if (sizeRequirement > buffer.length) {
-          buffer = new Array[Byte](sizeRequirement)
-        }
-        // TODO: how do we want to handle object pools here?
-        toUnsafeConverter.writeRow(
-          row, buffer, PlatformDependent.BYTE_ARRAY_OFFSET, sizeRequirement, null)
-        unsafeRow.pointTo(
-          buffer, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, sizeRequirement, null)
-        unsafeRow
-      }
-      iter.map(convert)
+      val convertToUnsafe = UnsafeProjection.create(child.schema)
+      iter.map(convertToUnsafe)
     }
   }
 }
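
The rewrite above drops the hand-rolled buffer management in favor of UnsafeProjection, which generates the conversion logic from the child's schema. A rough single-row sketch of the same conversion (the schema and values are invented for the example):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Build the projection once (the operator does this once per partition),
// then apply it to each incoming row.
val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val toUnsafe = UnsafeProjection.create(schema)
val unsafe: UnsafeRow = toUnsafe(InternalRow(1L, UTF8String.fromString("a")))

One caveat worth knowing: a projection like this reuses a mutable output row across calls, so results must be copied before being buffered across iterator steps.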
@@ -66,24 +46,62 @@ case class ToUnsafeRow(child: SparkPlan) extends UnaryNode {
  * Converts [[UnsafeRow]]s back into Java-object-based rows.
  */
 @DeveloperApi
-case class FromUnsafeRow(child: SparkPlan) extends UnaryNode {
+case class ConvertFromUnsafe(child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
+  override def outputsUnsafeRows: Boolean = false
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = false
   override protected def doExecute(): RDD[InternalRow] = {
-    // TODO: this will change after SPARK-9022 / #7437
     child.execute().mapPartitions { iter =>
-      val proj = newMutableProjection(null, child.output)()
-      iter.map(proj)
+      val convertToSafe = FromUnsafeProjection(child.output.map(_.dataType))
+      iter.map(convertToSafe)
     }
   }
 }
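
ConvertFromUnsafe mirrors the converter above, building one FromUnsafeProjection per partition from the child's column types. Continuing the single-row sketch from before (again with invented types; `unsafe` is the row produced there, and the factory taking a Seq of data types is the one the diff itself uses):

import org.apache.spark.sql.catalyst.expressions.FromUnsafeProjection
import org.apache.spark.sql.types.{LongType, StringType}

// Convert the UnsafeRow back into a Java-object-based row; field values
// should survive the safe -> unsafe -> safe round trip.
val toSafe = FromUnsafeProjection(Seq(LongType, StringType))
val restored: InternalRow = toSafe(unsafe)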

 private[sql] object EnsureRowFormats extends Rule[SparkPlan] {
 
-  private def meetsRequirements(operator: SparkPlan): Boolean = {
-    operator.children.flatMap(_.)
-  }
+  private def onlyHandlesSafeRows(operator: SparkPlan): Boolean =
+    operator.canProcessSafeRows && !operator.canProcessUnsafeRows
 
-  override def apply(operator: SparkPlan): SparkPlan = {
+  private def onlyHandlesUnsafeRows(operator: SparkPlan): Boolean =
+    operator.canProcessUnsafeRows && !operator.canProcessSafeRows
+
+  private def handlesBothSafeAndUnsafeRows(operator: SparkPlan): Boolean =
+    operator.canProcessSafeRows && operator.canProcessUnsafeRows
+
+  override def apply(operator: SparkPlan): SparkPlan = operator.transformUp {
+    case operator: SparkPlan if onlyHandlesSafeRows(operator) =>
+      if (operator.children.exists(_.outputsUnsafeRows)) {
+        // At least one child emits UnsafeRows, so convert those children back to safe rows
+        operator.withNewChildren {
+          operator.children.map {
+            c => if (c.outputsUnsafeRows) ConvertFromUnsafe(c) else c
+          }
+        }
+      } else {
+        operator
+      }
+    case operator: SparkPlan if onlyHandlesUnsafeRows(operator) =>
+      if (operator.children.exists(!_.outputsUnsafeRows)) {
+        // At least one child emits safe rows, so convert those children to UnsafeRows
+        operator.withNewChildren {
+          operator.children.map {
+            c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
+          }
+        }
+      } else {
+        operator
+      }
+    case operator: SparkPlan if handlesBothSafeAndUnsafeRows(operator) =>
+      if (operator.children.map(_.outputsUnsafeRows).toSet.size != 1) {
+        // If this operator's children produce both unsafe and safe rows, then convert everything
+        // to safe rows
+        operator.withNewChildren {
+          operator.children.map {
+            c => if (c.outputsUnsafeRows) ConvertFromUnsafe(c) else c
+          }
+        }
+      } else {
+        operator
+      }
+  }
+}
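
Since Rule[SparkPlan] is just a plan-to-plan function, the finished rule can be exercised directly. A hypothetical before/after using the invented UnsafeOnlyPassThrough operator from the earlier sketch and an assumed leaf `safeScan` whose outputsUnsafeRows is false:

// transformUp visits safeScan first, then rewrites the children of the
// UnsafeRow-only parent once it reaches it.
val planned: SparkPlan = UnsafeOnlyPassThrough(safeScan)
val prepared: SparkPlan = EnsureRowFormats(planned)
// prepared is now UnsafeOnlyPassThrough(ConvertToUnsafe(safeScan))

Note the policy in the last case: when an operator can handle both formats but its children disagree, the rule normalizes everything to safe rows, which every operator can produce and consume by default, at the cost of one conversion per unsafe child.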
