Skip to content

Commit

Permalink
Merge branch 'ESPARK-67' into 'spark_2.1'
Browse files Browse the repository at this point in the history
[ESPARK-67] Executor hang when updateDependencies failed

resolve apache#67  
给下载依赖文件设置一个超时时间

See merge request !53
  • Loading branch information
cenyuhai committed Aug 4, 2017
2 parents e401c08 + 441c436 commit 683210d
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.Try
Expand Down Expand Up @@ -628,6 +630,8 @@ private[spark] object Utils extends Logging {
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
Option(uri.getScheme).getOrElse("file") match {
case "spark" =>
if (SparkEnv.get == null) {
Expand All @@ -636,7 +640,10 @@ private[spark] object Utils extends Logging {
}
val source = SparkEnv.get.rpcEnv.openChannel(url)
val is = Channels.newInputStream(source)
downloadFile(url, is, targetFile, fileOverwrite)
val f = Future {
downloadFile(url, is, targetFile, fileOverwrite)
}
ThreadUtils.awaitResult(f, Duration(timeoutMs, TimeUnit.MILLISECONDS))
case "http" | "https" | "ftp" =>
var uc: URLConnection = null
if (securityMgr.isAuthenticationEnabled()) {
Expand All @@ -649,9 +656,6 @@ private[spark] object Utils extends Logging {
uc = new URL(url).openConnection()
}
Utils.setupSecureURLConnection(uc, securityMgr)

val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
uc.connect()
Expand Down

0 comments on commit 683210d

Please sign in to comment.