diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 46bd60daa1f78..2dda3ad1211fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -921,12 +921,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
 
   /**
-   * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed.
+   * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal
+   * row format conversions as needed.
    */
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
-    val batches =
-      Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil
+    val batches = Seq(
+      Batch("Add exchange", Once, EnsureRequirements(self)),
+      Batch("Add row converters", Once, EnsureRowFormats)
+    )
   }
 
   protected[sql] def openSession(): SQLSession = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index d4819a76656b0..9a338e51268f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -80,16 +80,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
+  /** Specifies whether this operator outputs UnsafeRows */
+  def outputsUnsafeRows: Boolean = false
+
+  /** Specifies whether this operator is capable of processing UnsafeRows */
+  def canProcessUnsafeRows: Boolean = false
+
   /**
-   * The format of this operator's output rows as a function of its input rows' format. This assumes
-   * that all this operator's children have the same row output format; this property is guaranteed
-   * to hold since the physical planner includes a rule to automatically convert all of an
-   * operators' inputs to the same row format.
+   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
+   * that are not UnsafeRows).
    */
-  def outputRowFormat(inputFormat: RowFormat): RowFormat = SafeRowFormat
-
-  /** The set of row formats that this operator can process. */
-  def supportedInputRowFormats: Set[RowFormat] = Set(SafeRowFormat)
+  def canProcessSafeRows: Boolean = true
 
   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/rowConverters.scala
index 030dcdc58d587..bc94076d01de8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/rowConverters.scala
@@ -20,43 +20,23 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow, UnsafeRowConverter}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.unsafe.PlatformDependent
-
-
-sealed trait RowFormat
-case object UnsafeRowFormat extends RowFormat
-case object SafeRowFormat extends RowFormat
 
 /**
  * :: DeveloperApi ::
  * Converts Java-object-based rows into [[UnsafeRow]]s.
  */
 @DeveloperApi
-case class ToUnsafeRow(child: SparkPlan) extends UnaryNode {
+case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
-  override protected def doExecute(): RDD[UnsafeRow] = {
-    // TODO: this will change after SPARK-9022 / #7437
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = false
+  override def canProcessSafeRows: Boolean = true
+  override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitions { iter =>
-      val toUnsafeConverter = new UnsafeRowConverter(child.output.map(_.dataType).toArray)
-      val unsafeRow = new UnsafeRow()
-      var buffer = new Array[Byte](64)
-      val numFields = child.output.size
-      def convert(row: InternalRow): UnsafeRow = {
-        val sizeRequirement = toUnsafeConverter.getSizeRequirement(row)
-        if (sizeRequirement > buffer.length) {
-          buffer = new Array[Byte](sizeRequirement)
-        }
-        // TODO: how do we want to handle object pools here?
-        toUnsafeConverter.writeRow(
-          row, buffer, PlatformDependent.BYTE_ARRAY_OFFSET, sizeRequirement, null)
-        unsafeRow.pointTo(
-          buffer, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, sizeRequirement, null)
-        unsafeRow
-      }
-      iter.map(convert)
+      val convertToUnsafe = UnsafeProjection.create(child.schema)
+      iter.map(convertToUnsafe)
     }
   }
 }
@@ -66,24 +46,62 @@ case class ToUnsafeRow(child: SparkPlan) extends UnaryNode {
  * Converts [[UnsafeRow]]s back into Java-object-based rows.
  */
 @DeveloperApi
-case class FromUnsafeRow(child: SparkPlan) extends UnaryNode {
+case class ConvertFromUnsafe(child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
+  override def outputsUnsafeRows: Boolean = false
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = false
   override protected def doExecute(): RDD[InternalRow] = {
-    // TODO: this will change after SPARK-9022 / #7437
     child.execute().mapPartitions { iter =>
-      val proj = newMutableProjection(null, child.output)()
-      iter.map(proj)
+      val convertToSafe = FromUnsafeProjection(child.output.map(_.dataType))
+      iter.map(convertToSafe)
     }
   }
 }
 
 private[sql] object EnsureRowFormats extends Rule[SparkPlan] {
-  private def meetsRequirements(operator: SparkPlan): Boolean = {
-    operator.children.flatMap(_.)
-  }
+
+  private def onlyHandlesSafeRows(operator: SparkPlan): Boolean =
+    operator.canProcessSafeRows && !operator.canProcessUnsafeRows
 
-  override def apply(operator: SparkPlan): SparkPlan = {
+  private def onlyHandlesUnsafeRows(operator: SparkPlan): Boolean =
+    operator.canProcessUnsafeRows && !operator.canProcessSafeRows
+
+  private def handlesBothSafeAndUnsafeRows(operator: SparkPlan): Boolean =
+    operator.canProcessSafeRows && operator.canProcessUnsafeRows
+
+  override def apply(operator: SparkPlan): SparkPlan = operator.transformUp {
+    case operator: SparkPlan if onlyHandlesSafeRows(operator) =>
+      if (operator.children.exists(_.outputsUnsafeRows)) {
+        operator.withNewChildren {
+          operator.children.map {
+            c => if (c.outputsUnsafeRows) ConvertFromUnsafe(c) else c
+          }
+        }
+      } else {
+        operator
+      }
+    case operator: SparkPlan if onlyHandlesUnsafeRows(operator) =>
+      if (operator.children.exists(!_.outputsUnsafeRows)) {
+        operator.withNewChildren {
+          operator.children.map {
+            c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
+          }
+        }
+      } else {
+        operator
+      }
+    case operator: SparkPlan if handlesBothSafeAndUnsafeRows(operator) =>
+      if (operator.children.map(_.outputsUnsafeRows).toSet.size != 1) {
+        // If this operator's children produce both unsafe and safe rows, then convert everything
+        // to safe rows
+        operator.withNewChildren {
+          operator.children.map {
+            c => if (c.outputsUnsafeRows) ConvertFromUnsafe(c) else c
+          }
+        }
+      } else {
+        operator
+      }
   }
 }
\ No newline at end of file
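
As a point of reference for the projection classes this patch switches to, here is a small standalone sketch (not part of the patch) that round-trips one row through the same two conversions ConvertToUnsafe and ConvertFromUnsafe perform per partition. It assumes the catalyst internals of this era (UnsafeProjection, FromUnsafeProjection, GenericInternalRow) are on the classpath; the object name, schema, and values are invented purely for illustration.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Illustrative only: "RowConverterSketch" and the two-column schema are not from the patch.
object RowConverterSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // Java-object-based row -> UnsafeRow: the conversion ConvertToUnsafe applies to each input row.
    val toUnsafe = UnsafeProjection.create(schema)
    val safeRow: InternalRow = new GenericInternalRow(Array[Any](1L, UTF8String.fromString("a")))
    val unsafeRow: UnsafeRow = toUnsafe(safeRow)

    // UnsafeRow -> Java-object-based row: the conversion ConvertFromUnsafe applies.
    val toSafe = FromUnsafeProjection(schema.map(_.dataType))
    val backToSafe: InternalRow = toSafe(unsafeRow)

    println(s"${backToSafe.getLong(0)}, ${backToSafe.getUTF8String(1)}")
  }
}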