Skip to content

Commit

Permalink
[SPARK-8406] [SQL] Adding UUID to output file name to avoid accidenta…
Browse files Browse the repository at this point in the history
…l overwriting

This PR fixes a Parquet output file name collision bug which may cause data loss.  Changes made:

1.  Identify each write job issued by `InsertIntoHadoopFsRelation` with a UUID

    All concrete data sources which extend `HadoopFsRelation` (Parquet and ORC for now) must use this UUID to generate task output file path to avoid name collision.

2.  Make `TestHive` use a local mode `SparkContext` with 32 threads to increase parallelism

    The major reason for this is that, the original parallelism of 2 is too low to reproduce the data loss issue.  Also, higher concurrency may potentially caught more concurrency bugs during testing phase. (It did help us spotted SPARK-8501.)

3. `OrcSourceSuite` was updated to workaround SPARK-8501, which we detected along the way.

NOTE: This PR is made a little bit more complicated than expected because we hit two other bugs on the way and have to work them around. See [SPARK-8501] [1] and [SPARK-8513] [2].

[1]: https://github.com/liancheng/spark/tree/spark-8501
[2]: https://github.com/liancheng/spark/tree/spark-8513

----

Some background and a summary of offline discussion with yhuai about this issue for better understanding:

In 1.4.0, we added `HadoopFsRelation` to abstract partition support of all data sources that are based on Hadoop `FileSystem` interface.  Specifically, this makes partition discovery, partition pruning, and writing dynamic partitions for data sources much easier.

To support appending, the Parquet data source tries to find out the max part number of part-files in the destination directory (i.e., `<id>` in output file name `part-r-<id>.gz.parquet`) at the beginning of the write job.  In 1.3.0, this step happens on driver side before any files are written.  However, in 1.4.0, this is moved to task side.  Unfortunately, for tasks scheduled later, they may see wrong max part number generated of files newly written by other finished tasks within the same job.  This actually causes a race condition.  In most cases, this only causes nonconsecutive part numbers in output file names.  But when the DataFrame contains thousands of RDD partitions, it's likely that two tasks may choose the same part number, then one of them gets overwritten by the other.

Before `HadoopFsRelation`, Spark SQL already supports appending data to Hive tables.  From a user's perspective, these two look similar.  However, they differ a lot internally.  When data are inserted into Hive tables via Spark SQL, `InsertIntoHiveTable` simulates Hive's behaviors:

1.  Write data to a temporary location

2.  Move data in the temporary location to the final destination location using

    -   `Hive.loadTable()` for non-partitioned table
    -   `Hive.loadPartition()` for static partitions
    -   `Hive.loadDynamicPartitions()` for dynamic partitions

The important part is that, `Hive.copyFiles()` is invoked in step 2 to move the data to the destination directory (I found the name is kinda confusing since no "copying" occurs here, we are just moving and renaming stuff).  If a file in the source directory and another file in the destination directory happen to have the same name, say `part-r-00001.parquet`, the former is moved to the destination directory and renamed with a `_copy_N` postfix (`part-r-00001_copy_1.parquet`).  That's how Hive handles appending and avoids name collision between different write jobs.

Some alternatives fixes considered for this issue:

1.  Use a similar approach as Hive

    This approach is not preferred in Spark 1.4.0 mainly because file metadata operations in S3 tend to be slow, especially for tables with lots of file and/or partitions.  That's why `InsertIntoHadoopFsRelation` just inserts to destination directory directly, and is often used together with `DirectParquetOutputCommitter` to reduce latency when working with S3.  This means, we don't have the chance to do renaming, and must avoid name collision from the very beginning.

2.  Same as 1.3, just move max part number detection back to driver side

    This isn't doable because unlike 1.3, 1.4 also takes dynamic partitioning into account.  When inserting into dynamic partitions, we don't know which partition directories will be touched on driver side before issuing the write job.  Checking all partition directories is simply too expensive for tables with thousands of partitions.

3.  Add extra component to output file names to avoid name collision

    This seems to be the only reasonable solution for now.  To be more specific, we need a JOB level unique identifier to identify all write jobs issued by `InsertIntoHadoopFile`.  Notice that TASK level unique identifiers can NOT be used.  Because in this way a speculative task will write to a different output file from the original task.  If both tasks succeed, duplicate output will be left behind.  Currently, the ORC data source adds `System.currentTimeMillis` to the output file name for uniqueness.  This doesn't work because of exactly the same reason.

    That's why this PR adds a job level random UUID in `BaseWriterContainer` (which is used by `InsertIntoHadoopFsRelation` to issue write jobs).  The drawback is that record order is not preserved any more (output files of a later job may be listed before those of a earlier job).  However, we never promise to preserve record order when writing data, and Hive doesn't promise this either because the `_copy_N` trick breaks the order.

Author: Cheng Lian <[email protected]>

Closes apache#6864 from liancheng/spark-8406 and squashes the following commits:

db7a46a [Cheng Lian] More comments
f5c1133 [Cheng Lian] Addresses comments
85c478e [Cheng Lian] Workarounds SPARK-8513
088c76c [Cheng Lian] Adds comment about SPARK-8501
99a5e7e [Cheng Lian] Uses job level UUID in SimpleTextRelation and avoids double task abortion
4088226 [Cheng Lian] Works around SPARK-8501
1d7d206 [Cheng Lian] Adds more logs
8966bbb [Cheng Lian] Fixes Scala style issue
18b7003 [Cheng Lian] Uses job level UUID to take speculative tasks into account
3806190 [Cheng Lian] Lets TestHive use all cores by default
748dbd7 [Cheng Lian] Adding UUID to output file name to avoid accidental overwriting
  • Loading branch information
liancheng authored and animesh committed Jun 25, 2015
1 parent 0dbf106 commit 2e1bf12
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}

private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
Expand All @@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
extends OutputWriter {

private val recordWriter: RecordWriter[Void, InternalRow] = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
// overwriting existing data files, we need to find out the max task ID encoded in these data
// file names.
// TODO Make this snippet a utility function for other data source developers
val maxExistingTaskId = {
// Note that `path` may point to a temporary location. Here we retrieve the real
// destination path from the configuration
val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
val fs = outputPath.getFileSystem(conf)

if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
// part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r

fs.listStatus(outputPath).map(_.getPath.getName).map {
case partFilePattern(id) => id.toInt
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => throw new AnalysisException(
s"Trying to write Parquet files to directory $outputPath, " +
s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
} else {
0
}
}

new ParquetOutputFormat[InternalRow]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
// 1. To allow appending. We need to generate output file name based on the max available
// task ID computed above.
// 1. To allow appending. We need to generate unique output file names to avoid
// overwriting existing files (either exist before the write job, or are just written
// by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
new Path(path, f"part-r-$split%05d$extension")
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
}
Expand Down
64 changes: 51 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.spark.sql.sources

import java.util.Date
import java.util.{Date, UUID}

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}

import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
Expand Down Expand Up @@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource(
}
}

/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
* single write job, and owns a UUID that identifies this job. Each concrete implementation of
* [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
* each task output file. This UUID is passed to executor side via a property named
* `spark.sql.sources.writeJobUUID`.
*
* Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
* are used to write to normal tables and tables with dynamic partitions.
*
* Basic work flow of this command is:
*
* 1. Driver side setup, including output committer initialization and data source specific
* preparation work for the write job to be issued.
* 2. Issues a write job consists of one or more executor side tasks, each of which writes all
* rows within an RDD partition.
* 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
* exception is thrown during task commitment, also aborts that task.
* 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
* thrown during job commitment, also aborts the job.
*/
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
Expand Down Expand Up @@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {

protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
protected val serializableConf = new SerializableConfiguration(job.getConfiguration)

// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
// may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
// The reason why this ID is used to identify a job rather than a single task output file is
// that, speculative tasks must generate the same output file name as the original task.
private val uniqueWriteJobId = UUID.randomUUID()

// This is only used on driver side.
@transient private val jobContext: JobContext = job
Expand Down Expand Up @@ -290,6 +318,11 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()

// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)

// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
Expand Down Expand Up @@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer(
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
} catch {
case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
} catch { case cause: Throwable =>
// This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
// cause `abortTask()` to be invoked.
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
// It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
Expand Down Expand Up @@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer(
})
}

override def commitTask(): Unit = {
try {
private def clearOutputWriters(): Unit = {
if (outputWriters.nonEmpty) {
outputWriters.values.foreach(_.close())
outputWriters.clear()
}
}

override def commitTask(): Unit = {
try {
clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
outputWriters.values.foreach(_.close())
outputWriters.clear()
clearOutputWriters()
} finally {
super.abortTask()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging{
private[orc] object OrcFileOperator extends Logging {
def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)

logDebug(s"Creating ORC Reader from ${orcFiles.head}")
// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}
Expand All @@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
val reader = getFileReader(path, conf)
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

Expand All @@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs)
val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))

if (paths == null || paths.size == 0) {
if (paths == null || paths.isEmpty) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Reco
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
Expand All @@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{Logging}
import org.apache.spark.util.SerializableConfiguration

/* Implicit conversions */
Expand Down Expand Up @@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter(
recordWriterInstantiated = true

val conf = context.getConfiguration
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val partition = context.getTaskAttemptID.getTaskID.getId
val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"

new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
object TestHive
extends TestHiveContext(
new SparkContext(
System.getProperty("spark.sql.test.master", "local[2]"),
System.getProperty("spark.sql.test.master", "local[32]"),
"TestSQLContext",
new SparkConf()
.set("spark.sql.test", "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
orcTableDir.mkdir()
import org.apache.spark.sql.hive.test.TestHive.implicits._

// Originally we were using a 10-row RDD for testing. However, when default parallelism is
// greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
// which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
// causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
// for more details. To workaround this issue before fixing SPARK-8501, we simply increase row
// number in this RDD to avoid empty partitions.
sparkContext
.makeRDD(1 to 10)
.makeRDD(1 to 100)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
Expand All @@ -70,43 +76,43 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}

test("create temporary orc table") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("create temporary orc table as") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("appending insert") {
sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
Expand All @@ -119,7 +125,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {

checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)

override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}
}

Expand Down Expand Up @@ -156,6 +157,7 @@ class CommitFailureTestRelation(
context: TaskAttemptContext): OutputWriter = {
new SimpleTextOutputWriter(path, context) {
override def close(): Unit = {
super.close()
sys.error("Intentional task commitment failure for testing purpose.")
}
}
Expand Down
Loading

0 comments on commit 2e1bf12

Please sign in to comment.