[SPARK-6028][Core]A new RPC implementation based on the network module #6457

Closed
wants to merge 36 commits into from

zsxwing
Member

@zsxwing zsxwing commented May 28, 2015

@SparkQA

SparkQA commented May 28, 2015

Test build #33660 has finished for PR 6457 at commit 2011e94.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MessageLoop extends Runnable
    • class NettyRpcEnvFactory extends RpcEnvFactory with Logging
    • class NettyRpcEndpointRef(@transient conf: SparkConf)
    • class NettyRpcHandler(
    • class KryoSerializationStream(
    • class KryoDeserializationStream(

@SparkQA

SparkQA commented May 29, 2015

Test build #33748 has finished for PR 6457 at commit e41f2e8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MessageLoop extends Runnable
    • class NettyRpcEnvFactory extends RpcEnvFactory with Logging
    • class NettyRpcEndpointRef(@transient conf: SparkConf)
    • class NettyRpcHandler(

@SparkQA

SparkQA commented May 30, 2015

Test build #33783 has finished for PR 6457 at commit 91fc944.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MessageLoop extends Runnable
    • class NettyRpcEnvFactory extends RpcEnvFactory with Logging
    • class NettyRpcEndpointRef(@transient conf: SparkConf)
    • class NettyRpcHandler(

@zsxwing
Member Author

zsxwing commented May 31, 2015

retest this please

@SparkQA

SparkQA commented May 31, 2015

Test build #33850 has finished for PR 6457 at commit 91fc944.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MessageLoop extends Runnable
    • class NettyRpcEnvFactory extends RpcEnvFactory with Logging
    • class NettyRpcEndpointRef(@transient conf: SparkConf)
    • class NettyRpcHandler(

@SparkQA

SparkQA commented May 31, 2015

Test build #33852 has finished for PR 6457 at commit 64f1f04.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MessageLoop extends Runnable
    • class NettyRpcEnvFactory extends RpcEnvFactory with Logging
    • class NettyRpcEndpointRef(@transient conf: SparkConf)
    • class NettyRpcHandler(
    • case class SortOrder(child: Expression, direction: SortDirection) extends Expression
    • abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)

@SparkQA

SparkQA commented May 31, 2015

Test build #33854 has finished for PR 6457 at commit 879ecd5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MessageLoop extends Runnable
    • class NettyRpcEnvFactory extends RpcEnvFactory with Logging
    • class NettyRpcEndpointRef(@transient conf: SparkConf)
    • class NettyRpcHandler(

@SparkQA

SparkQA commented Jun 6, 2015

Test build #34367 timed out for PR 6457 at commit 1735f2d after a configured wait of 175m.

@SparkQA

SparkQA commented Jun 8, 2015

Test build #34450 has finished for PR 6457 at commit 019334a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class MessageLoop extends Runnable
    • class NettyRpcEnvFactory extends RpcEnvFactory with Logging
    • class NettyRpcEndpointRef(@transient conf: SparkConf)
    • class NettyRpcHandler(

@SparkQA

SparkQA commented Jun 24, 2015

Test build #35687 has finished for PR 6457 at commit 24aea28.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing changed the title from "[SPARK-6028][Core][WIP] A new RPC implementation based on the network module" to "[SPARK-6028][Core]A new RPC implementation based on the network module" on Jun 29, 2015
@rxin
Contributor

rxin commented Jul 21, 2015

@zsxwing can you bring this up to date? I'd like to review this this week.

@SparkQA

SparkQA commented Jul 22, 2015

Test build #38094 has finished for PR 6457 at commit f905c72.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member Author

zsxwing commented Jul 23, 2015

retest this please

@SparkQA

SparkQA commented Jul 23, 2015

Test build #38146 has finished for PR 6457 at commit f905c72.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 23, 2015

Test build #68 has finished for PR 6457 at commit f905c72.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = new NettyRpcAddress(nettyEnv.address.host, nettyEnv.address.port, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  nameToEndpoint.put(name, new RpcEndpointPair(endpoint, endpointRef))
Contributor

Should this check whether name is already being used? Just in case.
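One way to act on this suggestion is to make the registration itself reject duplicates atomically. The following is a minimal sketch, assuming `nameToEndpoint` is a `java.util.concurrent.ConcurrentHashMap` (the thread does not say what it is). `EndpointRegistry`, `register`, and `lookup` are hypothetical names used only for illustration, not the PR's code.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the dispatcher under review; only the name check matters here.
final class EndpointRegistry[E <: AnyRef] {
  // Assumption: the real nameToEndpoint map is (or could be) a ConcurrentHashMap.
  private val nameToEndpoint = new ConcurrentHashMap[String, E]()

  /** Registers `endpoint` under `name`, failing if the name is already in use. */
  def register(name: String, endpoint: E): Unit = {
    // putIfAbsent is atomic: it returns the existing value, or null if none was present.
    val previous = nameToEndpoint.putIfAbsent(name, endpoint)
    if (previous != null) {
      throw new IllegalArgumentException(s"There is already an endpoint called $name")
    }
  }

  /** Returns the endpoint registered under `name`, if any. */
  def lookup(name: String): Option[E] = Option(nameToEndpoint.get(name))
}
```

Compared with a separate `containsKey` check followed by `put`, `putIfAbsent` leaves no window in which two threads can both register the same name.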


private val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
  "netty-rpc-connection",
  conf.getInt("spark.rpc.connect.threads", 256))
Contributor

This looks a bit excessive, but then it kinda sucks that TransportClientFactory.createClient is blocking. Perhaps we should write that down as a future enhancement.
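The trade-off in this comment can be illustrated with plain JDK and Scala primitives: when the connect call blocks, each in-flight connection attempt occupies a thread, so the pool has to be sized for the number of concurrent attempts. This is a rough sketch only. `BlockingConnect` and `connectAsync` are hypothetical names, and `blockingCreate` stands in for any blocking call such as the `createClient` mentioned above. Unlike the pool in the PR, `Executors.newCachedThreadPool` has no upper bound on threads.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{ExecutionContext, Future}

object BlockingConnect {
  // Daemon threads, so an idle pool never keeps the JVM alive.
  private val daemonFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "connect-sketch")
      t.setDaemon(true)
      t
    }
  }

  // Dedicated pool: blocking connects must not starve the threads that handle messages.
  private val connectContext =
    ExecutionContext.fromExecutor(Executors.newCachedThreadPool(daemonFactory))

  /** Runs a blocking factory call on the dedicated pool and exposes the result as a Future. */
  def connectAsync[C](blockingCreate: () => C): Future[C] =
    Future(blockingCreate())(connectContext)
}
```

A non-blocking client factory would remove the need for one thread per pending connection, which is the future enhancement the comment proposes.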

@vanzin
Contributor

vanzin commented Sep 21, 2015

Left some minor feedback. After those are taken care of, this LGTM. @rxin did you mean to also take a look at this?

@rxin
Contributor

rxin commented Sep 21, 2015

I will take a look later. But given that it is early in the release cycle, and given the size of the patch, I'm also OK with merging it first and then doing post-hoc reviews and addressing feedback later.

@SparkQA

SparkQA commented Sep 22, 2015

Test build #42806 has finished for PR 6457 at commit 1c1ec99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


override def onSuccess(response: Array[Byte]): Unit = {
  val reply = deserialize[AskResponse](response)
  if (reply.reply != null && reply.reply.isInstanceOf[RpcFailure]) {
Contributor

nit: null check is redundant.
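The nit holds because `isInstanceOf` already evaluates to `false` for a `null` reference in Scala. A small sketch showing that both forms agree follows. `Failure` is a hypothetical stand-in for `RpcFailure`, and the two helper methods are illustrative only.

```scala
object NullCheckDemo {
  // Hypothetical stand-in for RpcFailure.
  final case class Failure(reason: String)

  // Form used in the snippet under review: explicit null check first.
  def isFailureVerbose(reply: Any): Boolean =
    reply != null && reply.isInstanceOf[Failure]

  // Equivalent form: isInstanceOf is false for null, so no separate check is needed.
  def isFailure(reply: Any): Boolean =
    reply.isInstanceOf[Failure]
}
```

For any input, including `null`, `isFailure` and `isFailureVerbose` return the same result.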

@vanzin
Contributor

vanzin commented Sep 22, 2015

LGTM; the duplicate event issue is very minor and doesn't happen with the default config (where connections per peer is 1), so not super critical to fix now.

@SparkQA

SparkQA commented Sep 23, 2015

Test build #42872 has finished for PR 6457 at commit 90de095.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Sep 23, 2015

@zsxwing did you intend to leave "netty" as the default rpc implementation? Just double checking. Let me know and I'll merge this.

@zsxwing
Member Author

zsxwing commented Sep 24, 2015

> @zsxwing did you intend to leave "netty" as the default rpc implementation? Just double checking. Let me know and I'll merge this.

Yes. Actually it's better to use netty now so as to test it and see if it's stable. We can change it to akka if we find any critical issue before releasing it.

@rxin
Contributor

rxin commented Sep 24, 2015

I'm going to merge this and review it post-hoc.

@mengxr
Contributor

mengxr commented Sep 24, 2015

@zsxwing @rxin I saw this test failure on Jenkins master. If this happens frequently, we may need to revert this patch. I created https://issues.apache.org/jira/browse/SPARK-10799 for the issue.

https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3596/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/testReport/junit/org.apache.spark.rpc.netty/InboxSuite/post__multiple_threads/

org.apache.spark.rpc.netty.InboxSuite.post: multiple threads

Failing for the past 1 build (Since Failed#3596 )
Took 9 ms.
Error Message

org.apache.spark.rpc.netty.InboxSuite$$anonfun$3$$anon$1@73986812 was not empty
Stacktrace

sbt.ForkMain$ForkError: org.apache.spark.rpc.netty.InboxSuite$$anonfun$3$$anon$1@73986812 was not empty
    at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
    at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
    at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
    at org.apache.spark.rpc.netty.InboxSuite$$anonfun$3.apply$mcV$sp(InboxSuite.scala:94)
    at org.apache.spark.rpc.netty.InboxSuite$$anonfun$3.apply(InboxSuite.scala:66)
    at org.apache.spark.rpc.netty.InboxSuite$$anonfun$3.apply(InboxSuite.scala:66)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    at org.scalatest.FunSuite.run(FunSuite.scala:1555)
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
    at sbt.ForkMain$Run$2.call(ForkMain.java:294)
    at sbt.ForkMain$Run$2.call(ForkMain.java:284)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

@mengxr
Contributor

mengxr commented Sep 24, 2015

Also this might be related: org.apache.spark.deploy.StandaloneDynamicAllocationSuite (https://issues.apache.org/jira/browse/SPARK-10800).

https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/3622/HADOOP_PROFILE=hadoop-2.4,label=spark-test/testReport/junit/org.apache.spark.deploy/

org.apache.spark.deploy.StandaloneDynamicAllocationSuite.dynamic allocation default behavior

Failing for the past 1 build (Since Failed#3622 )
Took 0.12 sec.
Error Message

1 did not equal 2
Stacktrace

      org.scalatest.exceptions.TestFailedException: 1 did not equal 2
      at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
      at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
      at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
      at org.apache.spark.deploy.StandaloneDynamicAllocationSuite$$anonfun$1.apply$mcV$sp(StandaloneDynamicAllocationSuite.scala:78)
      at org.apache.spark.deploy.StandaloneDynamicAllocationSuite$$anonfun$1.apply(StandaloneDynamicAllocationSuite.scala:73)
      at org.apache.spark.deploy.StandaloneDynamicAllocationSuite$$anonfun$1.apply(StandaloneDynamicAllocationSuite.scala:73)
      at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      at org.scalatest.Transformer.apply(Transformer.scala:22)
      at org.scalatest.Transformer.apply(Transformer.scala:20)
      at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
      at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
      at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
      at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
      at org.apache.spark.deploy.StandaloneDynamicAllocationSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(StandaloneDynamicAllocationSuite.scala:33)
      at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
      at org.apache.spark.deploy.StandaloneDynamicAllocationSuite.runTest(StandaloneDynamicAllocationSuite.scala:33)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
      at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
      at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
      at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
      at org.scalatest.Suite$class.run(Suite.scala:1424)
      at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
      at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
      at org.apache.spark.deploy.StandaloneDynamicAllocationSuite.org$scalatest$BeforeAndAfterAll$$super$run(StandaloneDynamicAllocationSuite.scala:33)
      at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
      at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
      at org.apache.spark.deploy.StandaloneDynamicAllocationSuite.run(StandaloneDynamicAllocationSuite.scala:33)
      at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1492)
      at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1528)
      at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1526)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
      at org.scalatest.Suite$class.runNestedSuites(Suite.scala:1526)
      at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:29)
      at org.scalatest.Suite$class.run(Suite.scala:1421)
      at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:29)
      at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
      at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
      at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
      at org.scalatest.tools.Runner$.main(Runner.scala:860)
      at org.scalatest.tools.Runner.main(Runner.scala)

@zsxwing
Member Author

zsxwing commented Sep 24, 2015

> @zsxwing @rxin I saw this test failure on Jenkins master. If this happens frequently, we may need to revert this patch. I created https://issues.apache.org/jira/browse/SPARK-10799 for the issue.

I have already sent #8905 to fix this test

@mengxr
Contributor

mengxr commented Sep 24, 2015

Actually, the second one is worse: it broke most of the recent master builds.

@zsxwing
Member Author

zsxwing commented Sep 24, 2015

@mengxr I saw some race conditions in StandaloneDynamicAllocationSuite after reviewing it. This patch just exposed them. I'm thinking about how to fix them. It's better to remerge this patch after fixing StandaloneDynamicAllocationSuite to avoid breaking the master branch.

@zsxwing
Member Author

zsxwing commented Sep 25, 2015

Just realized we don't need to revert it. The new RPC can be disabled...
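Disabling the new RPC here means selecting the other implementation through configuration rather than reverting the code. The sketch below models that kind of selection with a plain `Map` so it runs without Spark. The values "netty" and "akka" come from this thread. The key name `spark.rpc`, the `AkkaRpcEnvFactory` class name, and the package of `NettyRpcEnvFactory` are assumptions not confirmed in this conversation, and `RpcSelection` is a hypothetical name.

```scala
object RpcSelection {
  // Short names mapped to factory class names. The class names are assumptions:
  // only NettyRpcEnvFactory is mentioned in this thread, and without its package.
  private val factories = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")

  /** Resolves the factory class name; the "spark.rpc" key is an assumed setting name. */
  def factoryClassName(conf: Map[String, String]): String = {
    // "netty" is the default, matching the default discussed in this thread.
    val name = conf.getOrElse("spark.rpc", "netty")
    // Unknown names are passed through unchanged, as if they were a custom class name.
    factories.getOrElse(name.toLowerCase, name)
  }
}
```

Under these assumptions, an empty configuration resolves to the netty factory, and setting the key to "akka" switches back without touching the merged code.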


5 participants