diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 0401b15446a7b..3ab425aab84c8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4584b730e3420..15814293227ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)
 
   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 066d46c4473eb..023f3c6269062 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val webUiPort: Int,
+    val publicAddress: String,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
 
     // Add webUI log urls
-    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+    val baseUrl =
+      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
 
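Note: the ExecutorRunner hunk above is the heart of the fix. Executor log URLs are now built from the worker's publicAddress (SPARK_PUBLIC_DNS when set, the bind host otherwise) and the actually-bound web UI port, so the URLs advertised to drivers stay reachable from outside the cluster. A dependency-free sketch of the resulting URL scheme; `buildLogUrls` is an illustrative helper, not code introduced by the patch:

```scala
// Illustrative only: mirrors the interpolation in ExecutorRunner above.
object LogUrlSketch {
  def buildLogUrls(
      publicAddress: String, // SPARK_PUBLIC_DNS if set, otherwise the bind host
      webUiPort: Int,        // the port the worker web UI actually bound to
      appId: String,
      execId: Int): Map[String, String] = {
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    Map(
      "SPARK_LOG_URL_STDERR" -> s"${baseUrl}stderr",
      "SPARK_LOG_URL_STDOUT" -> s"${baseUrl}stdout")
  }

  def main(args: Array[String]): Unit = {
    // With SPARK_PUBLIC_DNS set, the advertised URLs no longer expose the
    // internal hostname:
    buildLogUrls("public_dns", 8081, "app-1", 0).values.foreach(println)
  }
}
```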
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2473a90aa9309..f2e7418f4bf15 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
 
   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
         self,
         workerId,
         host,
-        webUiPort,
+        webUi.boundPort,
+        publicAddress,
         sparkHome,
         executorDir,
         akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 9be65a4a39a09..ec68837a1516c 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
-  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
 
   def getBasePath: String = basePath
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index e955636cf5b59..68b5776fc6515 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
-      new File("sparkHome"), new File("workDir"), "akka://worker",
+      "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
       new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
   }
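Note: Worker and WebUI now read SPARK_PUBLIC_DNS through `conf.getenv` rather than `System.getenv`. The payoff of the indirection is testability: the JVM offers no portable way to set an environment variable from a test, but a SparkConf subclass can override the lookup, which is exactly what the new test below does. A minimal, Spark-free sketch of this seam (`Env`, `StubEnv`, and `publicAddress` are illustrative names, not Spark API):

```scala
// Route env lookups through an overridable method instead of calling
// System.getenv directly, so a test can substitute values.
class Env {
  def getenv(name: String): String = System.getenv(name)
}

class StubEnv(overrides: Map[String, String]) extends Env {
  override def getenv(name: String): String =
    overrides.getOrElse(name, super.getenv(name))
}

object EnvSeamDemo {
  // Same fallback logic as Worker.publicAddress above.
  def publicAddress(env: Env, host: String): String = {
    val envVar = env.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else host
  }

  def main(args: Array[String]): Unit = {
    // Production behavior falls back to the bind host...
    println(publicAddress(new Env, "10.0.0.5"))
    // ...while a test can inject a value without touching the process env.
    println(publicAddress(new StubEnv(Map("SPARK_PUBLIC_DNS" -> "public_dns")), "10.0.0.5"))
  }
}
```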
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index f33bdc73e40ac..54dd7c9c45c61 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -17,35 +17,69 @@
 
 package org.apache.spark.deploy
 
+import java.net.URL
+
 import scala.collection.mutable
+import scala.io.Source
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
 
-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }
 
-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 76511699e5ac5..6fca6321e5a1b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
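Note: both tests funnel executor registrations through a `SaveExecutorInfo` listener that these hunks never show; it is defined elsewhere in the suite's file. Judging from the imports in the diff, a minimal version plausibly looks like the following (a reconstruction, not code from the patch):

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

class SaveExecutorInfo extends SparkListener {
  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

  // Record every executor the scheduler reports, keyed by executor ID,
  // so the tests can inspect the log URLs attached to each one.
  override def onExecutorAdded(executor: SparkListenerExecutorAdded): Unit = {
    addedExecutorInfos(executor.executorId) = executor.executorInfo
  }
}
```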
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index ced042e2f96ca..c1d1a224817e8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -22,6 +22,7 @@ import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.evaluation.binary._
 import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.sql.DataFrame
 
 /**
  * :: Experimental ::
@@ -53,6 +54,13 @@ class BinaryClassificationMetrics(
    */
   def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0)
 
+  /**
+   * An auxiliary constructor taking a DataFrame.
+   * @param scoreAndLabels a DataFrame with two double columns: score and label
+   */
+  private[mllib] def this(scoreAndLabels: DataFrame) =
+    this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
+
   /** Unpersist intermediate RDDs used in the computation. */
   def unpersist() {
     cumulativeCounts.unpersist()
diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst
index b706c5e376ef4..15101470afc07 100644
--- a/python/docs/pyspark.mllib.rst
+++ b/python/docs/pyspark.mllib.rst
@@ -16,6 +16,13 @@ pyspark.mllib.clustering module
     :members:
     :undoc-members:
 
+pyspark.mllib.evaluation module
+-------------------------------
+
+.. automodule:: pyspark.mllib.evaluation
+    :members:
+    :undoc-members:
+
 pyspark.mllib.feature module
 -------------------------------
 
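Note: the auxiliary DataFrame constructor above exists so the Python wrapper added below can ship its (score, label) pairs across Py4J as a DataFrame rather than a raw RDD of tuples. A sketch of the Scala-side equivalent of that handoff, using Spark 1.x-era APIs; `sqlContext` and the sample data are illustrative, and since the constructor is private[mllib] this reproduces its body rather than calling it:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameHandoffSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Two double columns, score then label -- the same schema
    // evaluation.py builds before crossing the Py4J boundary.
    val df = sc.parallelize(Seq((0.1, 0.0), (0.6, 1.0), (0.8, 1.0))).toDF("score", "label")

    // Equivalent of: this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
    val scoreAndLabels = df.map(r => (r.getDouble(0), r.getDouble(1)))
    scoreAndLabels.collect().foreach(println)
    sc.stop()
  }
}
```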
diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py
new file mode 100644
index 0000000000000..16cb49cc0cfff
--- /dev/null
+++ b/python/pyspark/mllib/evaluation.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.mllib.common import JavaModelWrapper
+from pyspark.sql import SQLContext
+from pyspark.sql.types import StructField, StructType, DoubleType
+
+
+class BinaryClassificationMetrics(JavaModelWrapper):
+    """
+    Evaluator for binary classification.
+
+    >>> scoreAndLabels = sc.parallelize([
+    ...     (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2)
+    >>> metrics = BinaryClassificationMetrics(scoreAndLabels)
+    >>> metrics.areaUnderROC()
+    0.70...
+    >>> metrics.areaUnderPR()
+    0.83...
+    >>> metrics.unpersist()
+    """
+
+    def __init__(self, scoreAndLabels):
+        """
+        :param scoreAndLabels: an RDD of (score, label) pairs
+        """
+        sc = scoreAndLabels.ctx
+        sql_ctx = SQLContext(sc)
+        df = sql_ctx.createDataFrame(scoreAndLabels, schema=StructType([
+            StructField("score", DoubleType(), nullable=False),
+            StructField("label", DoubleType(), nullable=False)]))
+        java_class = sc._jvm.org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
+        java_model = java_class(df._jdf)
+        super(BinaryClassificationMetrics, self).__init__(java_model)
+
+    def areaUnderROC(self):
+        """
+        Computes the area under the receiver operating characteristic
+        (ROC) curve.
+        """
+        return self.call("areaUnderROC")
+
+    def areaUnderPR(self):
+        """
+        Computes the area under the precision-recall curve.
+        """
+        return self.call("areaUnderPR")
+
+    def unpersist(self):
+        """
+        Unpersists intermediate RDDs used in the computation.
+        """
+        self.call("unpersist")
+
+
+def _test():
+    import doctest
+    from pyspark import SparkContext
+    import pyspark.mllib.evaluation
+    globs = pyspark.mllib.evaluation.__dict__.copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest')
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/run-tests b/python/run-tests
index a2c2f37a54eda..b7630c356cfae 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -75,6 +75,7 @@ function run_mllib_tests() {
     echo "Run mllib tests ..."
     run_test "pyspark/mllib/classification.py"
     run_test "pyspark/mllib/clustering.py"
+    run_test "pyspark/mllib/evaluation.py"
     run_test "pyspark/mllib/feature.py"
     run_test "pyspark/mllib/linalg.py"
     run_test "pyspark/mllib/rand.py"
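Note: the doctest in evaluation.py has a direct Scala analogue, since the Python wrapper only delegates to the JVM-side mllib class. A sketch for reference (the ~0.70 and ~0.83 expectations come from the doctest above, not from running this code):

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

object EvaluationDoctestMirror {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("PythonTestMirror"))
    // Same seven (score, label) pairs as the doctest, in two partitions.
    val scoreAndLabels = sc.parallelize(Seq(
      (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)), 2)
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    println(metrics.areaUnderROC())  // ~0.70 per the doctest
    println(metrics.areaUnderPR())   // ~0.83 per the doctest
    metrics.unpersist()
    sc.stop()
  }
}
```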