[SPARK-8406] [SQL] Backports SPARK-8406 and PR #6864 to branch-1.4
Author: Cheng Lian <[email protected]>

Closes #6932 from liancheng/spark-8406-for-1.4 and squashes the following commits:

a0168fe [Cheng Lian] Backports SPARK-8406 and PR #6864 to branch-1.4
liancheng authored and yhuai committed Jun 22, 2015
1 parent b836bac commit 451c872
Showing 8 changed files with 110 additions and 67 deletions.
@@ -61,50 +61,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
extends OutputWriter {

private val recordWriter: RecordWriter[Void, Row] = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
// overwriting existing data files, we need to find out the max task ID encoded in these data
// file names.
// TODO Make this snippet a utility function for other data source developers
val maxExistingTaskId = {
// Note that `path` may point to a temporary location. Here we retrieve the real
// destination path from the configuration
val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
val fs = outputPath.getFileSystem(conf)

if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
// part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r

fs.listStatus(outputPath).map(_.getPath.getName).map {
case partFilePattern(id) => id.toInt
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => throw new AnalysisException(
s"Trying to write Parquet files to directory $outputPath, " +
s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
} else {
0
}
}

new ParquetOutputFormat[Row]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
// 1. To allow appending. We need to generate output file name based on the max available
// task ID computed above.
// 1. To allow appending. We need to generate unique output file names to avoid
// overwriting existing files (files that either existed before the write job or were just
// written by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
new Path(path, f"part-r-$split%05d$extension")
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
}
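
To see why the new naming scheme avoids collisions, here is a small illustration (not part of this commit's diff; the task IDs and UUIDs below are made up). Under the old scheme, two concurrent append jobs that observe the same maximum existing task ID derive identical file names; under the new scheme the per-job UUID keeps them apart:

import java.util.UUID

// Hypothetical illustration only; values are stand-ins.
object NamingSchemes {
  // Old scheme: the name depends solely on taskId + maxExistingTaskId + 1. Two concurrent append
  // jobs that both see maxExistingTaskId == 2 produce the same name and overwrite each other.
  val oldNameJobA = f"part-r-${0 + 2 + 1}%05d.gz.parquet"   // "part-r-00003.gz.parquet"
  val oldNameJobB = f"part-r-${0 + 2 + 1}%05d.gz.parquet"   // "part-r-00003.gz.parquet" -- collision

  // New scheme: the per-job UUID disambiguates jobs, the task ID disambiguates tasks within a job.
  val uuidJobA = UUID.randomUUID().toString                 // generated once on job A's driver
  val uuidJobB = UUID.randomUUID().toString                 // generated once on job B's driver
  val newNameJobA = f"part-r-${0}%05d-$uuidJobA.gz.parquet" // "part-r-00000-<uuid A>.gz.parquet"
  val newNameJobB = f"part-r-${0}%05d-$uuidJobB.gz.parquet" // "part-r-00000-<uuid B>.gz.parquet"
}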
59 changes: 49 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.sources

import java.util.Date
import java.util.{UUID, Date}

import scala.collection.mutable

@@ -59,6 +59,28 @@ private[sql] case class InsertIntoDataSource(
}
}

/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
* single write job, and owns a UUID that identifies this job. Each concrete implementation of
* [[HadoopFsRelation]] should use this UUID together with the task ID to generate a unique file
* path for each task output file. This UUID is passed to the executor side via a property named
* `spark.sql.sources.writeJobUUID`.
*
* Two different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]],
* are used to write to normal tables and tables with dynamic partitions, respectively.
*
* The basic workflow of this command is:
*
* 1. Driver side setup, including output committer initialization and data source specific
*    preparation work for the write job to be issued.
* 2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
*    rows within an RDD partition.
* 3. If no exception is thrown in a task, commits that task; otherwise aborts it. If any
*    exception is thrown while committing a task, that task is also aborted.
* 4. If all tasks are committed, commits the job; otherwise aborts it. If any exception is
*    thrown while committing the job, the job is also aborted.
*/
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
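
To make the four-step workflow in the Scaladoc above concrete, here is a schematic sketch of the commit protocol (not part of this commit; the trait below is a simplified stand-in, not Spark's actual writer container API, and tasks run sequentially here purely for illustration):

trait WriterContainer {
  def driverSideSetup(): Unit
  def executorSideSetup(): Unit
  def commitTask(): Unit
  def abortTask(): Unit
  def commitJob(): Unit
  def abortJob(): Unit
}

object WriteProtocolSketch {
  def runWriteJob(container: WriterContainer, partitions: Seq[Seq[String]])(writeRow: String => Unit): Unit = {
    container.driverSideSetup()              // 1. driver-side setup (committer init, data source prep)
    try {
      partitions.foreach { partition =>      // 2. one task per RDD partition writes all of its rows
        container.executorSideSetup()
        try {
          partition.foreach(writeRow)
          container.commitTask()             // 3. commit the task on success...
        } catch {
          case cause: Throwable =>
            container.abortTask()            // ...abort it on any failure, including commit failure
            throw cause
        }
      }
      container.commitJob()                  // 4. commit the job once every task has committed
    } catch {
      case cause: Throwable =>
        container.abortJob()                 // abort the job on any failure, including commit failure
        throw cause
    }
  }
}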
@@ -271,6 +293,13 @@ private[sql] abstract class BaseWriterContainer(

protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))

// This UUID is used to avoid output file name collisions between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
// may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
// The reason this ID identifies a write job rather than a single task output file is that
// speculative tasks must generate the same output file names as the original task.
private val uniqueWriteJobId = UUID.randomUUID()

// This is only used on driver side.
@transient private val jobContext: JobContext = job

@@ -298,6 +327,11 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()

// This UUID is sent to the executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output file names.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)

// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
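
The comments above boil down to a simple invariant: the random component of a file name must be fixed per write job, not per task attempt, because a speculative attempt has to produce exactly the same file name as the original attempt. A minimal sketch (not part of this commit) of that naming rule:

import java.util.UUID

object JobLevelNaming {
  // Generated once per write job on the driver and shipped to executors via the Hadoop
  // Configuration, so every attempt of the same task derives the same file name.
  val jobUUID: String = UUID.randomUUID().toString

  def partFileName(taskId: Int, extension: String): String =
    f"part-r-$taskId%05d-$jobUUID$extension"

  // The original and a speculative attempt of task 3 agree:
  //   partFileName(3, ".parquet") == partFileName(3, ".parquet")
  // Had each task attempt drawn its own UUID, the two attempts would write different files and
  // the output directory could end up with duplicate data.
}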
@@ -425,15 +459,16 @@ private[sql] class DefaultWriterContainer(
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
} catch {
case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
} catch { case cause: Throwable =>
// This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
// cause `abortTask()` to be invoked.
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
// It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
@@ -477,21 +512,25 @@ private[sql] class DynamicPartitionWriterContainer(
})
}

override def commitTask(): Unit = {
try {
private def clearOutputWriters(): Unit = {
if (outputWriters.nonEmpty) {
outputWriters.values.foreach(_.close())
outputWriters.clear()
}
}

override def commitTask(): Unit = {
try {
clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
outputWriters.values.foreach(_.close())
outputWriters.clear()
clearOutputWriters()
} finally {
super.abortTask()
}
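
The refactoring above moves the close-and-clear logic into clearOutputWriters() so that the commit and abort paths release every per-partition writer exactly once. A simplified sketch of that bookkeeping (not part of this commit; PartitionWriter is a stand-in, not Spark's OutputWriter API):

import scala.collection.mutable

// Stand-in for an output writer bound to a single partition directory.
trait PartitionWriter {
  def write(row: String): Unit
  def close(): Unit
}

class DynamicPartitionWriters(newWriter: String => PartitionWriter) {
  // One writer per encountered partition path, e.g. "p=1/q=a".
  private val outputWriters = mutable.Map.empty[String, PartitionWriter]

  def writerFor(partitionPath: String): PartitionWriter =
    outputWriters.getOrElseUpdate(partitionPath, newWriter(partitionPath))

  // Close every open writer exactly once; shared by both the commit and the abort path.
  def clearOutputWriters(): Unit = {
    if (outputWriters.nonEmpty) {
      outputWriters.values.foreach(_.close())
      outputWriters.clear()
    }
  }
}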
@@ -33,7 +33,7 @@ private[orc] object OrcFileOperator extends Logging{
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)

logDebug(s"Creating ORC Reader from ${orcFiles.head}")
// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
val reader = getFileReader(path, conf)
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs)
val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))

if (paths == null || paths.size == 0) {
if (paths == null || paths.isEmpty) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
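
The makeQualified change switches from the single-argument overload, which is deprecated in newer Hadoop releases, to the two-argument overload taking the filesystem URI and working directory explicitly. A minimal standalone sketch of the equivalent call (not part of this commit; assumes the default filesystem configured by a plain Configuration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyPathSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val origPath = new Path("data/orc")
    val fs = origPath.getFileSystem(conf)

    // Deprecated single-argument form:
    //   origPath.makeQualified(fs)
    // Equivalent non-deprecated form, as used in the patch:
    val qualified = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    println(qualified)
  }
}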
@@ -104,8 +104,9 @@ private[orc] class OrcOutputWriter(
recordWriterInstantiated = true

val conf = context.getConfiguration
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val partition = context.getTaskAttemptID.getTaskID.getId
val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"

new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
object TestHive
extends TestHiveContext(
new SparkContext(
"local[2]",
"local[32]",
"TestSQLContext",
new SparkConf()
.set("spark.sql.test", "")
@@ -44,7 +44,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
import org.apache.spark.sql.hive.test.TestHive.implicits._

sparkContext
.makeRDD(1 to 10)
.makeRDD(1 to 100)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
@@ -70,43 +70,43 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}

test("create temporary orc table") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("create temporary orc table as") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("appending insert") {
sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -119,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {

checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))
}
}

@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)

override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}
}

@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
context: TaskAttemptContext): OutputWriter = {
new SimpleTextOutputWriter(path, context) {
override def close(): Unit = {
super.close()
sys.error("Intentional task commitment failure for testing purpose.")
}
}
@@ -22,20 +22,20 @@ import java.io.File
import com.google.common.io.Files
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.{SparkException, SparkFunSuite}

abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
override val sqlContext: SQLContext = TestHive

import sqlContext._
import sqlContext.implicits._

val dataSourceName = classOf[SimpleTextSource].getCanonicalName
val dataSourceName: String

val dataSchema =
StructType(
@@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
}
}

// NOTE: This test is not fully deterministic. On nodes with only a few cores (4 or even 1) the
// data loss issue is hard to reproduce, but on nodes with 8 or more cores it can be reproduced
// reliably. Fortunately our Jenkins builder meets this requirement. We may want to move this test
// case to spark-integration-tests or spark-perf later.
test("SPARK-8406: Avoids name collision while writing Parquet files") {
withTempPath { dir =>
val path = dir.getCanonicalPath
sqlContext
.range(10000)
.repartition(250)
.write
.mode(SaveMode.Overwrite)
.format(dataSourceName)
.save(path)

assertResult(10000) {
sqlContext
.read
.format(dataSourceName)
.option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
.load(path)
.count()
}
}
}
}

class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
Expand Down Expand Up @@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}

class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
import TestHive.implicits._

override val sqlContext = TestHive

// When committing a task, `CommitFailureTestSource` throws an exception for testing purposes.
val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName

test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
withTempPath { file =>
val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
// Here we coalesce the partition number to 1 to ensure that only a single task is issued. This
// prevents the race condition that occurs when FileOutputCommitter tries to remove the `_temporary`
// directory while committing/aborting the job. See SPARK-8513 for more details.
val df = sqlContext.range(0, 10).coalesce(1)
intercept[SparkException] {
df.write.format(dataSourceName).save(file.getCanonicalPath)
}
