Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
pspoerri committed Jul 14, 2023
1 parent 1c4af44 commit 15998d3
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
import org.apache.spark.storage.S3ShuffleReader

import java.io.IOException
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -155,11 +156,18 @@ private[spark] class S3ShuffleManager(conf: SparkConf) extends ShuffleManager wi

Range(0, dispatcher.folderPrefixes).flatMap(idx => {
val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
Future {
dispatcher.fs.delete(f.getPath, false)
try {
dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
Future {
dispatcher.fs.delete(f.getPath, false)
}
})
} catch {
case _: IOException => {
logDebug(s"Unable to delete ${path.getName}")
List()
}
})
}
}).foreach(Await.result(_, Duration.Inf))
}
true
Expand Down

0 comments on commit 15998d3

Please sign in to comment.