[SPARK-6980] [CORE] Akka timeout exceptions indicate which conf controls them (RPC Layer) #6205
Conversation
ok to test
    }
    catch {
      case _: Throwable =>
        RpcTimeout(conf, "spark.network.timeout", "120s")
Can you change this to catch a much narrower exception? Or even better -- how about you move this into RpcTimeout.apply -- have it take a prioritized list of properties to check, and it will check the conf for each of them, avoiding any need for exception handling.
I like the second idea, I'll add that in another commit
@BryanCutler I will be checking out your PR changes and then carrying forward the changes you have made by adding …
@BryanCutler can I get write access to your PR repo? I want to merge my changes; how do I do it?
@hardmettle are you able to check out my branch? Then maybe you could share any changes/additions as a pull request.
     * @param timeoutPropList prioritized list of property keys for the timeout in seconds
     * @param defaultValue default timeout value in seconds if no properties found
     */
    def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
Creating RpcTimeout from a prioritized list of property keys, per @squito's previous request.
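For reference, a minimal sketch of what such a prioritized lookup could look like, assuming SparkConf.getOption and Spark's internal Utils.timeStringAsSeconds helper are available. This illustrates the approach, not necessarily the exact code merged in this PR:

    import scala.concurrent.duration._
    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)

    object RpcTimeout {
      // Check each property key in priority order: the first one set in the
      // conf wins; otherwise fall back to defaultValue under the head key.
      // No exception handling is needed, per the review suggestion above.
      def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
        require(timeoutPropList.nonEmpty, "must supply at least one property key")
        val (prop, value) = timeoutPropList
          .collectFirst { case key if conf.getOption(key).isDefined => (key, conf.get(key)) }
          .getOrElse((timeoutPropList.head, defaultValue))
        new RpcTimeout(Utils.timeStringAsSeconds(value).seconds, prop)
      }
    }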
…p instead of chained functions
@hardmettle I don't think there are any occurrences of …
…y list for RpcEnv exception catch
[SPARK-6980] [CORE] [WIP] Creating wrapper for Akka timeout exceptions to get better information using conf (RPC Layer)
@BryanCutler @hardmettle just checking if there is any update here; let me know when it's ready to take another look. Seemed like it was getting close; the only "big" thing was what to do with …
@squito I have been banging my head on this. Any suggestions for improvements?
Hey guys, I've been playing around with making the futures fail before calling await; here is a test case that does it:

    test("Future failure with RpcTimeout") {
      class EchoActor extends Actor {
        def receive: Receive = {
          case msg =>
            Thread.sleep(50)
            sender() ! msg
        }
      }

      val system = ActorSystem("EchoSystem")
      val echoActor = system.actorOf(Props(new EchoActor), name = "echoA")
      val timeout = new RpcTimeout(50 millis, "spark.rpc.short.timeout")

      val fut = echoActor.ask("hello")(10 millis).mapTo[String].recover {
        case te: TimeoutException => throw timeout.amend(te)
      }

      fut.onFailure {
        case te: TimeoutException => println("failed with timeout exception")
      }

      fut.onComplete {
        case Success(str) => println("future success")
        case Failure(ex) => println("future failure")
      }

      println("sleeping")
      Thread.sleep(50)
      println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString())

      println("Caught TimeoutException: " +
        intercept[TimeoutException] {
          // timeout.awaitResult(fut) // will print RpcTimeout description twice
          Await.result(fut, 10 millis)
        }.getMessage()
      )
    }

If we use …
@BryanCutler haven't looked at this closely yet, but quick question -- are you talking about using this just for …
@squito I was thinking of using this in place of what we have, but actually it wouldn't cover the case where … One thing I'm not too sure of, though: when using …
Hi @BryanCutler,
for me the behavior seems to flip flop somewhere around … In any case, the reason I'm bringing this up is that we're sort of in the same scenario with … Your suggestion of using … I think it's worth adding your suggestion as well for those remaining cases -- it's a nice trick :). I didn't see any existing uses for those methods, but most likely the resulting …

Phew, that was a mouthful. Hope this all makes sense.
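As an aside, the recover trick from the test case above could be packaged once rather than repeated at each call site. A rough sketch, where addMessageIfTimeout is a hypothetical name and amend is the method used in the test:

    import java.util.concurrent.TimeoutException
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    // Hypothetical helper: a partial function that rethrows a TimeoutException
    // amended with the conf property that controls this timeout.
    def addMessageIfTimeout[T](timeout: RpcTimeout): PartialFunction[Throwable, T] = {
      case te: TimeoutException => throw timeout.amend(te)
    }

    // Applied to any ask-style future, the descriptive message is attached
    // whether the caller later blocks on the future or installs callbacks.
    def withTimeoutMessage[T](fut: Future[T], timeout: RpcTimeout): Future[T] =
      fut.recover(addMessageIfTimeout(timeout))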
Hi @squito, …
retest this please
Test build #36033 timed out for PR 6205 at commit …
…rnSchedulerBackend although it is not being used, and fixed SparkConfSuite UT to not use deprecated RpcUtils functions
Hey @squito, it looks like the last Jenkins test timed out before it even got to RpcEnvSuite; can you trigger the test again?
@@ -47,14 +47,22 @@ object RpcUtils {
      }

      /** Returns the default Spark timeout to use for RPC ask operations. */
      def askRpcTimeout(conf: SparkConf): RpcTimeout = {
private[spark]
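(For context, the body of askRpcTimeout presumably delegates to the prioritized-list factory discussed above. A sketch under that assumption; the specific fallback key names are illustrative, not confirmed by this excerpt:)

    /** Returns the default Spark timeout to use for RPC ask operations. */
    def askRpcTimeout(conf: SparkConf): RpcTimeout = {
      // Illustrative keys: an ask-specific property first, then the general
      // "spark.network.timeout" seen earlier in this thread as a fallback.
      RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
    }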
Hi @BryanCutler, sorry for the delay on my end. Thanks for updating. Unfortunately it looks like there are more conflicts with master; you'll need to merge again.
No problem, I'll try to resolve these today
Test build #992 timed out for PR 6205 at commit …
…e to private[spark] and improved deprecation warning msg
Conflicts:
    core/src/main/scala/org/apache/spark/deploy/Client.scala
    core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
    core/src/main/scala/org/apache/spark/deploy/master/Master.scala
    core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
    core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
    core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
    core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
    core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
    core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
I merged with master, which now mostly calls … so I made the class serializable:

    private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
      extends Serializable {

and the test was able to pass. Does this seem correct, @squito?
@BryanCutler yeah, making it serializable seems fine. I have a feeling that there is probably some field that could be marked … @zsxwing do you want to take one more pass?
Jenkins, retest this please
@@ -147,7 +146,7 @@ private[spark] object AkkaUtils extends Logging {
   def askWithReply[T](
       message: Any,
       actor: ActorRef,
-      timeout: FiniteDuration): T = {
+      timeout: RpcTimeout): T = {
I suggest reverting the changes to AkkaUtils. These methods you changed won't be called any more.
Hmm, I disagree. No harm in making these changes. If we're really certain AkkaUtils won't be used anymore, then we'll delete it ... but in the meantime, these changes are here just in case. Maybe it wasn't worth the effort in the first place (my fault!) but I don't see the harm ...
I'm OK with the changes. Not a big deal.
LGTM except two minor comments.
…sleeping, to avoid possible sync issues
Test build #995 has finished for PR 6205 at commit …
Test build #996 has finished for PR 6205 at commit …
Test build #997 has finished for PR 6205 at commit …
Merged to master. Thanks for all your hard work on this @BryanCutler! This was probably a lot more work than you initially expected (and me too), but I'm glad you stuck with it; it's a great addition.
Awesome, thanks for all your help @squito! That was a heck of a first Jira, but it was a great experience and I definitely learned a lot.
Latest changes after refactoring to the RPC layer. I rebased against trunk to make sure to get any recent changes, since it had been a while. I wasn't crazy about the name ConfigureTimeout, and RpcTimeout seemed to fit better, but I'm open to suggestions! I ran most of the tests and they pass, but others would get stuck with "WARN TaskSchedulerImpl: Initial job has not accepted any resources". I think it's just my machine, so I thought I would push what I have anyway.

Still left to do:
- RpcTimeout …
- ask and Await.result use the same timeout; should we differentiate between these in the TimeoutException message?
- Await.result is wrapped in RpcTimeout; should we also wrap Await.ready? (see the sketch after this comment)

@hardmettle, feel free to help out with any of these!
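For readers following along, the Await.result wrapping in the to-do list could look roughly like the sketch below. This is an illustration consistent with the RpcTimeout class signature quoted earlier in the thread, not necessarily the code as merged; the exception message wording is an assumption.

    import java.util.concurrent.TimeoutException
    import scala.concurrent.{Await, Awaitable}
    import scala.concurrent.duration.FiniteDuration

    class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
      extends Serializable {

      // Block for the result; if the wait times out, rethrow a TimeoutException
      // whose message names the conf property controlling this timeout.
      def awaitResult[T](awaitable: Awaitable[T]): T = {
        try {
          Await.result(awaitable, duration)
        } catch {
          case te: TimeoutException =>
            throw new TimeoutException(
              s"${te.getMessage}. This timeout is controlled by $timeoutProp")
        }
      }
    }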