From 0307db0f55b714930c7ea118d5451190ea8c1a94 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Mon, 7 Apr 2014 13:06:30 -0700
Subject: [PATCH] SPARK-1099: Introduce local[*] mode to infer number of cores

This is the default mode for running spark-shell and pyspark, intended
to allow users running spark for the first time to see the performance
benefits of using multiple cores, while not breaking backwards
compatibility for users who use "local" mode and expect exactly 1 core.

Author: Aaron Davidson

Closes #182 from aarondav/110 and squashes the following commits:

a88294c [Aaron Davidson] Rebased changes for new spark-shell
a9f393e [Aaron Davidson] SPARK-1099: Introduce local[*] mode to infer number of cores
---
 bin/spark-shell                                          | 4 ++--
 core/src/main/scala/org/apache/spark/SparkContext.scala  | 9 ++++++---
 .../spark/SparkContextSchedulerCreationSuite.scala       | 8 ++++++++
 docs/python-programming-guide.md                         | 7 ++++---
 docs/scala-programming-guide.md                          | 5 +++--
 python/pyspark/shell.py                                  | 2 +-
 .../main/scala/org/apache/spark/repl/SparkILoop.scala    | 2 +-
 7 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/bin/spark-shell b/bin/spark-shell
index 535ee3ccd8269..ea12d256b23a1 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -34,7 +34,7 @@ set -o posix
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
 SPARK_REPL_OPTS="${SPARK_REPL_OPTS:-""}"
-DEFAULT_MASTER="local"
+DEFAULT_MASTER="local[*]"
 MASTER=${MASTER:-""}
 
 info_log=0
@@ -64,7 +64,7 @@ ${txtbld}OPTIONS${txtrst}:
                           is followed by m for megabytes or g for gigabytes, e.g. "1g".
    -dm --driver-memory : The memory used by the Spark Shell, the number is followed by m for
                          megabytes or g for gigabytes, e.g. "1g".
-   -m --master : A full string that describes the Spark Master, defaults to "local"
+   -m --master : A full string that describes the Spark Master, defaults to "local[*]"
                  e.g. "spark://localhost:7077".
    --log-conf : Enables logging of the supplied SparkConf as INFO at start of the Spark Context.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8382dd44f3484..e5ebd350eeced 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1285,8 +1285,8 @@ object SparkContext extends Logging {
 
   /** Creates a task scheduler based on a given master URL. Extracted for testing. */
   private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
-    // Regular expression used for local[N] master format
-    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+    // Regular expression used for local[N] and local[*] master formats
+    val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
     val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
     // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
@@ -1309,8 +1309,11 @@ object SparkContext extends Logging {
       scheduler
 
     case LOCAL_N_REGEX(threads) =>
+      def localCpuCount = Runtime.getRuntime.availableProcessors()
+      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
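+      // (availableProcessors() returns the number of logical cores visible to the JVM at call time.)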
+      val threadCount = if (threads == "*") localCpuCount else threads.toInt
       val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-      val backend = new LocalBackend(scheduler, threads.toInt)
+      val backend = new LocalBackend(scheduler, threadCount)
       scheduler.initialize(backend)
       scheduler
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index b543471a5d35b..94fba102865b3 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -51,6 +51,14 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
+  test("local-*") {
+    val sched = createTaskScheduler("local[*]")
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+      case _ => fail()
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index c2e5327324898..888631e7025b0 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -82,15 +82,16 @@ The Python shell can be used explore data interactively and is a simple way to l
 >>> help(pyspark) # Show all pyspark functions
 {% endhighlight %}
 
-By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on a single core.
-To connect to a non-local cluster, or use multiple cores, set the `MASTER` environment variable.
+By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on all of
+your machine's logical cores.
+To connect to a non-local cluster, or to specify a number of cores, set the `MASTER` environment variable.
 For example, to use the `bin/pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
 
 {% highlight bash %}
 $ MASTER=spark://IP:PORT ./bin/pyspark
 {% endhighlight %}
 
-Or, to use four cores on the local machine:
+Or, to use exactly four cores on the local machine:
 
 {% highlight bash %}
 $ MASTER=local[4] ./bin/pyspark
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 77373890eead7..a07cd2e0a32a2 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -54,7 +54,7 @@ object for more advanced configuration.
 The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
 
-In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on four cores, use
+In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on exactly four cores, use
 
 {% highlight bash %}
 $ MASTER=local[4] ./bin/spark-shell
 {% endhighlight %}
@@ -74,6 +74,7 @@ The master URL passed to Spark can be in one of the following formats:
 Master URL          Meaning
 local               Run Spark locally with one worker thread (i.e. no parallelism at all).
 local[K]            Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
+local[*]            Run Spark locally with as many worker threads as logical cores on your machine.
 spark://HOST:PORT   Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
@@ -84,7 +85,7 @@ The master URL passed to Spark can be in one of the following formats:
-If no master URL is specified, the spark shell defaults to "local".
+If no master URL is specified, the spark shell defaults to "local[*]".
 
 For running on YARN, Spark launches an instance of the standalone deploy cluster within YARN; see [running on YARN](running-on-yarn.html) for details.
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 3d779faf1fa44..35e48276e3cb9 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -29,7 +29,7 @@
 # this is the equivalent of ADD_JARS
 add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
 
-sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
+sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
 
 print """Welcome to
       ____              __
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 9b1da195002c2..5a367b6bb79de 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -963,7 +963,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
       case Some(m) => m
       case None => {
        val prop = System.getenv("MASTER")
-        if (prop != null) prop else "local"
+        if (prop != null) prop else "local[*]"
       }
     }
     master
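
The sketch below restates the master-string handling this patch adds to SparkContext.createTaskScheduler, so the local / local[N] / local[*] behavior can be read or tried in isolation. It is only an illustration: resolveLocalThreads, LocalMasterSketch, and the demo main are hypothetical names that do not exist in Spark, and cluster URLs are deliberately reduced to a catch-all case.

    // Minimal sketch of the local-master resolution introduced by this patch.
    object LocalMasterSketch {
      // Same pattern as the patched LOCAL_N_REGEX: the group accepts digits or a literal '*'.
      private val LocalN = """local\[([0-9\*]+)\]""".r

      def resolveLocalThreads(master: String): Option[Int] = master match {
        case "local" => Some(1) // plain "local" still means exactly one worker thread
        case LocalN(threads) =>
          // "*" falls back to the logical cores the JVM reports; a number is used as-is
          Some(if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt)
        case _ => None // spark://, mesos://, yarn, etc. are handled by other branches
      }

      def main(args: Array[String]): Unit = {
        Seq("local", "local[4]", "local[*]", "spark://host:7077").foreach { m =>
          println(s"$m -> ${resolveLocalThreads(m)}")
        }
      }
    }

With MASTER unset, bin/spark-shell and bin/pyspark now take the "*" branch, which is why a fresh shell gets one worker thread per logical core instead of a single thread.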