Skip to content

Commit

Permalink
add createDataFrame without conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Apr 2, 2015
1 parent 4a3767b commit 06896e4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
3 changes: 2 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ class DataFrame private[sql](
*/
override def repartition(numPartitions: Int): DataFrame = {
// NOTE(review): this is a rendered diff hunk. The two-argument call line below appears to be
// the pre-change version; the two lines after it (ending in needsConversion = false) appear
// to be its replacement.
// Each row is copied before the RDD is repartitioned (presumably because rows produced by
// toRdd may be reused objects -- confirm). Passing needsConversion = false makes
// createDataFrame use the rows as-is instead of mapping convertToCatalyst over them.
sqlContext.createDataFrame(
queryExecution.toRdd.map(_.copy()).repartition(numPartitions), schema)
queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
schema, needsConversion = false)
}

/**
Expand Down
15 changes: 14 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,22 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@DeveloperApi
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame =
  // Delegates to the three-argument overload with conversion switched on, so every input
  // row is passed through ScalaReflection.convertToCatalyst before the DataFrame is built.
  createDataFrame(rowRDD, schema, needsConversion = true)

/**
* Creates a DataFrame from an RDD[Row] using the given schema. The caller specifies whether
* the input rows should first be converted to Catalyst rows.
*
* @param rowRDD the rows backing the new DataFrame
* @param schema the schema applied to the rows; its attributes are used for the logical plan
* @param needsConversion if true, every row is mapped through
*                        `ScalaReflection.convertToCatalyst`; if false, `rowRDD` is used as-is
*                        (presumably the rows must then already be Catalyst rows -- confirm)
* @return a DataFrame over a `LogicalRDD` built from the (possibly converted) rows
*/
private[sql]
def createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean) = {
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
// NOTE(review): this is a rendered diff hunk. The unconditional definition on the next line
// appears to be the pre-change line; the conditional definition after it is its replacement.
val catalystRows = rowRDD.map(ScalaReflection.convertToCatalyst(_, schema).asInstanceOf[Row])
// Convert only on request; otherwise the input RDD is handed to LogicalRDD untouched,
// which avoids an extra map over every row.
val catalystRows = if (needsConversion) {
rowRDD.map(ScalaReflection.convertToCatalyst(_, schema).asInstanceOf[Row])
} else {
rowRDD
}
// Wrap the rows in a LogicalRDD whose output attributes are derived from the schema.
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
DataFrame(this, logicalPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ private[sql] class DefaultSource
val df =
sqlContext.createDataFrame(
data.queryExecution.toRdd,
data.schema.asNullable)
data.schema.asNullable,
needsConversion = false)
val createdRelation =
createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
Expand Down

0 comments on commit 06896e4

Please sign in to comment.