[SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class

`HadoopFsRelation` subclasses, especially `ParquetRelation2`, should set their own output format class, so that the default output committer can be set up correctly when appending (where we ignore user-defined output committers).
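
To illustrate the mechanism: on the append path, Spark deliberately ignores any user-configured committer and falls back to the default committer exposed by the job's output format. A minimal sketch of that fallback (the helper name and context argument are illustrative, not Spark's actual internals):

```scala
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}

// Hypothetical helper mirroring the append code path: the committer is resolved
// from whatever output format class the relation registered on the job, so a
// relation that never calls `job.setOutputFormatClass(...)` ends up with a
// default committer unrelated to its file format.
def committerForAppend(job: Job, taskContext: TaskAttemptContext): OutputCommitter = {
  val outputFormat = job.getOutputFormatClass.newInstance()
  outputFormat.getOutputCommitter(taskContext)
}
```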

Author: Cheng Lian <[email protected]>

Closes #6998 from liancheng/spark-8604 and squashes the following commits:

9be51d1 [Cheng Lian] Adds more comments
6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class

(cherry picked from commit c337844)
Signed-off-by: Cheng Lian <[email protected]>
liancheng committed Jun 25, 2015
1 parent 792ed7a commit 0605e08
Showing 4 changed files with 40 additions and 1 deletion.
@@ -195,6 +195,12 @@ private[sql] class ParquetRelation2(
       committerClass,
       classOf[ParquetOutputCommitter])
 
+    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
+    // we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
     // TODO There's no need to use two kinds of WriteSupport
     // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
     // complex types.
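The summary files exercised by the test added at the end of this commit are written by `ParquetOutputCommitter` at job-commit time. A hedged sketch of why registering the output format matters (the import path assumes the pre-`org.apache.parquet` Parquet packages bundled with Spark at the time, and the helper is illustrative):

```scala
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import parquet.hadoop.ParquetOutputCommitter

// Illustrative check: once `ParquetOutputFormat[Row]` is registered on the job,
// resolving the committer through the output format yields Parquet's committer,
// whose `commitJob` writes the _metadata and _common_metadata summary files.
def usesParquetCommitter(job: Job, context: TaskAttemptContext): Boolean = {
  val format = job.getOutputFormatClass.newInstance()
  format.getOutputCommitter(context).isInstanceOf[ParquetOutputCommitter]
}
```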
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -193,6 +193,16 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    job.getConfiguration match {
+      case conf: JobConf =>
+        conf.setOutputFormat(classOf[OrcOutputFormat])
+      case conf =>
+        conf.setClass(
+          "mapred.output.format.class",
+          classOf[OrcOutputFormat],
+          classOf[MapRedOutputFormat[_, _]])
+    }
+
     new OutputWriterFactory {
       override def newInstance(
         path: String,
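The match above covers both Hadoop APIs: a `JobConf` has a typed setter for the old-style (`mapred`) output format, while a plain `Configuration` only exposes the underlying key. A small sketch showing the two branches are equivalent, using `TextOutputFormat` merely as a stand-in format (the Orc format itself needs Hive dependencies):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat, OutputFormat => MapRedOutputFormat}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[_, _]]) // typed setter on JobConf

val conf = new Configuration()
conf.setClass( // same effect via the raw configuration key
  "mapred.output.format.class",
  classOf[TextOutputFormat[_, _]],
  classOf[MapRedOutputFormat[_, _]])
```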
@@ -119,6 +119,8 @@ class SimpleTextRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
+
     override def newInstance(
       path: String,
       dataSchema: StructType,
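`SimpleTextRelation` is a test-only data source, but the same rule applies: the default committer comes from the registered format. As a rough illustration (assuming a Hadoop 2.x environment; the output directory is a hypothetical path), the new-API `TextOutputFormat` hands back a `FileOutputCommitter`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, TextOutputFormat}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

val conf = new Configuration()
conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/simple-text-out") // hypothetical
val context = new TaskAttemptContextImpl(conf, new TaskAttemptID())

// FileOutputFormat subclasses default to FileOutputCommitter, which commits by
// moving each task's new output into place rather than rewriting the directory.
val committer = new TextOutputFormat[String, String]().getOutputCommitter(context)
assert(committer.isInstanceOf[FileOutputCommitter])
```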
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-8604: Parquet data source should write summary file while doing appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
+
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
 }
