diff --git a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala b/core/src/main/scala/org/apache/spark/deploy/DeployWebUI.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/deploy/WebUI.scala rename to core/src/main/scala/org/apache/spark/deploy/DeployWebUI.scala index ae258b58b9cc5..254f076f29441 100644 --- a/core/src/main/scala/org/apache/spark/deploy/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployWebUI.scala @@ -42,6 +42,6 @@ private[spark] object DeployWebUI { return "%.0f min".format(minutes) } val hours = minutes / 60 - return "%.1f h".format(hours) + "%.1f h".format(hours) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala new file mode 100644 index 0000000000000..33fceae4ff489 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import org.apache.spark.ui.{SparkUI, WebUI} + +private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) { + + /** Attach a SparkUI to this container. Only valid after bind(). */ + def attachUI(ui: SparkUI) { + assert(serverInfo.isDefined, + "%s must be bound to a server before attaching SparkUIs".format(name)) + val rootHandler = serverInfo.get.rootHandler + for (handler <- ui.handlers) { + rootHandler.addHandler(handler) + if (!handler.isStarted) { + handler.start() + } + } + } + + /** Detach a SparkUI from this container. Only valid after bind(). */ + def detachUI(ui: SparkUI) { + assert(serverInfo.isDefined, + "%s must be bound to a server before detaching SparkUIs".format(name)) + val rootHandler = serverInfo.get.rootHandler + for (handler <- ui.handlers) { + if (handler.isStarted) { + handler.stop() + } + rootHandler.removeHandler(handler) + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala new file mode 100644 index 0000000000000..09f5d63b6c907 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.net.URI +import javax.servlet.http.HttpServletRequest + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.eclipse.jetty.servlet.ServletContextHandler + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.deploy.SparkUIContainer +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.util.Utils +import org.apache.spark.scheduler.ReplayListenerBus + +/** + * A web server that re-renders SparkUIs of finished applications. + * + * For the standalone mode, MasterWebUI already achieves this functionality. Thus, the + * main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos). + * + * The logging directory structure is as follows: Within the given base directory, each + * application's event logs are maintained in the application's own sub-directory. + * + * @param baseLogDir The base directory in which event logs are found + * @param requestedPort The requested port to which this server is to be bound + */ +class HistoryServer(baseLogDir: String, requestedPort: Int, conf: SparkConf) + extends SparkUIContainer("History Server") with Logging { + + private val host = Utils.localHostName() + private val port = requestedPort + private val indexPage = new IndexPage(this) + private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir)) + private val securityManager = new SecurityManager(conf) + + private val handlers = Seq[ServletContextHandler]( + createStaticHandler(HistoryServer.STATIC_RESOURCE_DIR, "/static"), + createServletHandler("/", + (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager) + ) + + // A mapping from an event log path to the associated, already rendered, SparkUI + val logPathToUI = mutable.HashMap[String, SparkUI]() + + // A mapping from an event log path to a timestamp of when it was last updated + val logPathToLastUpdated = mutable.HashMap[String, Long]() + + /** Bind to the HTTP server behind this web interface */ + override def bind() { + try { + serverInfo = Some(startJettyServer(host, port, handlers, conf)) + logInfo("Started HistoryServer at http://%s:%d".format(host, boundPort)) + } catch { + case e: Exception => + logError("Failed to create HistoryServer", e) + System.exit(1) + } + checkForLogs() + } + + /** + * Check for any updated event logs. + * + * If a new application is found, render the associated SparkUI and remember it. + * If an existing application is updated, re-render the associated SparkUI. + * If an existing application is removed, remove the associated SparkUI. + */ + def checkForLogs() { + val logStatus = fileSystem.listStatus(new Path(baseLogDir)) + val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() + + // Render any missing or outdated SparkUI + logDirs.foreach { dir => + val path = dir.getPath.toString + val lastUpdated = dir.getModificationTime + if (!logPathToLastUpdated.contains(path) || + logPathToLastUpdated.getOrElse(path, -1L) < lastUpdated) { + maybeRenderUI(path, lastUpdated) + } + } + + // Remove any outdated SparkUIs + val logPaths = logDirs.map(_.getPath.toString) + logPathToUI.keys.foreach { path => + if (!logPaths.contains(path)) { + logPathToUI.remove(path) + logPathToLastUpdated.remove(path) + } + } + + logWarning("By the end of check for logs, the map looks like") + logPathToUI.foreach { case (k, v) => logWarning("* %s".format(k)) } + } + + /** Attempt to render a new SparkUI from event logs residing in the given log directory. */ + def maybeRenderUI(logPath: String, lastUpdated: Long) { + logWarning("Maybe rendering UI %s".format(logPath)) + + val appName = logPath.split("/").last + val replayBus = new ReplayListenerBus(conf) + val ui = new SparkUI(conf, replayBus, appName, "/history/%s".format(appName)) + + // Do not call ui.bind() to avoid creating a new server for each application + ui.start() + val success = replayBus.replay(logPath) + logWarning("Just replayed the events. Successful? %s".format(success)) + if (success) { + attachUI(ui) + logPathToUI(logPath) = ui + logPathToLastUpdated(logPath) = lastUpdated + } + } + +} + +object HistoryServer { + val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR + + def main(argStrings: Array[String]) { + val conf = new SparkConf + val args = new HistoryServerArguments(argStrings, conf) + val server = new HistoryServer(args.logDir, args.port, conf) + server.bind() + + // Wait until the end of the world... or if the HistoryServer process is manually stopped + while(true) { Thread.sleep(Int.MaxValue) } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala new file mode 100644 index 0000000000000..c142b18b94aea --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.net.URI + +import org.apache.spark.SparkConf +import org.apache.spark.util.{Utils, IntParam} +import org.apache.hadoop.fs.Path + +/** + * Command-line parser for the master. + */ +private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf) { + var port = 18080 + var logDir = "" + + parse(args.toList) + + def parse(args: List[String]): Unit = { + args match { + case ("--port" | "-p") :: IntParam(value) :: tail => + port = value + parse(tail) + + case ("--dir" | "-d") :: value :: tail => + logDir = value + parse(tail) + + case ("--help" | "-h") :: tail => + printUsageAndExit(0) + + case Nil => {} + + case _ => + printUsageAndExit(1) + } + validateLogDir() + } + + def validateLogDir() { + if (logDir == "") { + System.err.println("Logging directory must be specified.") + printUsageAndExit(1) + } + val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) + val path = new Path(logDir) + if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) { + System.err.println("Logging directory specified is invalid: %s".format(logDir)) + printUsageAndExit(1) + } + } + + /** + * Print usage and exit JVM with the given exit code. + */ + def printUsageAndExit(exitCode: Int) { + System.err.println( + "Usage: HistoryServer [options]\n" + + "\n" + + "Options:\n" + + " -p PORT, --port PORT Port for web server (default: 18080)\n" + + " -d DIR, --dir DIR Location of event log files") + System.exit(exitCode) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala new file mode 100644 index 0000000000000..2200e41898942 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.deploy.DeployWebUI +import org.apache.spark.deploy.master.ApplicationInfo +import org.apache.spark.ui.UIUtils +import org.apache.spark.util.Utils + +private[spark] class IndexPage(parent: HistoryServer) { + + def render(request: HttpServletRequest): Seq[Node] = { + val content = +
+
+ +
+
+ + UIUtils.basicSparkPage(content, "History Server") + } + + def appRow(app: ApplicationInfo): Seq[Node] = { + + + {app.id} + + + {app.desc.name} + + + {app.coresGranted} + + + {Utils.megabytesToString(app.desc.memoryPerSlave)} + + {DeployWebUI.formatDate(app.submitDate)} + {app.desc.user} + {app.state.toString} + {DeployWebUI.formatDuration(app.duration)} + + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1fd211416976e..26ecfa406af5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -671,16 +671,16 @@ private[spark] class Master( appConf.set("spark.eventLog.compress", "true") appConf.set("spark.io.compression.codec", codec) } - val replayerBus = new ReplayListenerBus(appConf) + val replayBus = new ReplayListenerBus(appConf) val ui = new SparkUI( appConf, - replayerBus, + replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id)) // Do not call ui.bind() to avoid creating a new server for each application ui.start() - val success = replayerBus.replay(eventLogDir) + val success = replayBus.replay(eventLogDir) if (!success) { ui.stop() None diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index bd75b2dfd0e07..946eef782936b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -22,8 +22,9 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging +import org.apache.spark.deploy.SparkUIContainer import org.apache.spark.deploy.master.Master -import org.apache.spark.ui.{ServerInfo, SparkUI} +import org.apache.spark.ui.SparkUI import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -31,7 +32,9 @@ import org.apache.spark.util.{AkkaUtils, Utils} * Web UI server for the standalone master. */ private[spark] -class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { +class MasterWebUI(val master: Master, requestedPort: Int) + extends SparkUIContainer("MasterWebUI") with Logging { + val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) @@ -39,7 +42,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { private val port = requestedPort private val applicationPage = new ApplicationPage(this) private val indexPage = new IndexPage(this) - private var serverInfo: Option[ServerInfo] = None private val handlers: Seq[ServletContextHandler] = { master.masterMetricsSystem.getServletHandlers ++ @@ -57,7 +59,8 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { ) } - def bind() { + /** Bind to the HTTP server behind this web interface. */ + override def bind() { try { serverInfo = Some(startJettyServer(host, port, handlers, master.conf)) logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) @@ -68,36 +71,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ - def attachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { - rootHandler.addHandler(handler) - if (!handler.isStarted) { - handler.start() - } - } - } - - /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ - def detachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { - if (handler.isStarted) { - handler.stop() - } - rootHandler.removeHandler(handler) - } - } - - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!") - serverInfo.get.server.stop() - } } private[spark] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index de76a5d5eb7bc..335a891493a46 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging import org.apache.spark.deploy.worker.Worker -import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils} +import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -33,7 +33,7 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends Logging { + extends WebUI("WorkerWebUI") with Logging { val timeout = AkkaUtils.askTimeout(worker.conf) @@ -41,7 +41,6 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I private val port = requestedPort.getOrElse( worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) private val indexPage = new IndexPage(this) - private var serverInfo: Option[ServerInfo] = None private val handlers: Seq[ServletContextHandler] = { worker.metricsSystem.getServletHandlers ++ @@ -58,9 +57,10 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I ) } - def bind() { + /** Bind to the HTTP server behind this web interface. */ + override def bind() { try { - serverInfo = Some(JettyUtils.startJettyServer(host, port, handlers, worker.conf)) + serverInfo = Some(startJettyServer(host, port, handlers, worker.conf)) logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => @@ -69,8 +69,6 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - private def log(request: HttpServletRequest): String = { val defaultBytes = 100 * 1024 @@ -187,10 +185,6 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I (startByte, endByte) } - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!") - serverInfo.get.server.stop() - } } private[spark] object WorkerWebUI { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index fd638c83aac6e..e49d5b14722d8 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -36,7 +36,7 @@ private[spark] class SparkUI( val listenerBus: SparkListenerBus, val appName: String, val basePath: String = "") - extends Logging { + extends WebUI("SparkUI") with Logging { def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName) def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = @@ -49,7 +49,6 @@ private[spark] class SparkUI( private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName()) private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt - private var serverInfo: Option[ServerInfo] = None private val storage = new BlockManagerUI(this) private val jobs = new JobProgressUI(this) @@ -76,20 +75,6 @@ private[spark] class SparkUI( // Maintain executor storage status through Spark events val storageStatusListener = new StorageStatusListener - /** Bind the HTTP server which backs this web interface */ - def bind() { - try { - serverInfo = Some(startJettyServer(host, port, handlers, sc.conf)) - logInfo("Started Spark Web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Spark JettyUtils", e) - System.exit(1) - } - } - - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - /** Initialize all components of the server */ def start() { storage.start() @@ -105,9 +90,21 @@ private[spark] class SparkUI( listenerBus.addListener(exec.listener) } - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!") - serverInfo.get.server.stop() + /** Bind to the HTTP server behind this web interface. */ + override def bind() { + try { + serverInfo = Some(startJettyServer(host, port, handlers, sc.conf)) + logInfo("Started Spark Web UI at http://%s:%d".format(host, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Spark JettyUtils", e) + System.exit(1) + } + } + + /** Stop the server behind this web interface. Only valid after bind(). */ + override def stop() { + super.stop() logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala new file mode 100644 index 0000000000000..4026f1942d470 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +private[spark] abstract class WebUI(name: String) { + protected var serverInfo: Option[ServerInfo] = None + + /** + * Bind to the HTTP server behind this web interface. + * Overridden implementation should set serverInfo. + */ + def bind() { } + + /** Return the actual port to which this server is bound. Only valid after bind(). */ + def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) + + /** Stop the server behind this web interface. Only valid after bind(). */ + def stop() { + assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name)) + serverInfo.get.server.stop() + } +} diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh new file mode 100755 index 0000000000000..76ca799862e00 --- /dev/null +++ b/sbin/start-history-server.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Starts the history server on the machine this script is executed on. +# +# Usage: start-history-server.sh [] +# Example: ./start-history-server.sh --dir /tmp/spark-events --port 18080 +# + +sbin=`dirname "$0"` +sbin=`cd "$sbin"; pwd` + +if [ $# -lt 1 ]; then + echo "Usage: ./start-history-server.sh []" + echo "Example: ./start-history-server.sh /tmp/spark-events 18080" + exit +fi + +# Set up base event log directory +LOG_DIR=$1 +shift + +# Set up web UI port +if [ ! -z $1 ]; then + PORT=$1 +else + PORT=18080 +fi + +"$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 --dir "$LOG_DIR" --port "$PORT" diff --git a/sbin/stop-history-server.sh b/sbin/stop-history-server.sh new file mode 100755 index 0000000000000..c0034ad641cbe --- /dev/null +++ b/sbin/stop-history-server.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Stops the history server on the machine this script is executed on. + +sbin=`dirname "$0"` +sbin=`cd "$sbin"; pwd` + +"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.history.HistoryServer 1