From 7a66d5d084ba665d1449ec038abc21250fc8d588 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 9 Dec 2014 19:29:09 -0800 Subject: [PATCH 001/361] Config updates for the new shuffle transport. Author: Reynold Xin Closes #3657 from rxin/conf-update and squashes the following commits: 7370eab [Reynold Xin] Config updates for the new shuffle transport. --- .../java/org/apache/spark/network/util/TransportConf.java | 8 ++++---- .../apache/spark/network/sasl/SaslClientBootstrap.java | 2 +- .../spark/network/shuffle/RetryingBlockFetcher.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index f60573998f7ae..13b37f96f8ce2 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -35,14 +35,14 @@ public boolean preferDirectBufs() { return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true); } - /** Connect timeout in secs. Default 120 secs. */ + /** Connect timeout in milliseconds. Default 120 secs. */ public int connectionTimeoutMs() { return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000; } /** Number of concurrent connections between two nodes for fetching data. **/ public int numConnectionsPerPeer() { - return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 2); + return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1); } /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */ @@ -67,7 +67,7 @@ public int numConnectionsPerPeer() { public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); } /** Timeout for a single round trip of SASL token exchange, in milliseconds. */ - public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); } + public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; } /** * Max number of times we will try IO exceptions (such as connection timeouts) per request. @@ -79,7 +79,7 @@ public int numConnectionsPerPeer() { * Time (in milliseconds) that we will wait in order to perform a retry after an IOException. * Only relevant if maxIORetries > 0. */ - public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); } + public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; } /** * Minimum size of a block that we should start using memory map rather than reading in through diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java index 7bc91e375371f..33aa1344345ff 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java @@ -59,7 +59,7 @@ public void doBootstrap(TransportClient client) { ByteBuf buf = Unpooled.buffer(msg.encodedLength()); msg.encode(buf); - byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout()); + byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs()); payload = saslClient.response(response); } } finally { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index f8a1a266863bb..4bb0498e5d5aa 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -106,7 +106,7 @@ public RetryingBlockFetcher( this.fetchStarter = fetchStarter; this.listener = listener; this.maxRetries = conf.maxIORetries(); - this.retryWaitTime = conf.ioRetryWaitTime(); + this.retryWaitTime = conf.ioRetryWaitTimeMs(); this.outstandingBlocksIds = Sets.newLinkedHashSet(); Collections.addAll(outstandingBlocksIds, blockIds); this.currentListener = new RetryingBlockFetchListener(); From 1ce3baf9aa5ed6529b73fd21458f8632bd8971d2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 9 Dec 2014 23:47:05 -0800 Subject: [PATCH 002/361] [Minor] Use tag for help icon in web UI page header This small commit makes the `(?)` web UI help link into a superscript, which should address feedback that the current design makes it look like an error occurred or like information is missing. Before: ![image](https://cloud.githubusercontent.com/assets/50748/5370611/a3ed0034-7fd9-11e4-870f-05bd9faad5b9.png) After: ![image](https://cloud.githubusercontent.com/assets/50748/5370602/6c5ca8d6-7fd9-11e4-8d1a-568d71290aa7.png) Author: Josh Rosen Closes #3659 from JoshRosen/webui-help-sup and squashes the following commits: bd72899 [Josh Rosen] Use tag for help icon in web UI page header. --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 315327c3c6b7c..d970fa30c1c35 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -181,7 +181,9 @@ private[spark] object UIUtils extends Logging { } val helpButton: Seq[Node] = helpText.map { helpText => - (?) + + (?) + }.getOrElse(Seq.empty) From 40c88b64c710d9ddd57135cedce7c492dee38f2d Mon Sep 17 00:00:00 2001 From: Nathan Kronenfeld Date: Tue, 9 Dec 2014 23:53:17 -0800 Subject: [PATCH 003/361] [SPARK-4772] Clear local copies of accumulators as soon as we're done with them Accumulators keep thread-local copies of themselves. These copies were only cleared at the beginning of a task. This meant that (a) the memory they used was tied up until the next task ran on that thread, and (b) if a thread died, the memory it had used for accumulators was locked up forever on that worker. This PR clears the thread-local copies of accumulators at the end of each task, in the tasks finally block, to make sure they are cleaned up between tasks. It also stores them in a ThreadLocal object, so that if, for some reason, the thread dies, any memory they are using at the time should be freed up. Author: Nathan Kronenfeld Closes #3570 from nkronenfeld/Accumulator-Improvements and squashes the following commits: a581f3f [Nathan Kronenfeld] Change Accumulators to private[spark] instead of adding mima exclude to get around false positive in mima tests b6c2180 [Nathan Kronenfeld] Include MiMa exclude as per build error instructions - this version incompatibility should be irrelevent, as it will only surface if a master is talking to a worker running a different version of spark. 537baad [Nathan Kronenfeld] Fuller refactoring as intended, incorporating JR's suggestions for ThreadLocal localAccums, and keeping clear(), but also calling it in tasks' finally block, rather than just at the beginning of the task. 39a82f2 [Nathan Kronenfeld] Clear local copies of accumulators as soon as we're done with them --- .../main/scala/org/apache/spark/Accumulators.scala | 14 ++++++++------ .../scala/org/apache/spark/executor/Executor.scala | 3 ++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 000bbd6b532ad..5f31bfba3f8d6 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.{ObjectInputStream, Serializable} import java.util.concurrent.atomic.AtomicLong +import java.lang.ThreadLocal import scala.collection.generic.Growable import scala.collection.mutable.Map @@ -278,10 +279,12 @@ object AccumulatorParam { // TODO: The multi-thread support in accumulators is kind of lame; check // if there's a more intuitive way of doing it right -private object Accumulators { +private[spark] object Accumulators { // TODO: Use soft references? => need to make readObject work properly then val originals = Map[Long, Accumulable[_, _]]() - val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]() + val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() { + override protected def initialValue() = Map[Long, Accumulable[_, _]]() + } var lastId: Long = 0 def newId(): Long = synchronized { @@ -293,22 +296,21 @@ private object Accumulators { if (original) { originals(a.id) = a } else { - val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map()) - accums(a.id) = a + localAccums.get()(a.id) = a } } // Clear the local (non-original) accumulators for the current thread def clear() { synchronized { - localAccums.remove(Thread.currentThread) + localAccums.get.clear } } // Get the values of the local accumulators for the current thread (by ID) def values: Map[Long, Any] = synchronized { val ret = Map[Long, Any]() - for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) { + for ((id, accum) <- localAccums.get) { ret(id) = accum.localValue } return ret diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 835157fc520aa..52de6980ecbf8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -172,7 +172,6 @@ private[spark] class Executor( val startGCTime = gcTime try { - Accumulators.clear() val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) @@ -278,6 +277,8 @@ private[spark] class Executor( env.shuffleMemoryManager.releaseMemoryForThisThread() // Release memory used by this thread for unrolling blocks env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() + // Release memory used by this thread for accumulators + Accumulators.clear() runningTasks.remove(taskId) } } From 11c10e0fd9d4619f9f348cbaa6df9a50d2f38cbd Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Wed, 10 Dec 2014 12:24:04 -0800 Subject: [PATCH 004/361] [SPARK-4161]Spark shell class path is not correctly set if "spark.driver.extraClassPath" is set in defaults.conf Author: GuoQiang Li Closes #3050 from witgo/SPARK-4161 and squashes the following commits: abb6fa4 [GuoQiang Li] move usejavacp opt to spark-shell 89e39e7 [GuoQiang Li] review commit c2a6f04 [GuoQiang Li] Spark shell class path is not correctly set if "spark.driver.extraClassPath" is set in defaults.conf --- bin/spark-shell | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/bin/spark-shell b/bin/spark-shell index 4a0670fc6c8aa..cca5aa0676123 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -45,6 +45,13 @@ source "$FWDIR"/bin/utils.sh SUBMIT_USAGE_FUNCTION=usage gatherSparkSubmitOpts "$@" +# SPARK-4161: scala does not assume use of the java classpath, +# so we need to add the "-Dscala.usejavacp=true" flag mnually. We +# do this specifically for the Spark shell because the scala REPL +# has its own class loader, and any additional classpath specified +# through spark.driver.extraClassPath is not automatically propagated. +SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true" + function main() { if $cygwin; then # Workaround for issue involving JLine and Cygwin From 639351252a9382c7972689f4e53a9ecb7e787d60 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 10 Dec 2014 12:29:00 -0800 Subject: [PATCH 005/361] [SPARK-4329][WebUI] HistoryPage pagenation Current HistoryPage have links only to previous page or next page. I suggest to add index to access history pages easily. I implemented like following pics. If there are many pages, current page +/- N pages, head page and last page are indexed. ![2014-11-10 16 13 25](https://cloud.githubusercontent.com/assets/4736016/4986246/9c7bbac4-6937-11e4-8695-8634d039d5b6.png) ![2014-11-10 16 03 21](https://cloud.githubusercontent.com/assets/4736016/4986210/3951bb74-6937-11e4-8b4e-9f90d266d736.png) ![2014-11-10 16 03 39](https://cloud.githubusercontent.com/assets/4736016/4986211/3b196ad8-6937-11e4-9f81-74bc0a6dad5b.png) ![2014-11-10 16 03 49](https://cloud.githubusercontent.com/assets/4736016/4986213/40686138-6937-11e4-86c0-41100f0404f6.png) ![2014-11-10 16 04 04](https://cloud.githubusercontent.com/assets/4736016/4986215/4326c9b4-6937-11e4-87ac-0f30c86ec6e3.png) Author: Kousuke Saruta Closes #3194 from sarutak/history-page-indexing and squashes the following commits: 15d3d2d [Kousuke Saruta] Simplified code c93932e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing 1c2f605 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing 76b05e3 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing b2240f8 [Kousuke Saruta] Fixed style ec7922e [Kousuke Saruta] Simplified code 755a004 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into history-page-indexing cfa242b [Kousuke Saruta] Added index to HistoryPage --- .../spark/deploy/history/HistoryPage.scala | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 5fdc350cd8512..0d5dcfb1ddffe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -26,6 +26,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { private val pageSize = 20 + private val plusOrMinus = 2 def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt @@ -39,6 +40,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { val last = Math.min(actualFirst + pageSize, allApps.size) - 1 val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) + val secondPageFromLeft = 2 + val secondPageFromRight = pageCount - 1 + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val providerConfig = parent.getProviderConfig() val content = @@ -48,13 +52,38 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { {providerConfig.map { case (k, v) =>
  • {k}: {v}
  • }} { + // This displays the indices of pages that are within `plusOrMinus` pages of + // the current page. Regardless of where the current page is, this also links + // to the first and last page. If the current page +/- `plusOrMinus` is greater + // than the 2nd page from the first page or less than the 2nd page from the last + // page, `...` will be displayed. if (allApps.size > 0) { + val leftSideIndices = + rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _) + val rightSideIndices = + rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount) +

    Showing {actualFirst + 1}-{last + 1} of {allApps.size} - - {if (actualPage > 1) <} - {if (actualPage < pageCount) >} - + + { + if (actualPage > 1) { + < + 1 + } + } + {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} + {leftSideIndices} + {actualPage} + {rightSideIndices} + {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} + { + if (actualPage < pageCount) { + {pageCount} + > + } + } +

    ++ appTable } else { @@ -81,6 +110,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Spark User", "Last Updated") + private def rangeIndices(range: Seq[Int], condition: Int => Boolean): Seq[Node] = { + range.filter(condition).map(nextPage => {nextPage} ) + } + private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" val startTime = UIUtils.formatDate(info.startTime) From 0d8bbbedb3ea5073bb4eae778c64c0ca52cb2051 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 10 Dec 2014 12:41:36 -0800 Subject: [PATCH 006/361] [SPARK-4771][Docs] Document standalone cluster supervise mode tdas looks like streaming already refers to the supervise mode. The link from there is broken though. Author: Andrew Or Closes #3627 from andrewor14/document-supervise and squashes the following commits: 9ca0908 [Andrew Or] Wording changes 2b55ed2 [Andrew Or] Document standalone cluster supervise mode --- docs/spark-standalone.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ae7b81d5bb71f..5c6084fb46255 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -257,7 +257,7 @@ To run an interactive Spark shell against the cluster, run the following command You can also pass an option `--total-executor-cores ` to control the number of cores that spark-shell uses on the cluster. -# Launching Compiled Spark Applications +# Launching Spark Applications The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to submit a compiled Spark application to the cluster. For standalone clusters, Spark currently @@ -272,6 +272,15 @@ should specify them through the `--jars` flag using comma as a delimiter (e.g. ` To control the application's configuration or execution environment, see [Spark Configuration](configuration.html). +Additionally, standalone `cluster` mode supports restarting your application automatically if it +exited with non-zero exit code. To use this feature, you may pass in the `--supervise` flag to +`spark-submit` when launching your application. Then, if you wish to kill an application that is +failing repeatedly, you may do so through: + + ./bin/spark-class org.apache.spark.deploy.Client kill + +You can find the driver ID through the standalone Master web UI at `http://:8080`. + # Resource Scheduling The standalone cluster mode currently only supports a simple FIFO scheduler across applications. From 35528f48052b7a3f2afcf8dd4b1c2900b886e6d1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 10 Dec 2014 12:48:24 -0800 Subject: [PATCH 007/361] [SPARK-4215] Allow requesting / killing executors only in YARN mode Currently this doesn't do anything in other modes, so we might as well just disable it rather than having the user mistakenly rely on it. Author: Andrew Or Closes #3615 from andrewor14/dynamic-allocation-yarn-only and squashes the following commits: ce6487a [Andrew Or] Allow requesting / killing executors only in YARN mode --- .../src/main/scala/org/apache/spark/SparkContext.scala | 10 +++++++++- .../apache/spark/ExecutorAllocationManagerSuite.scala | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index aded7c12e274e..8e5378ecc08de 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -357,8 +357,12 @@ class SparkContext(config: SparkConf) extends Logging { } // Optionally scale number of executors dynamically based on workload. Exposed for testing. + private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) + private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false) private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = - if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { + if (dynamicAllocationEnabled) { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Dynamic allocation of executors is currently only supported in YARN mode") Some(new ExecutorAllocationManager(this)) } else { None @@ -989,6 +993,8 @@ class SparkContext(config: SparkConf) extends Logging { */ @DeveloperApi def requestExecutors(numAdditionalExecutors: Int): Boolean = { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) @@ -1005,6 +1011,8 @@ class SparkContext(config: SparkConf) extends Logging { */ @DeveloperApi def killExecutors(executorIds: Seq[String]): Boolean = { + assert(master.contains("yarn") || dynamicAllocationTesting, + "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ce804f94f3267..c817f6dcede75 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -35,6 +35,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { .setMaster("local") .setAppName("test-executor-allocation-manager") .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") intercept[SparkException] { new SparkContext(conf) } SparkEnv.get.stop() // cleanup the created environment SparkContext.clearActiveContext() From a0848fdfdcde5a2fc172accf7d01b0b7e20cd05d Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Wed, 10 Dec 2014 13:29:27 -0800 Subject: [PATCH 008/361] [SPARK-4793] [Deploy] ensure .jar at end of line sometimes I switch between different version and do not want to rebuild spark, so I rename assembly.jar into .jar.bak, but still caught by `compute-classpath.sh` Author: Daoyuan Wang Closes #3641 from adrian-wang/jar and squashes the following commits: 45cbfd0 [Daoyuan Wang] ensure .jar at end of line --- bin/compute-classpath.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 298641f2684de..685051eeed9f1 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -68,14 +68,14 @@ else assembly_folder="$ASSEMBLY_DIR" fi -num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)" +num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)" if [ "$num_jars" -eq "0" ]; then echo "Failed to find Spark assembly in $assembly_folder" echo "You need to build Spark before running this program." exit 1 fi if [ "$num_jars" -gt "1" ]; then - jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar") + jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$") echo "Found multiple Spark assembly jars in $assembly_folder:" echo "$jars_list" echo "Please remove all but one jar." @@ -108,7 +108,7 @@ else datanucleus_dir="$FWDIR"/lib_managed/jars fi -datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")" +datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")" datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)" if [ -n "$datanucleus_jars" ]; then From 8df58cdb775e38da55920bcc893e76833f7afec4 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 10 Dec 2014 14:19:37 -0800 Subject: [PATCH 009/361] [SPARK-4569] Rename 'externalSorting' in Aggregator Hi all - I've renamed the unhelpfully named variable and added a comment clarifying what's actually happening. Author: Ilya Ganelin Closes #3666 from ilganeli/SPARK-4569B and squashes the following commits: 1810394 [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator e2d2092 [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator d7cefec [Ilya Ganelin] [SPARK-4569] Rename 'externalSorting' in Aggregator 5b3f39c [Ilya Ganelin] [SPARK-4569] Rename in Aggregator --- core/src/main/scala/org/apache/spark/Aggregator.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 79c9c451d273d..09eb9605fb799 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -34,7 +34,9 @@ case class Aggregator[K, V, C] ( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { - private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) + // When spilling is enabled sorting will happen externally, but not necessarily with an + // ExternalSorter. + private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true) @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = @@ -42,7 +44,7 @@ case class Aggregator[K, V, C] ( def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { @@ -71,9 +73,9 @@ case class Aggregator[K, V, C] ( combineCombinersByKey(iter, null) def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext) - : Iterator[(K, C)] = + : Iterator[(K, C)] = { - if (!externalSorting) { + if (!isSpillEnabled) { val combiners = new AppendOnlyMap[K,C] var kc: Product2[K, C] = null val update = (hadValue: Boolean, oldValue: C) => { From 387bfa2090db7a8c6a91fe5a8e9c1277c7395a3e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 10 Dec 2014 14:27:53 -0800 Subject: [PATCH 010/361] [SPARK-4759] Fix driver hanging from coalescing partitions The driver hangs sometimes when we coalesce RDD partitions. See JIRA for more details and reproduction. This is because our use of empty string as default preferred location in `CoalescedRDDPartition` causes the `TaskSetManager` to schedule the corresponding task on host `""` (empty string). The intended semantics here, however, is that the partition does not have a preferred location, and the TSM should schedule the corresponding task accordingly. Author: Andrew Or Closes #3633 from andrewor14/coalesce-preferred-loc and squashes the following commits: e520d6b [Andrew Or] Oops 3ebf8bd [Andrew Or] A few comments f370a4e [Andrew Or] Fix tests 2f7dfb6 [Andrew Or] Avoid using empty string as default preferred location --- .../org/apache/spark/rdd/CoalescedRDD.scala | 36 +++++++++++-------- .../scala/org/apache/spark/rdd/RDDSuite.scala | 2 +- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 9fab1d78abb04..b073eba8a1574 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -35,11 +35,10 @@ import org.apache.spark.util.Utils * @param preferredLocation the preferred location for this partition */ private[spark] case class CoalescedRDDPartition( - index: Int, - @transient rdd: RDD[_], - parentsIndices: Array[Int], - @transient preferredLocation: String = "" - ) extends Partition { + index: Int, + @transient rdd: RDD[_], + parentsIndices: Array[Int], + @transient preferredLocation: Option[String] = None) extends Partition { var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_)) @throws(classOf[IOException]) @@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition( * @return locality of this coalesced partition between 0 and 1 */ def localFraction: Double = { - val loc = parents.count(p => - rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation)) - + val loc = parents.count { p => + val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host) + preferredLocation.exists(parentPreferredLocations.contains) + } if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble) } } @@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition( * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance */ private[spark] class CoalescedRDD[T: ClassTag]( - @transient var prev: RDD[T], - maxPartitions: Int, - balanceSlack: Double = 0.10) + @transient var prev: RDD[T], + maxPartitions: Int, + balanceSlack: Double = 0.10) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { @@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag]( * @return the machine most preferred by split */ override def getPreferredLocations(partition: Partition): Seq[String] = { - List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation) + partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq } } @@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag]( * */ -private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { +private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = @@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc } } -private[spark] case class PartitionGroup(prefLoc: String = "") { +private case class PartitionGroup(prefLoc: Option[String] = None) { var arr = mutable.ArrayBuffer[Partition]() - def size = arr.size } + +private object PartitionGroup { + def apply(prefLoc: String): PartitionGroup = { + require(prefLoc != "", "Preferred location must not be empty") + PartitionGroup(Some(prefLoc)) + } +} diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 46fcb80fa1845..6836e9ab0fd6b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -294,7 +294,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("coalesced RDDs with locality") { val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b")))) val coal3 = data3.coalesce(3) - val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation) + val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation) assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped") // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5 From 179ff321471b8bbf551bed34fdee3de8fd5528ec Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 10 Dec 2014 15:01:15 -0800 Subject: [PATCH 011/361] SPARK-3526 Add section about data locality to the tuning guide cc kayousterhout I have a few outstanding questions from compiling this documentation: - What's the difference between NO_PREF and ANY? I understand the implications of the ordering but don't know what an example of each would be - Why is NO_PREF ahead of RACK_LOCAL? I would think it'd be better to schedule rack-local tasks ahead of no preference if you could only do one or the other. Is the idea to wait longer and hope for the rack-local tasks to turn into node-local or better? - Will there be a datacenter-local locality level in the future? Apache Cassandra for example has this level Author: Andrew Ash Closes #2519 from ash211/SPARK-3526 and squashes the following commits: 44cff28 [Andrew Ash] Link to spark.locality parameters rather than copying the list 6d5d966 [Andrew Ash] Stay focused on Spark, no astronaut architecture mumbo-jumbo 20e0e31 [Andrew Ash] SPARK-3526 Add section about data locality to the tuning guide --- docs/tuning.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/docs/tuning.md b/docs/tuning.md index 0e2447dd46394..c4ca766328c1e 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -233,6 +233,39 @@ Spark prints the serialized size of each task on the master, so you can look at decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing. +## Data Locality + +Data locality can have a major impact on the performance of Spark jobs. If data and the code that +operates on it are together than computation tends to be fast. But if code and data are separated, +one must move to the other. Typically it is faster to ship serialized code from place to place than +a chunk of data because code size is much smaller than data. Spark builds its scheduling around +this general principle of data locality. + +Data locality is how close data is to the code processing it. There are several levels of +locality based on the data's current location. In order from closest to farthest: + +- `PROCESS_LOCAL` data is in the same JVM as the running code. This is the best locality + possible +- `NODE_LOCAL` data is on the same node. Examples might be in HDFS on the same node, or in + another executor on the same node. This is a little slower than `PROCESS_LOCAL` because the data + has to travel between processes +- `NO_PREF` data is accessed equally quickly from anywhere and has no locality preference +- `RACK_LOCAL` data is on the same rack of servers. Data is on a different server on the same rack + so needs to be sent over the network, typically through a single switch +- `ANY` data is elsewhere on the network and not in the same rack + +Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In +situations where there is no unprocessed data on any idle executor, Spark switches to lower locality +levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same +server, or b) immediately start a new task in a farther away place that requires moving data there. + +What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout +expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback +between each level can be configured individually or all together in one parameter; see the +`spark.locality` parameters on the [configuration page](configuration.html#scheduling) for details. +You should increase these settings if your tasks are long and see poor locality, but the default +usually works well. + # Summary This has been a short guide to point out the main concerns you should know about when tuning a From 9d0e843f46971e3bc353a5164634e027a7eeb5b1 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Wed, 10 Dec 2014 20:44:59 -0800 Subject: [PATCH 012/361] [CORE]codeStyle: uniform ConcurrentHashMap define in StorageLevel.scala with other places Author: Zhang, Liye Closes #2793 from liyezhang556520/uniformHashMap and squashes the following commits: 5884735 [Zhang, Liye] [CORE]codeStyle: uniform ConcurrentHashMap define in StorageLevel.scala --- .../main/scala/org/apache/spark/storage/StorageLevel.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 56edc4fe2e4ad..e5e1cf5a69a19 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} +import java.util.concurrent.ConcurrentHashMap import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils @@ -220,8 +221,7 @@ object StorageLevel { getCachedStorageLevel(obj) } - private[spark] val storageLevelCache = - new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]() + private[spark] val storageLevelCache = new ConcurrentHashMap[StorageLevel, StorageLevel]() private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = { storageLevelCache.putIfAbsent(level, level) From f0b431ebb91bd2f12f4b7bbe9a64effc66db52dc Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 10 Dec 2014 23:41:15 -0800 Subject: [PATCH 013/361] [SPARK-4791] [sql] Infer schema from case class with multiple constructors Modified ScalaReflection.schemaFor to take primary constructor of Product when there are multiple constructors. Added test to suite which failed before but works now. Needed for [https://github.com/apache/spark/pull/3637] CC: marmbrus Author: Joseph K. Bradley Closes #3646 from jkbradley/sql-reflection and squashes the following commits: 796b2e4 [Joseph K. Bradley] Modified ScalaReflection.schemaFor to take primary constructor of Product when there are multiple constructors. Added test to suite which failed before but works now. --- .../spark/sql/catalyst/ScalaReflection.scala | 14 +++++++++++++- .../spark/sql/catalyst/ScalaReflectionSuite.scala | 14 ++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 71034c2c43c77..2cf241de61f7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -118,7 +118,19 @@ trait ScalaReflection { case t if t <:< typeOf[Product] => val formalTypeArgs = t.typeSymbol.asClass.typeParams val TypeRef(_, _, actualTypeArgs) = t - val params = t.member(nme.CONSTRUCTOR).asMethod.paramss + val constructorSymbol = t.member(nme.CONSTRUCTOR) + val params = if (constructorSymbol.isMethod) { + constructorSymbol.asMethod.paramss + } else { + // Find the primary constructor, and use its parameter ordering. + val primaryConstructorSymbol: Option[Symbol] = constructorSymbol.asTerm.alternatives.find( + s => s.isMethod && s.asMethod.isPrimaryConstructor) + if (primaryConstructorSymbol.isEmpty) { + sys.error("Internal SQL error: Product object did not have a primary constructor.") + } else { + primaryConstructorSymbol.get.asMethod.paramss + } + } Schema(StructType( params.head.map { p => val Schema(dataType, nullable) = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index ddc3d44869c98..7be24bea7d5a6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -68,6 +68,10 @@ case class ComplexData( case class GenericData[A]( genericField: A) +case class MultipleConstructorsData(a: Int, b: String, c: Double) { + def this(b: String, a: Int) = this(a, b, c = 1.0) +} + class ScalaReflectionSuite extends FunSuite { import ScalaReflection._ @@ -253,4 +257,14 @@ class ScalaReflectionSuite extends FunSuite { Row(1, 1, 1, 1, 1, 1, true)) assert(convertToCatalyst(data, dataType) === convertedData) } + + test("infer schema from case class with multiple constructors") { + val dataType = schemaFor[MultipleConstructorsData].dataType + dataType match { + case s: StructType => + // Schema should have order: a: Int, b: String, c: Double + assert(s.fieldNames === Seq("a", "b", "c")) + assert(s.fields.map(_.dataType) === Seq(IntegerType, StringType, DoubleType)) + } + } } From 4ce74685fdc4af5eb0e403d3abbbb3fdd9f25297 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 11 Dec 2014 06:21:23 -0800 Subject: [PATCH 014/361] [SPARK-4806] Streaming doc update for 1.2 Important updates to the streaming programming guide - Make the fault-tolerance properties easier to understand, with information about write ahead logs - Update the information about deploying the spark streaming app with information about Driver HA - Update Receiver guide to discuss reliable vs unreliable receivers. Author: Tathagata Das Author: Josh Rosen Author: Josh Rosen Closes #3653 from tdas/streaming-doc-update-1.2 and squashes the following commits: f53154a [Tathagata Das] Addressed Josh's comments. ce299e4 [Tathagata Das] Minor update. ca19078 [Tathagata Das] Minor change f746951 [Tathagata Das] Mentioned performance problem with WAL 7787209 [Tathagata Das] Merge branch 'streaming-doc-update-1.2' of github.com:tdas/spark into streaming-doc-update-1.2 2184729 [Tathagata Das] Updated Kafka and Flume guides with reliability information. 2f3178c [Tathagata Das] Added more information about writing reliable receivers in the custom receiver guide. 91aa5aa [Tathagata Das] Improved API Docs menu 5707581 [Tathagata Das] Added Pythn API badge b9c8c24 [Tathagata Das] Merge pull request #26 from JoshRosen/streaming-programming-guide b8c8382 [Josh Rosen] minor fixes a4ef126 [Josh Rosen] Restructure parts of the fault-tolerance section to read a bit nicer when skipping over the headings 65f66cd [Josh Rosen] Fix broken link to fault-tolerance semantics section. f015397 [Josh Rosen] Minor grammar / pluralization fixes. 3019f3a [Josh Rosen] Fix minor Markdown formatting issues aa8bb87 [Tathagata Das] Small update. 195852c [Tathagata Das] Updated based on Josh's comments, updated receiver reliability and deploying section, and also updated configuration. 17b99fb [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-doc-update-1.2 a0217c0 [Tathagata Das] Changed Deploying menu layout 67fcffc [Tathagata Das] Added cluster mode + supervise example to submitting application guide. e45453b [Tathagata Das] Update streaming guide, added deploying section. 192c7a7 [Tathagata Das] Added more info about Python API, and rewrote the checkpointing section. --- docs/_layouts/global.html | 13 +- docs/configuration.md | 133 ++-- docs/streaming-custom-receivers.md | 90 ++- docs/streaming-flume-integration.md | 13 +- docs/streaming-kafka-integration.md | 17 + docs/streaming-programming-guide.md | 1068 +++++++++++++++------------ docs/submitting-applications.md | 36 +- 7 files changed, 819 insertions(+), 551 deletions(-) diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 627ed37de4a9c..8841f7675d35e 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -33,7 +33,7 @@