
Commit 6db1368: HadoopFsRelation subclasses should set their output format class

liancheng committed on Jun 24, 2015 (1 parent: f04b567)
Showing 4 changed files with 36 additions and 1 deletion.
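Background on the change: HadoopFsRelation's write path obtains its OutputCommitter from the Hadoop job's output format class. When a relation never sets that class, Hadoop falls back to the default TextOutputFormat, so a format-specific committer such as ParquetOutputCommitter (the one that writes the _metadata and _common_metadata summary files) is never instantiated. A minimal sketch of the resolution, assuming only stock Hadoop mapreduce APIs (the helper name is illustrative, not Spark code):

    import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
    import org.apache.hadoop.util.ReflectionUtils

    // getOutputFormatClass defaults to TextOutputFormat when nothing was set,
    // so without setOutputFormatClass a format-specific committer never appears.
    def resolveCommitter(job: Job, context: TaskAttemptContext): OutputCommitter = {
      val format = ReflectionUtils.newInstance(job.getOutputFormatClass, job.getConfiguration)
      format.getOutputCommitter(context) // ParquetOutputFormat yields a ParquetOutputCommitter
    }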
@@ -194,6 +194,8 @@ private[sql] class ParquetRelation2(
       committerClass,
       classOf[ParquetOutputCommitter])
 
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
     // TODO There's no need to use two kinds of WriteSupport
     // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
     // complex types.
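With the job's output format pinned to ParquetOutputFormat, the committer Hadoop resolves is a ParquetOutputCommitter (or the committerClass subclass configured just above), and its commitJob phase is what merges part-file footers into the summary files. A hedged sketch of that chain, assuming stock parquet-hadoop behavior (the import lives under parquet.hadoop in older Parquet releases, org.apache.parquet.hadoop from Parquet 1.7 on; the output path is illustrative):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{Job, TaskAttemptID}
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    import org.apache.parquet.hadoop.ParquetOutputFormat
    import org.apache.spark.sql.Row

    val job = Job.getInstance()
    FileOutputFormat.setOutputPath(job, new Path("/tmp/parquet-out"))
    val ctx = new TaskAttemptContextImpl(job.getConfiguration, new TaskAttemptID())
    // The resolved committer is a ParquetOutputCommitter; its commitJob merges
    // every part file's footer into _metadata and _common_metadata.
    val committer = new ParquetOutputFormat[Row]().getOutputCommitter(ctx)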
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

@@ -194,6 +194,16 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    job.getConfiguration match {
+      case conf: JobConf =>
+        conf.setOutputFormat(classOf[OrcOutputFormat])
+      case conf =>
+        conf.setClass(
+          "mapred.output.format.class",
+          classOf[OrcOutputFormat],
+          classOf[MapRedOutputFormat[_, _]])
+    }
+
     new OutputWriterFactory {
       override def newInstance(
         path: String,
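The match is needed because OrcOutputFormat implements the old org.apache.hadoop.mapred.OutputFormat interface, which the new-API Job.setOutputFormatClass cannot accept. Both branches end up writing the same configuration entry, since JobConf.setOutputFormat is shorthand for setting the old-API key. A standalone sketch of that equivalence (not part of the commit):

    import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
    import org.apache.hadoop.mapred.JobConf

    val conf = new JobConf()
    conf.setOutputFormat(classOf[OrcOutputFormat])
    // setOutputFormat stores the class under the same key that the second
    // branch of the match above sets by hand:
    assert(conf.get("mapred.output.format.class") == classOf[OrcOutputFormat].getName)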
@@ -119,6 +119,8 @@ class SimpleTextRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
+
     override def newInstance(
       path: String,
       dataSchema: StructType,
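SimpleTextRelation, the toy HadoopFsRelation backing these test suites, gets the one-line treatment: the TextOutputFormat referenced here must be the new-API class from org.apache.hadoop.mapreduce.lib.output (setOutputFormatClass would not compile against the old-API one), so no JobConf fallback is required. The same pattern works for any new-API format:

    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    // Any OutputFormat from the new (mapreduce) API registers directly on the Job.
    val job = Job.getInstance()
    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])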
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-8604: Parquet data source should write summary file while doing appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
+
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
 }
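The regression test deletes both summary files after the initial overwrite, appends, and then checks that the append's job commit recreated them. For context, the two files serve different readers: _metadata carries the merged footers of every part file (schema plus row-group metadata), while _common_metadata carries only the merged schema. A hedged sketch of inspecting one, assuming Parquet 1.7 class locations (older releases use the parquet.hadoop package) and an illustrative path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader

    // The summary file is a footer-only Parquet metadata file, so the
    // ordinary footer reader can open it.
    val meta = ParquetFileReader.readFooter(
      new Configuration(), new Path("/tmp/spark-8604/_metadata"))
    println(meta.getFileMetaData.getSchema) // merged schema of all part files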
