Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
sagacifyTestUser committed Mar 23, 2015
2 parents daf685e + bf044de commit 12dddb4
Show file tree
Hide file tree
Showing 211 changed files with 2,000 additions and 919 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion bagel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
4 changes: 2 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -275,7 +275,7 @@
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
<version>0.5.0</version>
<version>0.6.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ private[spark] object TaskState extends Enumeration {

type TaskState = Value

def isFailed(state: TaskState) = (LOST == state) || (FAILED == state)

def isFinished(state: TaskState) = FINISHED_STATES.contains(state)

def toMesos(state: TaskState): MesosTaskState = state match {
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,23 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
*/
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
sample(withReplacement, fraction, Utils.random.nextLong)

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
* @param seed seed for the random number generator
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
import org.apache.spark.util.Utils

/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
Expand Down Expand Up @@ -405,8 +406,7 @@ private object SparkDocker {

private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
val ipPromise = promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "")
outFile.deleteOnExit()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
if (line.startsWith("CONTAINER_IP=")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
} catch {
case e: Exception =>
logError(
s"Exception encountered when attempting to load application log ${fileStatus.getPath}")
s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
e)
None
}
}.toSeq.sortWith(compareAppInfo)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ package org.apache

package object spark {
// For package docs only
val SPARK_VERSION = "1.3.0-SNAPSHOT"
val SPARK_VERSION = "1.4.0-SNAPSHOT"
}
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,12 @@ abstract class RDD[T: ClassTag](

/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
* @param seed seed for the random number generator
*/
def sample(withReplacement: Boolean,
fraction: Double,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private[spark] class CoarseMesosSchedulerBackend(
coresByTaskId -= taskId
}
// If it was a failure, mark the slave as failed for blacklisting purposes
if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
if (TaskState.isFailed(TaskState.fromMesos(state))) {
failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@ import org.apache.spark.SparkContext

private[spark] object MemoryUtils {
// These defaults copied from YARN
val OVERHEAD_FRACTION = 1.10
val OVERHEAD_FRACTION = 0.10
val OVERHEAD_MINIMUM = 384

def calculateTotalMemory(sc: SparkContext) = {
math.max(
sc.conf.getOption("spark.mesos.executor.memoryOverhead")
.getOrElse(OVERHEAD_MINIMUM.toString)
.toInt + sc.executorMemory,
OVERHEAD_FRACTION * sc.executorMemory
)
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ private[spark] class MesosSchedulerBackend(
val tid = status.getTaskId.getValue.toLong
val state = TaskState.fromMesos(status.getState)
synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
if (TaskState.isFailed(TaskState.fromMesos(status.getState))
&& taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
removeExecutor(taskIdToSlaveId(tid), "Lost executor")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.storage
import java.text.SimpleDateFormat
import java.util.{Date, Random}

import tachyon.client.TachyonFS
import tachyon.client.TachyonFile
import tachyon.TachyonURI
import tachyon.client.{TachyonFile, TachyonFS}

import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
Expand All @@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager(
val master: String)
extends Logging {

val client = if (master != null && master != "") TachyonFS.get(master) else null
val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null

if (client == null) {
logError("Failed to connect to the Tachyon as the master address is not configured")
Expand All @@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager(
addShutdownHook()

def removeFile(file: TachyonFile): Boolean = {
client.delete(file.getPath(), false)
client.delete(new TachyonURI(file.getPath()), false)
}

def fileExists(file: TachyonFile): Boolean = {
client.exist(file.getPath())
client.exist(new TachyonURI(file.getPath()))
}

def getFile(filename: String): TachyonFile = {
Expand All @@ -81,15 +81,15 @@ private[spark] class TachyonBlockManager(
if (old != null) {
old
} else {
val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId)
val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
client.mkdir(path)
val newDir = client.getFile(path)
subDirs(dirId)(subDirId) = newDir
newDir
}
}
}
val filePath = subDir + "/" + filename
val filePath = new TachyonURI(s"$subDir/$filename")
if(!client.exist(filePath)) {
client.createFile(filePath)
}
Expand All @@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager(

// TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
private def createTachyonDirs(): Array[TachyonFile] = {
logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
logDebug(s"Creating tachyon directories at root dirs '$rootDirs'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map { rootDir =>
var foundLocalDir = false
Expand All @@ -113,22 +113,21 @@ private[spark] class TachyonBlockManager(
tries += 1
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId
val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
if (!client.exist(path)) {
foundLocalDir = client.mkdir(path)
tachyonDir = client.getFile(path)
}
} catch {
case e: Exception =>
logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e)
}
}
if (!foundLocalDir) {
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
rootDir)
logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir")
System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
}
logInfo("Created tachyon directory at " + tachyonDir)
logInfo(s"Created tachyon directory at $tachyonDir")
tachyonDir
}
}
Expand All @@ -145,7 +144,7 @@ private[spark] class TachyonBlockManager(
}
} catch {
case e: Exception =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e)
}
}
client.close()
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._

import tachyon.TachyonURI
import tachyon.client.{TachyonFS, TachyonFile}

import org.apache.spark._
Expand Down Expand Up @@ -288,7 +290,7 @@ private[spark] object Utils extends Logging {
} catch { case e: SecurityException => dir = null; }
}

dir
dir.getCanonicalFile
}

/**
Expand Down Expand Up @@ -970,7 +972,7 @@ private[spark] object Utils extends Logging {
* Delete a file or directory and its contents recursively.
*/
def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
if (!client.delete(dir.getPath(), true)) {
if (!client.delete(new TachyonURI(dir.getPath()), true)) {
throw new IOException("Failed to delete the tachyon dir: " + dir)
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {

override def beforeEach() {
super.beforeEach()
checkpointDir = File.createTempFile("temp", "")
checkpointDir.deleteOnExit()
checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
checkpointDir.delete()
sc = new SparkContext("local", "test")
sc.setCheckpointDir(checkpointDir.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.util.Utils

class SecurityManagerSuite extends FunSuite {

test("set security with conf") {
Expand Down Expand Up @@ -160,8 +162,7 @@ class SecurityManagerSuite extends FunSuite {
}

test("ssl off setup") {
val file = File.createTempFile("SSLOptionsSuite", "conf")
file.deleteOnExit()
val file = File.createTempFile("SSLOptionsSuite", "conf", Utils.createTempDir())

System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
val conf = new SparkConf()
Expand Down
11 changes: 6 additions & 5 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,14 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}

test("addFile works") {
val file1 = File.createTempFile("someprefix1", "somesuffix1")
val dir = Utils.createTempDir()

val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val absolutePath1 = file1.getAbsolutePath

val pluto = Utils.createTempDir()
val file2 = File.createTempFile("someprefix2", "somesuffix2", pluto)
val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
val absolutePath2 = file2.getAbsolutePath

Expand Down Expand Up @@ -129,7 +130,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
sc.stop()
}
}

test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,10 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles

val tmpDir = Utils.createTempDir()

// Test jars and files
val f1 = File.createTempFile("test-submit-jars-files", "")
val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
val writer1 = new PrintWriter(f1)
writer1.println("spark.jars " + jars)
writer1.println("spark.files " + files)
Expand All @@ -420,7 +422,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps("spark.files") should be(Utils.resolveURIs(files))

// Test files and archives (Yarn)
val f2 = File.createTempFile("test-submit-files-archives", "")
val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
val writer2 = new PrintWriter(f2)
writer2.println("spark.yarn.dist.files " + files)
writer2.println("spark.yarn.dist.archives " + archives)
Expand All @@ -437,7 +439,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))

// Test python files
val f3 = File.createTempFile("test-submit-python-files", "")
val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
val writer3 = new PrintWriter(f3)
writer3.println("spark.submit.pyFiles " + pyFiles)
writer3.close()
Expand Down
6 changes: 4 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
import org.apache.spark._
import org.scalatest.FunSuite

import scala.collection.Map
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try

import org.apache.spark._
import org.apache.spark.util.Utils

class PipedRDDSuite extends FunSuite with SharedSparkContext {

test("basic pipe") {
Expand Down Expand Up @@ -141,7 +143,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
new File("tasks").delete()
Utils.deleteRecursively(new File("tasks"))
} else {
assert(true)
}
Expand Down
Loading

0 comments on commit 12dddb4

Please sign in to comment.