diff --git a/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala b/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala index 2c111bb..82be178 100644 --- a/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala +++ b/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala @@ -30,6 +30,7 @@ import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper} import org.apache.spark.storage.S3ShuffleReader +import java.io.IOException import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.ExecutionContext.Implicits.global @@ -155,11 +156,18 @@ private[spark] class S3ShuffleManager(conf: SparkConf) extends ShuffleManager wi Range(0, dispatcher.folderPrefixes).flatMap(idx => { val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}") - dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => { - Future { - dispatcher.fs.delete(f.getPath, false) + try { + dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => { + Future { + dispatcher.fs.delete(f.getPath, false) + } + }) + } catch { + case _: IOException => { + logDebug(s"Unable to delete ${path.getName}") + List() } - }) + } }).foreach(Await.result(_, Duration.Inf)) } true