Skip to content

Commit

Permalink
[SPARK-20338][CORE] Spaces in spark.eventLog.dir are not correctly handled
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

Make “spark.eventLog.dir” support paths containing space characters.

1. Update EventLoggingListenerSuite like `testDir = Utils.createTempDir(namePrefix = s"history log")`
2. Fix EventLoggingListenerSuite tests

## How was this patch tested?

Updated the existing unit tests.

Author: zuotingbing <[email protected]>

Closes apache#18285 from zuotingbing/spark-resolveURI.
  • Loading branch information
zuotingbing authored and Marcelo Vanzin committed Jun 16, 2017
1 parent 53e48f7 commit edcb878
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ private[spark] class EventLoggingListener(
}

val workingPath = logPath + IN_PROGRESS
val uri = new URI(workingPath)
val path = new Path(workingPath)
val uri = path.toUri
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"

Expand Down Expand Up @@ -320,7 +320,7 @@ private[spark] object EventLoggingListener extends Logging {
appId: String,
appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
val base = new Path(logBaseDir).toString.stripSuffix("/") + "/" + sanitize(appId)
val codec = compressionCodecName.map("." + _).getOrElse("")
if (appAttemptId.isDefined) {
base + "_" + sanitize(appAttemptId.get) + codec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.deploy.history

import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}
Expand All @@ -27,7 +26,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
Expand Down Expand Up @@ -63,7 +62,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
val logPath = new URI(logUri).getPath + ip
val logPath = new Path(logUri).toUri.getPath + ip
new File(logPath)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.scheduler

import java.io.{File, FileOutputStream, InputStream, IOException}
import java.net.URI

import scala.collection.mutable
import scala.io.Source
Expand Down Expand Up @@ -52,7 +51,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
private var testDirPath: Path = _

before {
testDir = Utils.createTempDir()
testDir = Utils.createTempDir(namePrefix = s"history log")
testDir.deleteOnExit()
testDirPath = new Path(testDir.getAbsolutePath())
}
Expand Down Expand Up @@ -111,7 +110,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit

test("Log overwriting") {
val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
val logPath = new Path(logUri).toUri.getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
// Expected IOException, since we haven't enabled log overwrite.
Expand Down Expand Up @@ -293,7 +292,7 @@ object EventLoggingListenerSuite {
val conf = new SparkConf
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.testing", "true")
conf.set("spark.eventLog.dir", logDir.toUri.toString)
conf.set("spark.eventLog.dir", logDir.toString)
compressionCodec.foreach { codec =>
conf.set("spark.eventLog.compress", "true")
conf.set("spark.io.compression.codec", codec)
Expand Down

0 comments on commit edcb878

Please sign in to comment.