Skip to content

Commit

Permalink
Merge pull request #556 from CodingCat/JettyUtil. Closes #556.
Browse files Browse the repository at this point in the history
[SPARK-1060] startJettyServer should explicitly use IP information

https://spark-project.atlassian.net/browse/SPARK-1060

In the current implementation, the webserver in Master/Worker is started with

val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)

inside startJettyServer:

val server = new Server(currentPort) //here, the Server will take "0.0.0.0" as the hostname, i.e. will always bind to the IP address of the first NIC

this can cause wrong IP binding, e.g. if the host has two NICs, N1 and N2, the user specify the SPARK_LOCAL_IP as the N2's IP address, however, when starting the web server, for the reason stated above, it will always bind to the N1's address

Author: CodingCat <[email protected]>

== Merge branch commits ==

commit 6c6d9a8ccc9ec4590678a3b34cb03df19092029d
Author: CodingCat <[email protected]>
Date:   Thu Feb 6 14:53:34 2014 -0500

    startJettyServer should explicitly use IP information
  • Loading branch information
CodingCat authored and rxin committed Feb 9, 2014
1 parent 2ef37c9 commit b6dba10
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {

def start() {
try {
val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
val (srv, bPort) = JettyUtils.startJettyServer(host, port, handlers)
server = Some(srv)
boundPort = Some(bPort)
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I

def start() {
try {
val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
val (srv, bPort) = JettyUtils.startJettyServer(host, port, handlers)
server = Some(srv)
boundPort = Some(bPort)
logInfo("Started Worker web UI at http://%s:%d".format(host, bPort))
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHa
import org.eclipse.jetty.util.thread.QueuedThreadPool

import org.apache.spark.Logging
import java.net.InetSocketAddress


/** Utilities for launching a web server using Jetty's HTTP Server class */
private[spark] object JettyUtils extends Logging {
// Base type for a function that returns something based on an HTTP request. Allows for
// implicit conversion from many types of functions to jetty Handlers.

type Responder[T] = HttpServletRequest => T

// Conversions from various types of Responder's to jetty Handlers
Expand Down Expand Up @@ -92,12 +94,13 @@ private[spark] object JettyUtils extends Logging {
}

/**
* Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
* Attempts to start a Jetty server at the supplied hostName:port which uses the supplied handlers.
*
* If the desired port number is contented, continues incrementing ports until a free port is
* found. Returns the chosen port and the jetty Server object.
*/
def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {

val handlersToRegister = handlers.map { case(path, handler) =>
val contextHandler = new ContextHandler(path)
contextHandler.setHandler(handler)
Expand All @@ -109,7 +112,7 @@ private[spark] object JettyUtils extends Logging {

@tailrec
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(currentPort)
val server = new Server(new InetSocketAddress(hostName, currentPort))
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
/** Bind the HTTP server which backs this web interface */
def bind() {
try {
val (srv, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
val (srv, usedPort) = JettyUtils.startJettyServer(host, port, allHandlers)
logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
server = Some(srv)
boundPort = Some(usedPort)
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ class UISuite extends FunSuite {
case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq())
// Allow some wiggle room in case ports on the machine are under contention
assert(boundPort1 > startPort && boundPort1 < startPort + 10)
assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
}

test("jetty binds to port 0 correctly") {
val (jettyServer, boundPort) = JettyUtils.startJettyServer("localhost", 0, Seq())
val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq())
assert(jettyServer.getState === "STARTED")
assert(boundPort != 0)
Try {new ServerSocket(boundPort)} match {
Expand Down

0 comments on commit b6dba10

Please sign in to comment.