diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 99efc4893fda4..668bcd5d19470 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -193,7 +193,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * ------------------------------------------------------------------------------------- */
 
   private var _conf: SparkConf = _
-  private var _eventLogDir: Option[URI] = None
+  private var _eventLogDir: Option[String] = None
   private var _eventLogCodec: Option[String] = None
   private var _env: SparkEnv = _
   private var _jobProgressListener: JobProgressListener = _
@@ -236,7 +236,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def appName: String = _conf.get("spark.app.name")
 
   private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] def eventLogDir: Option[URI] = _eventLogDir
+  private[spark] def eventLogDir: Option[String] = _eventLogDir
   private[spark] def eventLogCodec: Option[String] = _eventLogCodec
 
   def isLocal: Boolean = Utils.isLocalMaster(_conf)
@@ -405,9 +405,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _eventLogDir =
       if (isEventLogEnabled) {
-        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
-          .stripSuffix("/")
-        Some(Utils.resolveURI(unresolvedDir))
+        Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
       } else {
         None
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index c5c5c60923f4e..e99b1e53bea73 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.deploy
 
-import java.net.URI
-
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
     memoryPerExecutorMB: Int,
     command: Command,
     appUiUrl: String,
-    eventLogDir: Option[URI] = None,
+    eventLogDir: Option[String] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
     coresPerExecutor: Option[Int] = None,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index aecb3a980e7c1..1b68d33891039 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler
 
 import java.io._
-import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.Locale
 
@@ -50,14 +49,14 @@ import org.apache.spark.util.{JsonProtocol, Utils}
 private[spark] class EventLoggingListener(
     appId: String,
     appAttemptId : Option[String],
-    logBaseDir: URI,
+    logBaseDir: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+  def this(appId: String, appAttemptId : Option[String], logBaseDir: String, sparkConf: SparkConf) =
     this(appId, appAttemptId, logBaseDir, sparkConf,
       SparkHadoopUtil.get.newConfiguration(sparkConf))
 
@@ -65,7 +64,7 @@ private[spark] class EventLoggingListener(
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  private val fileSystem = new Path(logBaseDir).getFileSystem(hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
       Some(CompressionCodec.createCodec(sparkConf))
@@ -96,8 +95,8 @@ private[spark] class EventLoggingListener(
     }
 
     val workingPath = logPath + IN_PROGRESS
-    val uri = new URI(workingPath)
     val path = new Path(workingPath)
+    val uri = path.toUri
     val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
@@ -303,11 +302,11 @@ private[spark] object EventLoggingListener extends Logging {
    * @return A path which consists of file-system-safe characters.
    */
   def getLogPath(
-      logBaseDir: URI,
+      logBaseDir: String,
       appId: String,
       appAttemptId: Option[String],
       compressionCodecName: Option[String] = None): String = {
-    val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+    val base = logBaseDir.stripSuffix("/") + "/" + sanitize(appId)
     val codec = compressionCodecName.map("." + _).getOrElse("")
     if (appAttemptId.isDefined) {
       base + "_" + sanitize(appAttemptId.get) + codec
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index ec580a44b8e76..e7a5afacaaf9b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -61,8 +61,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       inProgress: Boolean,
       codec: Option[String] = None): File = {
     val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
-    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
-    val logPath = new URI(logUri).getPath + ip
+    val logPath = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId, appAttemptId) + ip
     new File(logPath)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 4c3d0b102152c..727a804860aab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -50,7 +50,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   private var testDirPath: Path = _
 
   before {
-    testDir = Utils.createTempDir()
+    testDir = Utils.createTempDir(namePrefix = s"event log")
     testDir.deleteOnExit()
     testDirPath = new Path(testDir.getAbsolutePath())
   }
@@ -62,7 +62,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   test("Verify log file exist") {
     // Verify logging directory exists
     val conf = getLoggingConf(testDirPath)
-    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener("test", None, testDirPath.toString, conf)
     eventLogger.start()
 
     val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -100,7 +100,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val secretPassword = "secret_password"
     val conf = getLoggingConf(testDirPath, None)
       .set(key, secretPassword)
-    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener("test", None, testDirPath.toString, conf)
     val envDetails = SparkEnv.environmentDetails(conf, "FIFO", Seq.empty, Seq.empty)
     val event = SparkListenerEnvironmentUpdate(envDetails)
     val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
@@ -108,8 +108,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   }
 
   test("Log overwriting") {
-    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
-    val logPath = new URI(logUri).getPath
+    val logPath = EventLoggingListener.getLogPath(testDir.toString, "test", None)
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()
     // Expected IOException, since we haven't enabled log overwrite.
@@ -119,7 +118,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   }
 
   test("Event log name") {
-    val baseDirUri = Utils.resolveURI("/base-dir")
+    val baseDirUri = "/base-dir"
     // without compression
     assert(s"${baseDirUri.toString}/app1" === EventLoggingListener.getLogPath(
       baseDirUri, "app1", None))
@@ -154,7 +153,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
-    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toString, conf)
     val listenerBus = new LiveListenerBus(sc)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
@@ -190,15 +189,12 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
-    // Set defaultFS to something that would cause an exception, to make sure we don't run
-    // into SPARK-6688.
     val conf = getLoggingConf(testDirPath, compressionCodec)
-      .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
     sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
     val eventLogPath = eventLogger.logPath
-    val expectedLogDir = testDir.toURI()
+    val expectedLogDir = testDir.getAbsolutePath
     assert(eventLogPath === EventLoggingListener.getLogPath(
       expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
 
@@ -290,7 +286,7 @@ object EventLoggingListenerSuite {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
-    conf.set("spark.eventLog.dir", logDir.toUri.toString)
+    conf.set("spark.eventLog.dir", logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 1732aca9417ea..7a8da001f5a6c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -151,7 +151,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
    * log the events.
    */
   private class EventMonster(conf: SparkConf)
-    extends EventLoggingListener("test", None, new URI("testdir"), conf) {
+    extends EventLoggingListener("test", None, "test dir", conf) {
 
     override def start() { }
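
Note for reviewers: the standalone sketch below illustrates how the String-based getLogPath behaves after this patch. It is an approximation, not the patched Spark source: sanitize here is an assumption modeled on EventLoggingListener.sanitize, and the sample base directories and app ids are made up.

import java.util.Locale

object EventLogPathSketch {

  // Assumed stand-in for EventLoggingListener.sanitize: replace path- and
  // shell-unfriendly characters and lower-case the result.
  private def sanitize(str: String): String = {
    str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase(Locale.ROOT)
  }

  // Mirrors the patched signature: the base dir is a plain String, used
  // verbatim apart from stripping one trailing slash. No URI resolution
  // happens here, so a scheme prefix such as hdfs:// passes through to
  // new Path(logBaseDir).getFileSystem(hadoopConf) untouched.
  def getLogPath(
      logBaseDir: String,
      appId: String,
      appAttemptId: Option[String],
      compressionCodecName: Option[String] = None): String = {
    val base = logBaseDir.stripSuffix("/") + "/" + sanitize(appId)
    val codec = compressionCodecName.map("." + _).getOrElse("")
    appAttemptId match {
      case Some(attempt) => base + "_" + sanitize(attempt) + codec
      case None => base + codec
    }
  }

  def main(args: Array[String]): Unit = {
    println(getLogPath("/base-dir/", "app1", None))
    // -> /base-dir/app1  (matches the updated "Event log name" test above)
    println(getLogPath("hdfs://nn:8020/logs", "app 1", Some("1")))
    // -> hdfs://nn:8020/logs/app-1_1  (space in the app id is sanitized;
    //    the scheme-qualified base dir is left alone)
    println(getLogPath("/base-dir", "app1", Some("2"), Some("lzf")))
    // -> /base-dir/app1_2.lzf
  }
}

The second case is the interesting one: because the base dir string is no longer round-tripped through Utils.resolveURI, a directory containing spaces (exercised by the new createTempDir(namePrefix = s"event log") in the test setup) or a scheme-qualified location reaches the Hadoop Path constructor as-is.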