Merge pull request #5 from apache/master
merge latest spark
pzzs committed Mar 19, 2015
2 parents 161cae3 + 2c3f83c commit 98b134f
Showing 203 changed files with 4,247 additions and 1,523 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -85,7 +85,7 @@ storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-with-maven.html#specifying-the-hadoop-version)
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
for detailed guidance on building for a particular distribution of Hadoop, including
building for particular Hive and Hive Thriftserver distributions. See also
["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)
10 changes: 0 additions & 10 deletions assembly/pom.xml
@@ -114,16 +114,6 @@
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<!-- Exclude libgfortran, libgcc for license issues -->
<artifact>org.jblas:jblas</artifact>
<excludes>
<!-- Linux amd64 is OK; not statically linked -->
<exclude>lib/static/Linux/i386/**</exclude>
<exclude>lib/static/Mac OS X/**</exclude>
<exclude>lib/static/Windows/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
1 change: 0 additions & 1 deletion bin/pyspark
@@ -89,7 +89,6 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
export PYSPARK_SUBMIT_ARGS=pyspark-shell
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
2 changes: 1 addition & 1 deletion conf/spark-env.sh.template
@@ -15,7 +15,7 @@
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -414,7 +414,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server
guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
</includeArtifactIds>
<silent>true</silent>
</configuration>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
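The ContextCleaner hunk above swaps Utils.logUncaughtExceptions for Utils.tryOrStopSparkContext(sc) around the cleaning loop, so that a failure in the cleaning thread shuts the context down rather than escalating through the uncaught-exception handler. A minimal sketch of what such a wrapper could look like (an assumption for illustration, not Spark's actual implementation):

```scala
import scala.util.control.ControlThrowable
import org.apache.spark.SparkContext

object TryOrStopSketch {
  // Hypothetical helper: run the block; on an unexpected failure, stop the
  // SparkContext instead of letting the thread's uncaught-exception handler
  // take down the whole process.
  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit): Unit = {
    try block
    catch {
      case e: ControlThrowable => throw e // never swallow Scala control-flow throwables
      case t: Throwable =>
        // Stop from a separate thread, since sc.stop() may need to wait on the
        // very thread that is currently failing.
        new Thread("stop-spark-context-sketch") {
          setDaemon(true)
          override def run(): Unit = sc.stop()
        }.start()
        throw t
    }
  }
}
```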
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1093,7 +1093,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => "file:" + uri.getPath
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
}

@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

listenerBus.start()
listenerBus.start(this)
}

/** Post the application start event */
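In the addFile hunk above, the scheme-less/"local" case now builds the URI from new File(path).getCanonicalFile.toURI instead of prepending "file:" to uri.getPath, which turns relative local paths into absolute file: URIs. A small illustrative sketch (the path below is made up):

```scala
import java.io.File
import java.net.URI

object AddFilePathSketch {
  def main(args: Array[String]): Unit = {
    val path = "data/words.txt" // hypothetical relative path passed to addFile

    // Old form: prepend "file:" to the URI path -- the result is still relative.
    val naive = "file:" + new URI(path).getPath
    // New form: canonicalize first, yielding an absolute, unambiguous file: URI.
    val canonical = new File(path).getCanonicalFile.toURI.toString

    println(naive)     // file:data/words.txt
    println(canonical) // e.g. file:/home/user/project/data/words.txt
  }
}
```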
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/TaskState.scala
@@ -46,5 +46,6 @@ private[spark] object TaskState extends Enumeration {
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
case MesosTaskState.TASK_ERROR => LOST
}
}
@@ -32,7 +32,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.StatCounter
import org.apache.spark.util.Utils

class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {
class JavaDoubleRDD(val srdd: RDD[scala.Double])
extends AbstractJavaRDDLike[JDouble, JavaDoubleRDD] {

override val classTag: ClassTag[JDouble] = implicitly[ClassTag[JDouble]]

@@ -44,7 +44,7 @@ import org.apache.spark.util.Utils

class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
(implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {

override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)

@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
extends JavaRDDLike[T, JavaRDD[T]] {
extends AbstractJavaRDDLike[T, JavaRDD[T]] {

override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)

@@ -38,6 +38,14 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
* of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
* from the trait. See SPARK-3266 for additional details.
*/
private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
extends JavaRDDLike[T, This]

/**
* Defines operations common to several Java RDD implementations.
* Note that this trait is not intended to be implemented by user code.
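The new doc comment above describes the SI-8905 workaround: Java RDD wrappers extend a dummy abstract class instead of inheriting from the JavaRDDLike trait directly, so the trait's method forwarders are materialized once rather than in every concrete class. A toy, self-contained illustration of the pattern (all names below are invented for the example, not Spark's):

```scala
// A trait with a concrete method, standing in for a large trait like JavaRDDLike.
trait RDDLike[T, This <: RDDLike[T, This]] {
  def wrap(xs: Seq[T]): This
  def firstOrNone: Option[T] = None
}

// The intermediate "dummy" abstract class, analogous to AbstractJavaRDDLike.
abstract class AbstractRDDLike[T, This <: RDDLike[T, This]] extends RDDLike[T, This]

// Concrete implementations extend the abstract class rather than the trait.
class SeqRDDLike[T](xs: Seq[T]) extends AbstractRDDLike[T, SeqRDDLike[T]] {
  def wrap(ys: Seq[T]): SeqRDDLike[T] = new SeqRDDLike(ys)
  override def firstOrNone: Option[T] = xs.headOption
}
```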
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(

context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
if (!reuse_worker || !released) {
try {
worker.close()
@@ -248,13 +247,17 @@ private[spark] class PythonRDD(
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
Utils.tryLog(worker.shutdownOutput())
if (!worker.isClosed) {
Utils.tryLog(worker.shutdownOutput())
}

case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
Utils.tryLog(worker.shutdownOutput())
if (!worker.isClosed) {
Utils.tryLog(worker.shutdownOutput())
}
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()
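The PythonRDD hunks above remove the explicit writerThread.join() and guard worker.shutdownOutput() with worker.isClosed, so cleanup does not attempt to shut down a socket that was already closed. A minimal sketch of that guard in isolation (the helper name is made up):

```scala
import java.net.Socket
import scala.util.Try

object WorkerCleanupSketch {
  // Best-effort shutdown: skip sockets that are already closed, and swallow any
  // error thrown by shutdownOutput itself rather than failing the caller.
  def closeWorkerOutput(worker: Socket): Unit = {
    if (!worker.isClosed) {
      Try(worker.shutdownOutput())
    }
  }
}
```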
@@ -28,7 +28,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
/**
* Command-line parser for the driver client.
*/
private[spark] class ClientArguments(args: Array[String]) {
private[deploy] class ClientArguments(args: Array[String]) {
import ClientArguments._

var cmd: String = "" // 'launch' or 'kill'
@@ -96,7 +96,7 @@ private[spark] class ClientArguments(args: Array[String]) {
/**
* Print usage and exit JVM with the given exit code.
*/
def printUsageAndExit(exitCode: Int) {
private def printUsageAndExit(exitCode: Int) {
// TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
// separately similar to in the YARN client.
val usage =
@@ -116,10 +116,10 @@ private[spark] class ClientArguments(args: Array[String]) {
}
}

object ClientArguments {
private[spark] val DEFAULT_CORES = 1
private[spark] val DEFAULT_MEMORY = 512 // MB
private[spark] val DEFAULT_SUPERVISE = false
private[deploy] object ClientArguments {
val DEFAULT_CORES = 1
val DEFAULT_MEMORY = 512 // MB
val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
try {
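The ClientArguments changes above narrow visibility from private[spark] to private[deploy] and drop per-member qualifiers that the now-private[deploy] object makes unnecessary. For reference, a toy example of how Scala's package-qualified access modifiers scope visibility (package names here are invented):

```scala
package org.example.spark.deploy {
  private[deploy] class DeployOnly // visible only within org.example.spark.deploy
  private[spark]  class SparkWide  // visible anywhere under org.example.spark
}

package org.example.spark {
  object Elsewhere {
    // val no = new deploy.DeployOnly // would not compile: we are outside 'deploy'
    val ok = new deploy.SparkWide     // compiles: still inside 'spark'
  }
}
```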
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

private[spark] class DriverDescription(
private[deploy] class DriverDescription(
val jarUrl: String,
val mem: Int,
val cores: Int,
@@ -22,7 +22,7 @@ package org.apache.spark.deploy
* This state is sufficient for the Master to reconstruct its internal data structures during
* failover.
*/
private[spark] class ExecutorDescription(
private[deploy] class ExecutorDescription(
val appId: String,
val execId: Int,
val cores: Int,
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {
private[deploy] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

@@ -55,29 +55,29 @@ import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
* - The docker images tagged spark-test-master and spark-test-worker are built from the
* docker/ directory. Run 'docker/spark-test/build' to generate these.
*/
private[spark] object FaultToleranceTest extends App with Logging {
private object FaultToleranceTest extends App with Logging {

val conf = new SparkConf()
val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
private val conf = new SparkConf()
private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")

val masters = ListBuffer[TestMasterInfo]()
val workers = ListBuffer[TestWorkerInfo]()
var sc: SparkContext = _
private val masters = ListBuffer[TestMasterInfo]()
private val workers = ListBuffer[TestWorkerInfo]()
private var sc: SparkContext = _

val zk = SparkCuratorUtil.newClient(conf)
private val zk = SparkCuratorUtil.newClient(conf)

var numPassed = 0
var numFailed = 0
private var numPassed = 0
private var numFailed = 0

val sparkHome = System.getenv("SPARK_HOME")
private val sparkHome = System.getenv("SPARK_HOME")
assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")

val containerSparkHome = "/opt/spark"
val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
private val containerSparkHome = "/opt/spark"
private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)

System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip

def afterEach() {
private def afterEach() {
if (sc != null) {
sc.stop()
sc = null
@@ -179,7 +179,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
}

def test(name: String)(fn: => Unit) {
private def test(name: String)(fn: => Unit) {
try {
fn
numPassed += 1
@@ -197,19 +197,19 @@ private[spark] object FaultToleranceTest extends App with Logging {
afterEach()
}

def addMasters(num: Int) {
private def addMasters(num: Int) {
logInfo(s">>>>> ADD MASTERS $num <<<<<")
(1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
}

def addWorkers(num: Int) {
private def addWorkers(num: Int) {
logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}

/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
private def createClient() = {
logInfo(">>>>> CREATE CLIENT <<<<<")
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
@@ -218,27 +218,27 @@ private[spark] object FaultToleranceTest extends App with Logging {
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
}

def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
"spark://" + masters.map(master => master.ip + ":7077").mkString(",")
}

def getLeader: TestMasterInfo = {
private def getLeader: TestMasterInfo = {
val leaders = masters.filter(_.state == RecoveryState.ALIVE)
assertTrue(leaders.size == 1)
leaders(0)
}

def killLeader(): Unit = {
private def killLeader(): Unit = {
logInfo(">>>>> KILL LEADER <<<<<")
masters.foreach(_.readState())
val leader = getLeader
masters -= leader
leader.kill()
}

def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
private def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)

def terminateCluster() {
private def terminateCluster() {
logInfo(">>>>> TERMINATE CLUSTER <<<<<")
masters.foreach(_.kill())
workers.foreach(_.kill())
@@ -247,7 +247,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}

/** This includes Client retry logic, so it may take a while if the cluster is recovering. */
def assertUsable() = {
private def assertUsable() = {
val f = future {
try {
val res = sc.parallelize(0 until 10).collect()
Expand All @@ -269,7 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
* Asserts that the cluster is usable and that the expected masters and workers
* are all alive in a proper configuration (e.g., only one leader).
*/
def assertValidClusterState() = {
private def assertValidClusterState() = {
logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
assertUsable()
var numAlive = 0
@@ -325,7 +325,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
}

def assertTrue(bool: Boolean, message: String = "") {
private def assertTrue(bool: Boolean, message: String = "") {
if (!bool) {
throw new IllegalStateException("Assertion failed: " + message)
}
@@ -335,7 +335,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
numFailed))
}

private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
@@ -377,7 +377,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
format(ip, dockerId.id, logFile.getAbsolutePath, state)
}

private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
@@ -390,7 +390,7 @@ private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val
"[ip=%s, id=%s, logFile=%s]".format(ip, dockerId, logFile.getAbsolutePath)
}

private[spark] object SparkDocker {
private object SparkDocker {
def startMaster(mountDir: String): TestMasterInfo = {
val cmd = Docker.makeRunCmd("spark-test-master", mountDir = mountDir)
val (ip, id, outFile) = startNode(cmd)
@@ -425,11 +425,11 @@ private[spark] object SparkDocker {
}
}

private[spark] class DockerId(val id: String) {
private class DockerId(val id: String) {
override def toString = id
}

private[spark] object Docker extends Logging {
private object Docker extends Logging {
def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""

[Diff truncated: remaining changed files not shown]
