Add support for running pipe tasks in separate directories
tgravescs committed Mar 12, 2014
1 parent af7f2f1 commit 1ab49ca
Showing 6 changed files with 113 additions and 12 deletions.
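In brief: RDD.pipe gains a separateWorkingDir flag. When it is set, each piped task creates a ./tasks/<uuid> directory, symlinks the executor working directory's contents into it, and runs its subprocess there. A minimal usage sketch (assuming an existing SparkContext named sc; the printed path is illustrative):

val nums = sc.makeRDD(Seq(1, 2, 3, 4), 2)

// Each "pwd" subprocess now reports a per-task directory rather than
// the shared executor working directory.
val dirs = nums.pipe(Seq("pwd"), separateWorkingDir = true).collect()
dirs.foreach(println)  // e.g. /executor/workdir/tasks/<uuid>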
9 changes: 4 additions & 5 deletions core/pom.xml
@@ -183,14 +183,13 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
62 changes: 59 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.rdd

import java.io.File
import java.io.FilenameFilter
import java.io.PrintWriter
import java.util.StringTokenizer

@@ -26,7 +28,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag

import org.apache.commons.io.FileUtils
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.util.Utils


/**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
command: Seq[String],
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit)
printRDDElement: (T, String => Unit) => Unit,
separateWorkingDir: Boolean)
extends RDD[String](prev) {

// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@
command: String,
envVars: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
separateWorkingDir)


override def getPartitions: Array[Partition] = firstParent[T].partitions

/**
* A FilenameFilter that accepts anything that isn't equal to the name passed in.
* @param filterName name of the file or directory to leave out
*/
class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean = {
!name.equals(filterName)
}
}

override def compute(split: Partition, context: TaskContext): Iterator[String] = {
val pb = new ProcessBuilder(command)
// Add the environmental variables to the process.
@@ -67,6 +84,38 @@
currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
}

// When the separateWorkingDir option is turned on, each task runs its
// subprocess in a separate directory. This should resolve file access
// conflicts between concurrent tasks.
val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
var workInTaskDirectory = false
logDebug("taskDirectory = " + taskDirectory)
if (separateWorkingDir) {
val currentDir = new File(".")
logDebug("currentDir = " + currentDir)
val taskDirFile = new File(taskDirectory)
taskDirFile.mkdirs()

try {
val tasksDirFilter = new NotEqualsFileNameFilter("tasks")

// Need to add symlinks to jars, files, and directories. On Yarn we could have
// directories and other files not known to the SparkContext that were added via the
// Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
// are creating here.
for (file <- currentDir.list(tasksDirFilter)) {
val fileWithDir = new File(currentDir, file)
Utils.symlink(new File(fileWithDir.getAbsolutePath()),
new File(taskDirectory + "/" + fileWithDir.getName()))
}
pb.directory(taskDirFile)
workInTaskDirectory = true
} catch {
case e: Exception => logError("Unable to set up task working directory: " + e.getMessage +
" (" + taskDirectory + ")")
}
}

val proc = pb.start()
val env = SparkEnv.get

@@ -112,6 +161,13 @@
if (exitStatus != 0) {
throw new Exception("Subprocess exited with status " + exitStatus)
}

// Clean up the task working directory if one was used
if (workInTaskDirectory) {
FileUtils.deleteQuietly(new File(taskDirectory))
logDebug("Removed task working directory " + taskDirectory)
}

false
}
}
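To make the exclusion logic above concrete, here is the (fixed) filter in isolation — a sketch that treats NotEqualsFileNameFilter as standalone, though in the diff it is nested inside PipedRDD:

import java.io.File

val filter = new NotEqualsFileNameFilter("tasks")
filter.accept(new File("."), "app.jar")  // true: gets symlinked into the task dir
filter.accept(new File("."), "tasks")    // false: the per-task tree itself is skipped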
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -478,16 +478,19 @@
* instead of constructing a huge String to concat all the elements:
* def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
* for (e <- record._2){f(e)}
* @param separateWorkingDir Use separate working directories for each task.
* @return the result RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false): RDD[String] = {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null)
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
separateWorkingDir)
}

/**
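A hedged sketch of a call exercising the extended signature (lines is a hypothetical RDD[String]; the command and environment are illustrative):

val filtered = lines.pipe(
  command = Seq("grep", "-i", "error"),
  env = Map("LC_ALL" -> "C"),
  separateWorkingDir = true)  // isolate each task's subprocess working directory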
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SortedSet
import scala.io.Source
import scala.reflect.ClassTag

@@ -895,4 +896,24 @@
}
count
}

/**
* Creates a symlink. Note that JDK 1.7 has Files.createSymbolicLink, but it is not
* used here in order to keep JDK 1.6 support. Does not work on Windows or any other
* platform lacking 'ln'.
* @param src absolute path to the source
* @param dst relative path for the destination
*/
def symlink(src: File, dst: File) {
if (!src.isAbsolute()) {
throw new IOException("Source must be absolute")
}
if (dst.isAbsolute()) {
throw new IOException("Destination must be relative")
}
import scala.sys.process._
("ln -sf " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
(logInfo(line)))
}


}
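As the scaladoc above notes, JDK 1.7's Files.createSymbolicLink could replace the 'ln' subprocess once JDK 1.6 support is dropped. A sketch of that alternative (not part of this commit):

import java.io.{File, IOException}
import java.nio.file.Files

// JDK 1.7+ alternative: create the link via NIO instead of forking `ln`.
def symlinkNio(src: File, dst: File) {
  if (!src.isAbsolute()) {
    throw new IOException("Source must be absolute")
  }
  if (dst.isAbsolute()) {
    throw new IOException("Destination must be relative")
  }
  // createSymbolicLink(link, target): dst becomes a link pointing at src
  Files.createSymbolicLink(dst.toPath, src.toPath)
}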
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -123,6 +123,28 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
}
}

test("basic pipe with separate working directory") {
if (testCommandAvailable("cat")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)

val c = piped.collect()
assert(c.size === 4)
assert(c(0) === "1")
assert(c(1) === "2")
assert(c(2) === "3")
assert(c(3) === "4")
val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
val collectPwd = pipedPwd.collect()
println("collect pwd is: " + collectPwd(0))
assert(collectPwd(0).contains("tasks/"))
assert(collectPwd(0).matches(".*/tasks/[^/]+"))
} else {
assert(true)
}
}

test("test pipe exports map_input_file") {
testExportInputFile("map_input_file")
}
4 changes: 2 additions & 2 deletions project/SparkBuild.scala
@@ -238,12 +238,12 @@ object SparkBuild extends Build {
"org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106",
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"commons-io" % "commons-io" % "2.4",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
"org.mockito" % "mockito-all" % "1.8.5" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
