SPARK-1099: Introduce local[*] mode to infer number of cores
This is the default mode for running spark-shell and pyspark, intended
to allow users running spark for the first time to see the performance
benefits of using multiple cores, while not breaking backwards
compatibility for users who use "local" mode and expect exactly 1 core.
aarondav committed Apr 6, 2014
1 parent 6e88583 commit a9f393e
Showing 6 changed files with 23 additions and 10 deletions.
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1285,8 +1285,8 @@ object SparkContext extends Logging {

/** Creates a task scheduler based on a given master URL. Extracted for testing. */
private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
- // Regular expression used for local[N] master format
- val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+ // Regular expression used for local[N] and local[*] master formats
+ val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
@@ -1309,8 +1309,11 @@ object SparkContext extends Logging {
scheduler

case LOCAL_N_REGEX(threads) =>
+ def localCpuCount = Runtime.getRuntime.availableProcessors()
+ // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
+ val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
- val backend = new LocalBackend(scheduler, threads.toInt)
+ val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
scheduler
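
The `LOCAL_N_REGEX` branch above can be illustrated outside the JVM. The Python sketch below is an illustration under assumptions, not code from this commit: `local_thread_count` and its `cpu_count` parameter are hypothetical names, and `os.cpu_count()` stands in for `Runtime.getRuntime.availableProcessors()`. It covers only the bracketed forms; plain `local` does not match this pattern.

```python
import os
import re

# Mirrors the widened pattern from the diff: digits and '*' inside the brackets.
LOCAL_N_REGEX = re.compile(r"local\[([0-9*]+)\]")


def local_thread_count(master, cpu_count=None):
    """Return the worker-thread count for a local[N] or local[*] master string.

    `cpu_count` is a hypothetical override used to make the example testable.
    """
    # A Scala regex extractor in a `case` clause must match the whole string,
    # which corresponds to fullmatch here.
    match = LOCAL_N_REGEX.fullmatch(master)
    if match is None:
        raise ValueError(f"not a local[N] or local[*] master: {master!r}")
    threads = match.group(1)
    if threads == "*":
        # Assumed stand-in for the JVM's availableProcessors().
        return cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    # Mixed input such as "local[4*]" matches the pattern but is not a number,
    # so int() raises ValueError at this point.
    return int(threads)


print(local_thread_count("local[4]"))
print(local_thread_count("local[*]", cpu_count=8))
```

Because the character class accepts any mix of digits and `*`, a string such as `local[4*]` passes the pattern and only fails at integer conversion. The Scala `threads.toInt` call would be expected to fail in the same place with a `NumberFormatException`.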

@@ -51,6 +51,14 @@ class SparkContextSchedulerCreationSuite
}
}

+ test("local-*") {
+ val sched = createTaskScheduler("local[*]")
+ sched.backend match {
+ case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+ case _ => fail()
+ }
+ }
+
test("local-n") {
val sched = createTaskScheduler("local[5]")
assert(sched.maxTaskFailures === 1)
7 changes: 4 additions & 3 deletions docs/python-programming-guide.md
@@ -82,15 +82,16 @@ The Python shell can be used explore data interactively and is a simple way to l
>>> help(pyspark) # Show all pyspark functions
{% endhighlight %}

- By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on a single core.
- To connect to a non-local cluster, or use multiple cores, set the `MASTER` environment variable.
+ By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on all of
+ your machine's logical cores.
+ To connect to a non-local cluster, or to specify a number of cores, set the `MASTER` environment variable.
For example, to use the `bin/pyspark` shell with a [standalone Spark cluster](spark-standalone.html):

{% highlight bash %}
$ MASTER=spark://IP:PORT ./bin/pyspark
{% endhighlight %}

- Or, to use four cores on the local machine:
+ Or, to use exactly four cores on the local machine:

{% highlight bash %}
$ MASTER=local[4] ./bin/pyspark
5 changes: 3 additions & 2 deletions docs/scala-programming-guide.md
@@ -54,7 +54,7 @@ object for more advanced configuration.

The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.

- In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on four cores, use
+ In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on exactly four cores, use

{% highlight bash %}
$ MASTER=local[4] ./bin/spark-shell
@@ -74,6 +74,7 @@ The master URL passed to Spark can be in one of the following formats:
<tr><th>Master URL</th><th>Meaning</th></tr>
<tr><td> local </td><td> Run Spark locally with one worker thread (i.e. no parallelism at all). </td></tr>
<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
+ <tr><td> local[*] </td><td> Run Spark locally with as many worker threads as logical cores on your machine.</td></tr>
</td></tr>
<tr><td> spark://HOST:PORT </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone
cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default.
@@ -84,7 +85,7 @@ The master URL passed to Spark can be in one of the following formats:
</td></tr>
</table>

- If no master URL is specified, the spark shell defaults to "local".
+ If no master URL is specified, the spark shell defaults to "local[*]".

For running on YARN, Spark launches an instance of the standalone deploy cluster within YARN; see [running on YARN](running-on-yarn.html) for details.

2 changes: 1 addition & 1 deletion python/pyspark/shell.py
@@ -29,7 +29,7 @@
# this is the equivalent of ADD_JARS
add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None

- sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
+ sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)

print """Welcome to
____ __
2 changes: 1 addition & 1 deletion repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -963,7 +963,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
case Some(m) => m
case None => {
val prop = System.getenv("MASTER")
- if (prop != null) prop else "local"
+ if (prop != null) prop else "local[*]"
}
}
master
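
The fallback order visible in this hunk (an explicitly supplied master first, then the `MASTER` environment variable, then the new `local[*]` default) can be sketched in Python. This is a minimal sketch under assumptions: `resolve_master`, `explicit_master`, and `environ` are hypothetical names introduced for illustration, and the `explicit_master` argument only approximates the `Some(m)` branch, whose source is not shown in the diff.

```python
import os

DEFAULT_MASTER = "local[*]"  # the default introduced by this commit


def resolve_master(explicit_master=None, environ=None):
    """Pick the master URL using the same precedence as the hunk above.

    `environ` is a hypothetical injection point so the lookup can be tested
    without touching the real process environment.
    """
    env = os.environ if environ is None else environ
    if explicit_master is not None:
        # Corresponds to `case Some(m) => m`.
        return explicit_master
    # Corresponds to `if (prop != null) prop else "local[*]"`.
    return env.get("MASTER", DEFAULT_MASTER)


print(resolve_master(environ={}))
print(resolve_master(environ={"MASTER": "local[4]"}))
```

Users who relied on the old single-threaded shell default can keep it by setting `MASTER=local` explicitly.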