
Commit

Merge branch 'master' into join
cloud-fan committed Apr 18, 2018
2 parents 561db44 + 1e3b876 commit 5e41208
Showing 48 changed files with 1,614 additions and 199 deletions.
@@ -301,6 +301,12 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val IGNORE_MISSING_FILES = ConfigBuilder("spark.files.ignoreMissingFiles")
.doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
"encountering missing files and the contents that have been read will still be returned.")
.booleanConf
.createWithDefault(false)

private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
.stringConf
.createOptional
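For reference, a minimal usage sketch of the new flag (not part of this diff; the app name, master, and input path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ignore-missing-files-demo")  // placeholder app name
  .setMaster("local[*]")                    // placeholder master
  .set("spark.files.ignoreMissingFiles", "true")
val sc = new SparkContext(conf)

// Files matched by the glob may disappear between planning and task execution;
// with the flag enabled the job keeps running and returns whatever was read.
val lines = sc.textFile("/tmp/demo-input/*.txt")  // placeholder path
println(lines.count())

With the default of false, a missing path fails the job with InvalidInputException (during getPartitions) or FileNotFoundException (during compute), as exercised by the FileSuite test further down.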
43 changes: 32 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.rdd

import java.io.IOException
import java.io.{FileNotFoundException, IOException}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
@@ -134,6 +135,8 @@ class HadoopRDD[K, V](

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)

private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
@@ -197,17 +200,24 @@ class HadoopRDD[K, V](
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
try {
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
} catch {
case e: InvalidInputException if ignoreMissingFiles =>
logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
s" partitions returned from this path.", e)
Array.empty[Partition]
}
array
}

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -256,6 +266,12 @@ class HadoopRDD[K, V](
try {
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
@@ -276,6 +292,11 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
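A side note on the new catch clauses above: java.io.FileNotFoundException is a subclass of java.io.IOException, so without the explicit `case e: FileNotFoundException if !ignoreMissingFiles => throw e` guard, a missing file would fall through to the ignoreCorruptFiles branch and be skipped silently. A minimal standalone sketch of that match ordering (illustration only, not the RDD code):

import java.io.{FileNotFoundException, IOException}

def handle(e: IOException, ignoreMissingFiles: Boolean, ignoreCorruptFiles: Boolean): String =
  e match {
    case _: FileNotFoundException if ignoreMissingFiles => "skip missing file"
    // Rethrow missing-file errors even when corrupt files are ignored;
    // otherwise FileNotFoundException would match the IOException case below.
    case fnf: FileNotFoundException if !ignoreMissingFiles => throw fnf
    case _: IOException if ignoreCorruptFiles => "skip corrupt file"
    case other => throw other
  }

// handle(new FileNotFoundException("f"), ignoreMissingFiles = false, ignoreCorruptFiles = true)
// rethrows, while handle(new IOException("bad block"), false, true) returns "skip corrupt file".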
45 changes: 33 additions & 12 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.rdd

import java.io.IOException
import java.io.{FileNotFoundException, IOException}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileInputFormat, FileSplit, InvalidInputException}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}

import org.apache.spark._
@@ -90,6 +90,8 @@ class NewHadoopRDD[K, V](

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)

private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

def getConf: Configuration = {
@@ -124,17 +126,25 @@ class NewHadoopRDD[K, V](
configurable.setConf(_conf)
case _ =>
}
val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
val rawSplits = if (ignoreEmptySplits) {
allRowSplits.filter(_.getLength > 0)
} else {
allRowSplits
}
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
try {
val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
val rawSplits = if (ignoreEmptySplits) {
allRowSplits.filter(_.getLength > 0)
} else {
allRowSplits
}
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) =
new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
result
} catch {
case e: InvalidInputException if ignoreMissingFiles =>
logWarning(s"${_conf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
s" partitions returned from this path.", e)
Array.empty[Partition]
}
result
}

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
@@ -189,6 +199,12 @@ class NewHadoopRDD[K, V](
_reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
_reader
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
finished = true
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
@@ -213,6 +229,11 @@ class NewHadoopRDD[K, V](
try {
finished = !reader.nextKeyValue
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
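Under the same setup as the FileSuite test below (an existing SparkContext `sc` with spark.files.ignoreMissingFiles=true, new Hadoop API, text input), a rough sketch of the observable behavior after this change; the path is a placeholder:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

// InvalidInputException from getSplits is caught in getPartitions and an empty
// partition array is returned, so collect() yields no rows instead of failing.
val rows = sc.newAPIHadoopFile(
  "/tmp/path/that/does/not/exist",  // placeholder path
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text])
assert(rows.collect().isEmpty)

With the default of false, the same call fails in getPartitions with org.apache.hadoop.mapreduce.lib.input.InvalidInputException, as the test asserts.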
33 changes: 18 additions & 15 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1092,17 +1092,16 @@ class DAGScheduler(
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)

val debugString = stage match {
stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
logDebug(s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
markMapStageJobsAsFinished(stage)
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
logDebug(debugString)

submitWaitingChildStages(stage)
}
}
Expand Down Expand Up @@ -1307,13 +1306,7 @@ class DAGScheduler(
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
// Mark any map-stage jobs waiting on this stage as finished
if (shuffleStage.mapStageJobs.nonEmpty) {
val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
for (job <- shuffleStage.mapStageJobs) {
markMapStageJobAsFinished(job, stats)
}
}
markMapStageJobsAsFinished(shuffleStage)
submitWaitingChildStages(shuffleStage)
}
}
@@ -1433,6 +1426,16 @@ class DAGScheduler(
}
}

private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
// Mark any map-stage jobs waiting on this stage as finished
if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
for (job <- shuffleStage.mapStageJobs) {
markMapStageJobAsFinished(job, stats)
}
}
}

/**
* Responds to an executor being lost. This is called inside the event loop, so it assumes it can
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
69 changes: 68 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -23,6 +23,7 @@ import java.util.zip.GZIPOutputStream

import scala.io.Source

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
@@ -32,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.internal.config._
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -596,4 +597,70 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
actualPartitionNum = 5,
expectedPartitionNum = 2)
}

test("spark.files.ignoreMissingFiles should work both HadoopRDD and NewHadoopRDD") {
// "file not found" can happen both when getPartitions or compute in HadoopRDD/NewHadoopRDD,
// We test both cases here.

val deletedPath = new Path(tempDir.getAbsolutePath, "test-data-1")
val fs = deletedPath.getFileSystem(new Configuration())
fs.delete(deletedPath, true)
intercept[FileNotFoundException](fs.open(deletedPath))

def collectRDDAndDeleteFileBeforeCompute(newApi: Boolean): Array[_] = {
val dataPath = new Path(tempDir.getAbsolutePath, "test-data-2")
val writer = new OutputStreamWriter(new FileOutputStream(new File(dataPath.toString)))
writer.write("hello\n")
writer.write("world\n")
writer.close()
val rdd = if (newApi) {
sc.newAPIHadoopFile(dataPath.toString, classOf[NewTextInputFormat],
classOf[LongWritable], classOf[Text])
} else {
sc.textFile(dataPath.toString)
}
rdd.partitions
fs.delete(dataPath, true)
// Exception happens when initialize record reader in HadoopRDD/NewHadoopRDD.compute
// because partitions' info already cached.
rdd.collect()
}

// collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=false by default.
sc = new SparkContext("local", "test")
intercept[org.apache.hadoop.mapred.InvalidInputException] {
// Exception happens when HadoopRDD.getPartitions
sc.textFile(deletedPath.toString).collect()
}

var e = intercept[SparkException] {
collectRDDAndDeleteFileBeforeCompute(false)
}
assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])

intercept[org.apache.hadoop.mapreduce.lib.input.InvalidInputException] {
// Exception happens when NewHadoopRDD.getPartitions
sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
classOf[LongWritable], classOf[Text]).collect
}

e = intercept[SparkException] {
collectRDDAndDeleteFileBeforeCompute(true)
}
assert(e.getCause.isInstanceOf[java.io.FileNotFoundException])

sc.stop()

// collect HadoopRDD and NewHadoopRDD when spark.files.ignoreMissingFiles=true.
val conf = new SparkConf().set(IGNORE_MISSING_FILES, true)
sc = new SparkContext("local", "test", conf)
assert(sc.textFile(deletedPath.toString).collect().isEmpty)

assert(collectRDDAndDeleteFileBeforeCompute(false).isEmpty)

assert(sc.newAPIHadoopFile(deletedPath.toString, classOf[NewTextInputFormat],
classOf[LongWritable], classOf[Text]).collect().isEmpty)

assert(collectRDDAndDeleteFileBeforeCompute(true).isEmpty)
}
}
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2146,6 +2146,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assertDataStructuresEmpty()
}

test("Trigger mapstage's job listener in submitMissingTasks") {
val rdd1 = new MyRDD(sc, 2, Nil)
val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))

val listener1 = new SimpleListener
val listener2 = new SimpleListener

submitMapStage(dep1, listener1)
submitMapStage(dep2, listener2)

// Complete the stage0.
assert(taskSets(0).stageId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", rdd1.partitions.length)),
(Success, makeMapStatus("hostB", rdd1.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
assert(listener1.results.size === 1)

// When attempting stage1, trigger a fetch failure.
assert(taskSets(1).stageId === 1)
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostC", rdd2.partitions.length)),
(FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()
// Stage1 listener should not have a result yet
assert(listener2.results.size === 0)

// Speculative task succeeded in stage1.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
Success,
makeMapStatus("hostD", rdd2.partitions.length)))
// stage1 listener still should not have a result, though there's no missing partitions
// in it. Because stage1 has been failed and is not inside `runningStages` at this moment.
assert(listener2.results.size === 0)

// Stage0 should now be running as task set 2; make its task succeed
assert(taskSets(2).stageId === 0)
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostC", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))

// After stage0 is finished, stage1 will be submitted and found there is no missing
// partitions in it. Then listener got triggered.
assert(listener2.results.size === 1)
assertDataStructuresEmpty()
}

/**
* In this test, we run a map stage where one of the executors fails but we still receive a
* "zombie" complete message from that executor. We want to make sure the stage is not reported