[SPARK-20338] Spaces in spark.eventLog.dir are not correctly handled
zuotingbing committed Apr 18, 2017
1 parent 7536e28 commit 4b203f1
Showing 6 changed files with 20 additions and 30 deletions.
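
For context, the bug this commit fixes: when the configured event log directory contains a space, resolving the string through java.net.URI percent-encodes the space, and the encoded form then leaks into the on-disk path, so the directory gets created literally as "event%20log". A minimal sketch of the failure mode, assuming a hypothetical directory /tmp/event log (the object name is also hypothetical):

    import java.io.File
    import org.apache.hadoop.fs.Path

    object SpaceInLogDirDemo {
      def main(args: Array[String]): Unit = {
        val dir = "/tmp/event log" // hypothetical directory containing a space

        // Round-tripping the string through java.net.URI, as the pre-patch
        // Utils.resolveURI-based code did, percent-encodes the space, so the
        // event log directory is later created literally as "event%20log".
        println(new File(dir).getAbsoluteFile.toURI.toString) // file:/tmp/event%20log

        // Hadoop's Path accepts the raw string and preserves the space.
        println(new Path(dir).toString) // /tmp/event log
      }
    }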
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -193,7 +193,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * ------------------------------------------------------------------------------------- */

   private var _conf: SparkConf = _
-  private var _eventLogDir: Option[URI] = None
+  private var _eventLogDir: Option[String] = None
   private var _eventLogCodec: Option[String] = None
   private var _env: SparkEnv = _
   private var _jobProgressListener: JobProgressListener = _
@@ -236,7 +236,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def appName: String = _conf.get("spark.app.name")

   private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] def eventLogDir: Option[URI] = _eventLogDir
+  private[spark] def eventLogDir: Option[String] = _eventLogDir
   private[spark] def eventLogCodec: Option[String] = _eventLogCodec

   def isLocal: Boolean = Utils.isLocalMaster(_conf)
@@ -405,9 +405,7 @@ class SparkContext(config: SparkConf) extends Logging {

     _eventLogDir =
       if (isEventLogEnabled) {
-        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
-          .stripSuffix("/")
-        Some(Utils.resolveURI(unresolvedDir))
+        Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
       } else {
         None
       }
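The upshot of the SparkContext change: the directory is now stored exactly as the user configured it, minus any trailing slash, with no eager URI resolution. A hedged illustration (the config value is hypothetical):

    // With the change above, _eventLogDir keeps the raw configured string
    // (trailing "/" stripped) rather than a resolved URI.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/event log/")
    // _eventLogDir becomes Some("/tmp/event log"); the space survives intact.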
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,15 +17,13 @@

 package org.apache.spark.deploy

-import java.net.URI
-
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
     memoryPerExecutorMB: Int,
     command: Command,
     appUiUrl: String,
-    eventLogDir: Option[URI] = None,
+    eventLogDir: Option[String] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
     coresPerExecutor: Option[Int] = None,
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler

 import java.io._
-import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.Locale

@@ -50,22 +49,22 @@ import org.apache.spark.util.{JsonProtocol, Utils}
 private[spark] class EventLoggingListener(
     appId: String,
     appAttemptId : Option[String],
-    logBaseDir: URI,
+    logBaseDir: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {

   import EventLoggingListener._

-  def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+  def this(appId: String, appAttemptId : Option[String], logBaseDir: String, sparkConf: SparkConf) =
     this(appId, appAttemptId, logBaseDir, sparkConf,
       SparkHadoopUtil.get.newConfiguration(sparkConf))

   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
+  private val fileSystem = new Path(logBaseDir).getFileSystem(hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
       Some(CompressionCodec.createCodec(sparkConf))
@@ -96,8 +95,8 @@ private[spark] class EventLoggingListener(
     }

     val workingPath = logPath + IN_PROGRESS
-    val uri = new URI(workingPath)
     val path = new Path(workingPath)
+    val uri = path.toUri
     val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"

@@ -303,11 +302,11 @@ private[spark] object EventLoggingListener extends Logging {
    * @return A path which consists of file-system-safe characters.
    */
   def getLogPath(
-      logBaseDir: URI,
+      logBaseDir: String,
       appId: String,
       appAttemptId: Option[String],
       compressionCodecName: Option[String] = None): String = {
-    val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+    val base = logBaseDir.stripSuffix("/") + "/" + sanitize(appId)
     val codec = compressionCodecName.map("." + _).getOrElse("")
     if (appAttemptId.isDefined) {
       base + "_" + sanitize(appAttemptId.get) + codec
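The key move in EventLoggingListener.start() is constructing the Hadoop Path first and deriving the URI from it, rather than parsing the raw string with new URI(...), which throws URISyntaxException on a space. A small sketch under that assumption (the working path is hypothetical):

    import org.apache.hadoop.fs.Path

    val workingPath = "/tmp/event log/app-1.inprogress" // hypothetical
    // new java.net.URI(workingPath) would throw URISyntaxException here.
    val path = new Path(workingPath) // tolerates the raw space
    val uri = path.toUri             // Path percent-encodes only the URI form
    println(uri.getPath)  // /tmp/event log/app-1.inprogress (decoded)
    println(uri.toString) // /tmp/event%20log/app-1.inprogress (encoded)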
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -61,8 +61,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       inProgress: Boolean,
       codec: Option[String] = None): File = {
     val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
-    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
-    val logPath = new URI(logUri).getPath + ip
+    val logPath = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId, appAttemptId) + ip
     new File(logPath)
   }

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -50,7 +50,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   private var testDirPath: Path = _

   before {
-    testDir = Utils.createTempDir()
+    testDir = Utils.createTempDir(namePrefix = s"event log")
     testDir.deleteOnExit()
     testDirPath = new Path(testDir.getAbsolutePath())
   }
@@ -62,7 +62,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   test("Verify log file exist") {
     // Verify logging directory exists
     val conf = getLoggingConf(testDirPath)
-    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener("test", None, testDirPath.toString, conf)
     eventLogger.start()

     val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -100,16 +100,15 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val secretPassword = "secret_password"
     val conf = getLoggingConf(testDirPath, None)
       .set(key, secretPassword)
-    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener("test", None, testDirPath.toString, conf)
     val envDetails = SparkEnv.environmentDetails(conf, "FIFO", Seq.empty, Seq.empty)
     val event = SparkListenerEnvironmentUpdate(envDetails)
     val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
     assert(redactedProps(key) == "*********(redacted)")
   }

   test("Log overwriting") {
-    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
-    val logPath = new URI(logUri).getPath
+    val logPath = EventLoggingListener.getLogPath(testDir.toString, "test", None)
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()
     // Expected IOException, since we haven't enabled log overwrite.
@@ -119,7 +118,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   }

   test("Event log name") {
-    val baseDirUri = Utils.resolveURI("/base-dir")
+    val baseDirUri = "/base-dir"
     // without compression
     assert(s"${baseDirUri.toString}/app1" === EventLoggingListener.getLogPath(
       baseDirUri, "app1", None))
@@ -154,7 +153,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
-    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toString, conf)
     val listenerBus = new LiveListenerBus(sc)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
@@ -190,15 +189,12 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
-    // Set defaultFS to something that would cause an exception, to make sure we don't run
-    // into SPARK-6688.
     val conf = getLoggingConf(testDirPath, compressionCodec)
-      .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
     sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
     val eventLogPath = eventLogger.logPath
-    val expectedLogDir = testDir.toURI()
+    val expectedLogDir = testDir.getAbsolutePath
     assert(eventLogPath === EventLoggingListener.getLogPath(
       expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))

@@ -290,7 +286,7 @@ object EventLoggingListenerSuite {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
-    conf.set("spark.eventLog.dir", logDir.toUri.toString)
+    conf.set("spark.eventLog.dir", logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -151,7 +151,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
    * log the events.
    */
   private class EventMonster(conf: SparkConf)
-    extends EventLoggingListener("test", None, new URI("testdir"), conf) {
+    extends EventLoggingListener("test", None, "test dir", conf) {

     override def start() { }

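Note how the test fixtures now deliberately embed a space ("event log" as the temp-dir prefix, "test dir" as the EventMonster base dir), so the suites exercise the space-handling paths this commit fixes.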
