Skip to content

Commit

Permalink
[SPARK-7842] [SQL] Makes task committing/aborting in InsertIntoHadoop…
Browse files Browse the repository at this point in the history
…FsRelation more robust

When committing/aborting a write task issued in `InsertIntoHadoopFsRelation`, if an exception is thrown from `OutputWriter.close()`, the committing/aborting process will be interrupted, and leaves messy stuff behind (e.g., the `_temporary` directory created by `FileOutputCommitter`).

This PR makes these two process more robust by catching potential exceptions and falling back to normal task committment/abort.

Author: Cheng Lian <[email protected]>

Closes #6378 from liancheng/spark-7838 and squashes the following commits:

f18253a [Cheng Lian] Makes task committing/aborting in InsertIntoHadoopFsRelation more robust
  • Loading branch information
liancheng committed May 25, 2015
1 parent bfeedc6 commit 8af1bf1
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,22 @@ private[sql] class DefaultWriterContainer(
override def outputWriterForRow(row: Row): OutputWriter = writer

override def commitTask(): Unit = {
writer.close()
super.commitTask()
try {
writer.close()
super.commitTask()
} catch {
case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
writer.close()
super.abortTask()
try {
writer.close()
} finally {
super.abortTask()
}
}
}

Expand Down Expand Up @@ -422,13 +431,21 @@ private[sql] class DynamicPartitionWriterContainer(
}

override def commitTask(): Unit = {
outputWriters.values.foreach(_.close())
super.commitTask()
try {
outputWriters.values.foreach(_.close())
super.commitTask()
} catch { case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
outputWriters.values.foreach(_.close())
super.abortTask()
try {
outputWriters.values.foreach(_.close())
} finally {
super.abortTask()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLContext}

/**
Expand Down Expand Up @@ -67,7 +67,9 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
recordWriter.write(null, new Text(serialized))
}

override def close(): Unit = recordWriter.close(context)
override def close(): Unit = {
recordWriter.close(context)
}
}

/**
Expand Down Expand Up @@ -120,3 +122,39 @@ class SimpleTextRelation(
}
}
}

/**
* A simple example [[HadoopFsRelationProvider]].
*/
class CommitFailureTestSource extends HadoopFsRelationProvider {
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
new CommitFailureTestRelation(paths, schema, partitionColumns, parameters)(sqlContext)
}
}

class CommitFailureTestRelation(
override val paths: Array[String],
maybeDataSchema: Option[StructType],
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient sqlContext: SQLContext)
extends SimpleTextRelation(
paths, maybeDataSchema, userDefinedPartitionColumns, parameters)(sqlContext) {
override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new SimpleTextOutputWriter(path, context) {
override def close(): Unit = {
sys.error("Intentional task commitment failure for testing purpose.")
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path
import org.scalatest.FunSuite

import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
Expand Down Expand Up @@ -477,6 +479,26 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}

class CommitFailureTestRelationSuite extends FunSuite with SQLTestUtils {
import TestHive.implicits._

override val sqlContext = TestHive

val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName

test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
withTempPath { file =>
val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
intercept[SparkException] {
df.write.format(dataSourceName).save(file.getCanonicalPath)
}

val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
}
}
}

class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName

Expand Down

0 comments on commit 8af1bf1

Please sign in to comment.