
Set spark.executor.uri from environment variable (needed by Mesos) #311

Closed
ivanwick wants to merge 1 commit from ivanwick/master

Conversation

ivanwick commented Apr 3, 2014

The Mesos backend uses this property when setting up a slave process. It is similarly set in the Scala repl (org.apache.spark.repl.SparkILoop), but I couldn't find anything analogous for pyspark.
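
The change itself is small. A minimal sketch of roughly what it amounts to on the PySpark shell side (the exact file and surrounding startup code are assumptions here; commit da0c3e4 below is the authoritative diff): read SPARK_EXECUTOR_URI from the environment and forward it as the spark.executor.uri system property before the shell's SparkContext is created, mirroring what SparkILoop does.

    # Sketch only: forward SPARK_EXECUTOR_URI into the JVM before the shell's
    # SparkContext is constructed, so the Mesos backend can find the executor.
    import os
    from pyspark.context import SparkContext

    if os.environ.get("SPARK_EXECUTOR_URI"):
        SparkContext.setSystemProperty("spark.executor.uri",
                                       os.environ["SPARK_EXECUTOR_URI"])

    sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell")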

@AmplabJenkins

Can one of the admins verify this patch?

mateiz commented Apr 4, 2014

Jenkins, test this please.

Good catch!

mateiz commented Apr 5, 2014

Jenkins, test this please

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13785/

pwendell commented Apr 5, 2014

@ivanwick what is the symptom when this is not set correctly? If there is an exception or stack trace, it would be helpful to include it here, so that other people who run into this problem can figure out that this is the fix for it.

ivanwick commented Apr 7, 2014

This patch fixes a bug with the PySpark shell running on Mesos.

Without the spark.executor.uri property, PySpark reports lost tasks because the slave looks for the spark-executor script at the wrong path and can never start it. It logs several "Lost TID" and "Executor lost" messages while the scheduler re-queues the lost tasks; they fail again for the same reason, finally ending with:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/spark-0.9.0-incubating-bin-cdh4/python/pyspark/rdd.py", line 539, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/opt/spark/spark-0.9.0-incubating-bin-cdh4/python/pyspark/rdd.py", line 505, in reduce
    vals = self.mapPartitions(func).collect()
  File "/opt/spark/spark-0.9.0-incubating-bin-cdh4/python/pyspark/rdd.py", line 469, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/opt/spark/spark-0.9.0-incubating-bin-cdh4/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/opt/spark/spark-0.9.0-incubating-bin-cdh4/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
14/04/05 14:10:48 INFO TaskSetManager: Re-queueing tasks for 201404020012-1174907072-5050-22936-8 from TaskSet 0.0
14/04/05 14:10:48 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
py4j.protocol.Py4JJavaError: An error occurred while calling o21.collect.
: org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times (most recent failure: unknown)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The stderr of each slave in the Mesos framework reports:

sh: 1: /opt/spark/spark-0.9.0-incubating-bin-cdh4/sbin/spark-executor: not found

because this path doesn't exist on the slave nodes (it happens to be the path where Spark is installed on the head node).

When spark.executor.uri is set, as it is with the Scala repl, Mesos is able to download the Spark dist package and run it from the framework temp directory on the slave.
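
For reference, the same property can also be set explicitly from application code rather than relying on the shell; a minimal sketch, where the Mesos master URL and the tarball location are hypothetical placeholders:

    from pyspark import SparkConf, SparkContext

    # Hypothetical values: the URI must point at a Spark distribution tarball
    # that every Mesos slave can fetch (e.g. from HDFS or an HTTP server).
    conf = (SparkConf()
            .setMaster("mesos://head-node:5050")
            .setAppName("MesosUriExample")
            .set("spark.executor.uri",
                 "hdfs://head-node/dist/spark-0.9.0-incubating-bin-cdh4.tgz"))
    sc = SparkContext(conf=conf)

    print(sc.parallelize(range(100)).sum())  # should no longer report lost tasks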

andrewor14 pushed a commit to andrewor14/spark that referenced this pull request Apr 7, 2014
SPARK-991: Report information gleaned from a Python stacktrace in the UI

Scala:

- Added setCallSite/clearCallSite to SparkContext and JavaSparkContext.
  These functions mutate a LocalProperty called "externalCallSite."
- Add a wrapper, getCallSite, that checks for an externalCallSite and, if
  none is found, calls the usual Utils.formatSparkCallSite.
- Change everything that calls Utils.formatSparkCallSite to call
  getCallSite instead. Except getCallSite.
- Add setCallSite/clearCallSite wrappers to JavaSparkContext.

Python:

- Add a gruesome hack to rdd.py that inspects the traceback and guesses
  what you want to see in the UI.
- Add a RAII wrapper around said gruesome hack that calls
  setCallSite/clearCallSite as appropriate.
- Wire said RAII wrapper up around three calls into the Scala code.
  I'm not sure that I hit all the spots with the RAII wrapper. I'm also
  not sure that my gruesome hack does exactly what we want.

One could also approach this change by refactoring
runJob/submitJob/runApproximateJob to take a call site, then threading
that parameter through everything that needs to know it.

One might object to the pointless-looking wrappers in JavaSparkContext.
Unfortunately, I can't directly access the SparkContext from
Python---or, if I can, I don't know how---so I need to wrap everything
that matters in JavaSparkContext.

Conflicts:
	core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
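
The Python side of that commit boils down to a context manager around calls into the JVM. A rough sketch of the idea, not the actual rdd.py code (the class name and the frame-filtering heuristic are illustrative assumptions; only setCallSite/clearCallSite come from the commit message above):

    import traceback

    class _CallSiteTracker(object):
        """Set a call site gleaned from the Python stack before calling into
        the JVM, and clear it again afterwards (the "RAII wrapper")."""

        def __init__(self, sc):
            # Heuristic: the innermost frame outside pyspark itself is
            # probably the user code worth showing in the UI.
            user_frames = [f for f in traceback.extract_stack()
                           if "pyspark" not in f[0]]
            if user_frames:
                filename, lineno, func, _ = user_frames[-1]
                self._call_site = "%s at %s:%d" % (func, filename, lineno)
            else:
                self._call_site = "unknown Python call site"
            self._jsc = sc._jsc  # underlying JavaSparkContext

        def __enter__(self):
            self._jsc.setCallSite(self._call_site)

        def __exit__(self, exc_type, exc_value, tb):
            self._jsc.clearCallSite()

    # Usage sketch, wrapping one of the calls into Scala:
    # with _CallSiteTracker(self.ctx):
    #     bytesInJava = self._jrdd.collect().iterator()
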
mateiz commented Apr 11, 2014

Thanks Ivan, I've merged this in.

asfgit closed this in 5cd11d5 on Apr 11, 2014
asfgit pushed a commit that referenced this pull request Apr 11, 2014
The Mesos backend uses this property when setting up a slave process.  It is similarly set in the Scala repl (org.apache.spark.repl.SparkILoop), but I couldn't find any analogous for pyspark.

Author: Ivan Wick <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Matei Zaharia <[email protected]>

Closes #311 from ivanwick/master and squashes the following commits:

da0c3e4 [Ivan Wick] Set spark.executor.uri from environment variable (needed by Mesos)

(cherry picked from commit 5cd11d5)
Signed-off-by: Matei Zaharia <[email protected]>
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
The Mesos backend uses this property when setting up a slave process.  It is similarly set in the Scala repl (org.apache.spark.repl.SparkILoop), but I couldn't find any analogous for pyspark.

Author: Ivan Wick <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Matei Zaharia <[email protected]>

Closes apache#311 from ivanwick/master and squashes the following commits:

da0c3e4 [Ivan Wick] Set spark.executor.uri from environment variable (needed by Mesos)
liancheng pushed a commit to liancheng/spark that referenced this pull request Mar 17, 2017
## What changes were proposed in this pull request?

Redshift has no unsigned types (http://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html), so unsigned source types can map to Long and Integer without loss of precision.
This integrates community PR apache#311 (github.com/databricks/spark-redshift/pull/311/), where the user reports that this change fixed scenarios in which he would get a Decimal instead of a Long.
I was not able to reproduce the user's problem, but the change nevertheless removes code that handles unsigned types and is actually dead, so it is not harmful.

## How was this patch tested?

Added tests from the community PR#311

Author: Juliusz Sompolski <[email protected]>

Closes apache#180 from juliuszsompolski/SC-5620.
gatesn pushed a commit to gatesn/spark that referenced this pull request Mar 14, 2018
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019