
Commit

Merge remote-tracking branch 'upstream/master'
leahmcguire committed Mar 5, 2015
2 parents 900b586 + 424a86a commit b85b0c9
Showing 12 changed files with 156 additions and 20 deletions.
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }

@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)

   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }

@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val webUiPort: Int,
+    val publicAddress: String,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

     // Add webUI log urls
-    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+    val baseUrl =
+      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)

   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
         self,
         workerId,
         host,
-        webUiPort,
+        webUi.boundPort,
+        publicAddress,
         sparkHome,
         executorDir,
         akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)
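The System.getenv -> conf.getenv switch in Master, Worker and WebUI is what makes SPARK_PUBLIC_DNS testable: SparkConf.getenv delegates to the real process environment by default, but a test can override it, as the reworked LogUrlsStandaloneSuite below does. A minimal sketch of that pattern (StubbedConf and the hard-coded hostname are illustrative names, not part of this diff):

    // SparkConf(false) skips loading spark.* system properties; overriding getenv
    // lets any code that reads SPARK_PUBLIC_DNS through the conf see a fixed value.
    class StubbedConf extends SparkConf(false) {
      override def getenv(name: String) =
        if (name == "SPARK_PUBLIC_DNS") "host.example.com" else super.getenv(name)

      // SparkContext clones its conf, so the override must survive cloning.
      override def clone: SparkConf = new StubbedConf().setAll(getAll)
    }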
core/src/main/scala/org/apache/spark/ui/WebUI.scala (1 addition & 1 deletion)
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
-  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)

   def getBasePath: String = basePath
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {

   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      new File("sparkHome"), new File("workDir"), "akka://worker",
+      "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }

@@ -17,35 +17,69 @@

 package org.apache.spark.deploy

+import java.net.URL
+
 import scala.collection.mutable
 import scala.io.Source

-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite

 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}

-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {

   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000

-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }

-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)

-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()

     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }

@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
@@ -22,6 +22,7 @@ import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.evaluation.binary._
 import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.sql.DataFrame

 /**
  * :: Experimental ::
@@ -53,6 +54,13 @@ class BinaryClassificationMetrics(
    */
   def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0)

+  /**
+   * An auxiliary constructor taking a DataFrame.
+   * @param scoreAndLabels a DataFrame with two double columns: score and label
+   */
+  private[mllib] def this(scoreAndLabels: DataFrame) =
+    this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
+
   /** Unpersist intermediate RDDs used in the computation. */
   def unpersist() {
     cumulativeCounts.unpersist()
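The new auxiliary constructor simply projects the DataFrame's first two double columns into the existing RDD-based constructor; this is the entry point the Python wrapper added below calls into. A hedged sketch of the equivalent call path (scoreAndLabelsDF is a hypothetical DataFrame whose first two columns are score and label; it is not defined in this diff):

    // What the private[mllib] DataFrame constructor boils down to.
    val scoreAndLabels: RDD[(Double, Double)] =
      scoreAndLabelsDF.map(r => (r.getDouble(0), r.getDouble(1)))
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    println(metrics.areaUnderROC())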
python/docs/pyspark.mllib.rst (7 additions & 0 deletions)
@@ -16,6 +16,13 @@ pyspark.mllib.clustering module
     :members:
     :undoc-members:

+pyspark.mllib.evaluation module
+-------------------------------
+
+.. automodule:: pyspark.mllib.evaluation
+    :members:
+    :undoc-members:
+
 pyspark.mllib.feature module
 -------------------------------

python/pyspark/mllib/evaluation.py (new file, 83 additions & 0 deletions)
@@ -0,0 +1,83 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.mllib.common import JavaModelWrapper
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, DoubleType


class BinaryClassificationMetrics(JavaModelWrapper):
    """
    Evaluator for binary classification.
    >>> scoreAndLabels = sc.parallelize([
    ...     (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2)
    >>> metrics = BinaryClassificationMetrics(scoreAndLabels)
    >>> metrics.areaUnderROC()
    0.70...
    >>> metrics.areaUnderPR()
    0.83...
    >>> metrics.unpersist()
    """

    def __init__(self, scoreAndLabels):
        """
        :param scoreAndLabels: an RDD of (score, label) pairs
        """
        sc = scoreAndLabels.ctx
        sql_ctx = SQLContext(sc)
        df = sql_ctx.createDataFrame(scoreAndLabels, schema=StructType([
            StructField("score", DoubleType(), nullable=False),
            StructField("label", DoubleType(), nullable=False)]))
        java_class = sc._jvm.org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
        java_model = java_class(df._jdf)
        super(BinaryClassificationMetrics, self).__init__(java_model)

    def areaUnderROC(self):
        """
        Computes the area under the receiver operating characteristic
        (ROC) curve.
        """
        return self.call("areaUnderROC")

    def areaUnderPR(self):
        """
        Computes the area under the precision-recall curve.
        """
        return self.call("areaUnderPR")

    def unpersist(self):
        """
        Unpersists intermediate RDDs used in the computation.
        """
        self.call("unpersist")


def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.evaluation
    globs = pyspark.mllib.evaluation.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest')
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()
python/run-tests (1 addition & 0 deletions)
@@ -75,6 +75,7 @@ function run_mllib_tests() {
     echo "Run mllib tests ..."
     run_test "pyspark/mllib/classification.py"
     run_test "pyspark/mllib/clustering.py"
+    run_test "pyspark/mllib/evaluation.py"
     run_test "pyspark/mllib/feature.py"
     run_test "pyspark/mllib/linalg.py"
     run_test "pyspark/mllib/rand.py"
