<td>{driver.id}</td>
<td>{driver.submitDate}</td>
- <td>{driver.worker.map(w => {w.id.toString}).getOrElse("None")}</td>
+ <td>{driver.worker.map(w => {w.id.toString}).getOrElse("None")}
+ </td>
<td>{driver.state}</td>
{driver.desc.cores}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index ead35662fc75a..5ab13e7aa6b1f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.master.ui
import javax.servlet.http.HttpServletRequest
+
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.Logging
@@ -45,7 +46,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
def start() {
try {
- val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ val (srv, bPort) = JettyUtils.startJettyServer(host, port, handlers)
server = Some(srv)
boundPort = Some(bPort)
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 460883ec7ae24..0c761dfc93a1f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.worker
-import java.io.{File, FileOutputStream, IOException, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
import java.lang.System._
import org.apache.spark.Logging
@@ -49,7 +49,8 @@ object CommandUtils extends Logging {
val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
.map(p => List("-Djava.library.path=" + p))
.getOrElse(Nil)
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+ val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
+ .map(Utils.splitCommandString).getOrElse(Nil)
val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 6f6c101547c3c..a26e47950a0ec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -45,4 +45,4 @@ object DriverWrapper {
System.exit(-1)
}
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 18885d7ca6daa..2edd921066876 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -20,12 +20,11 @@ package org.apache.spark.deploy.worker
import java.io._
import akka.actor.ActorRef
-
import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.spark.Logging
-import org.apache.spark.deploy.{ExecutorState, ApplicationDescription, Command}
+import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
old mode 100644
new mode 100755
index fbf2e0f30fde9..7b0b7861b76e1
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -31,7 +31,6 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -325,6 +324,7 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
+ drivers.values.foreach(_.kill())
webUi.stop()
metricsSystem.stop()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 3ed528e6b3773..d35d5be73ff97 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -17,9 +17,10 @@
package org.apache.spark.deploy.worker
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
import java.lang.management.ManagementFactory
+import org.apache.spark.util.{IntParam, MemoryParam, Utils}
+
/**
* Command-line parser for the master.
*/
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 1dc39c450ea16..530c147000904 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -69,4 +69,4 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
case e => logWarning(s"Received unexpected actor system event: $e")
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 925c6fb1832d7..3089acffb8d98 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -84,7 +84,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
{runningExecutorTable}
-
+ // scalastyle:off
{if (hasDrivers)
@@ -113,7 +113,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
}
;
-
+ // scalastyle:on
UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
workerState.host, workerState.port))
}
@@ -133,10 +133,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
|
- stdout
- stderr
+ stdout
+ stderr
|
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 8daa47b2b2435..bdf126f93abc8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -18,11 +18,11 @@
package org.apache.spark.deploy.worker.ui
import java.io.File
-
import javax.servlet.http.HttpServletRequest
+
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
@@ -56,7 +56,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
def start() {
try {
- val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ val (srv, bPort) = JettyUtils.startJettyServer(host, port, handlers)
server = Some(srv)
boundPort = Some(bPort)
logInfo("Started Worker web UI at http://%s:%d".format(host, bPort))
@@ -187,7 +187,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val logPageLength = math.min(byteLength, maxBytes)
- val endByte = math.min(startByte+logPageLength, logLength)
+ val endByte = math.min(startByte + logPageLength, logLength)
(startByte, endByte)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 45b43b403dd8c..0aae569b17272 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f7efd74e1b043..989d666f15600 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -205,7 +205,7 @@ private[spark] class Executor(
}
attemptedTask = Some(task)
- logDebug("Task " + taskId +"'s epoch is " + task.epoch)
+ logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// Run the actual task and measure its runtime.
@@ -233,7 +233,8 @@ private[spark] class Executor(
val accumUpdates = Accumulators.values
- val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates,
+ task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
index ad7dd34c76940..3d34960653f5d 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
@@ -18,6 +18,7 @@
package org.apache.spark.executor
import java.nio.ByteBuffer
+
import org.apache.spark.TaskState.TaskState
/**
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index e5c9bbbe2874e..210f3dbeebaca 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -50,10 +50,11 @@ object ExecutorExitCode {
"Failed to create local directory (bad spark.local.dir?)"
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
- if (exitCode > 128)
+ if (exitCode > 128) {
" (died from signal " + (exitCode - 128) + "?)"
- else
+ } else {
""
+ }
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 97176e4f5b727..127f5e90f3e1a 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -17,12 +17,11 @@
package org.apache.spark.executor
-import com.codahale.metrics.{Gauge, MetricRegistry}
+import scala.collection.JavaConversions._
+import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem
-import scala.collection.JavaConversions._
-
import org.apache.spark.metrics.source.Source
class ExecutorSource(val executor: Executor, executorId: String) extends Source {
@@ -55,7 +54,8 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
override def getValue: Int = executor.threadPool.getPoolSize()
})
- // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
+ // Gauge got executor thread pool's largest number of threads that have ever simultaneously
+ // been in th pool
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index b56d8c99124df..6fc702fdb1512 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -20,8 +20,7 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
+import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
import org.apache.spark.Logging
@@ -29,7 +28,6 @@ import org.apache.spark.TaskState
import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.Utils
-
private[spark] class MesosExecutorBackend
extends MesosExecutor
with ExecutorBackend
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0c8f4662a5f3a..455339943f42d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -64,7 +64,8 @@ class TaskMetrics extends Serializable {
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
/**
- * If this task writes to shuffle output, metrics on the written shuffle data will be collected here
+ * If this task writes to shuffle output, metrics on the written shuffle data will be collected
+ * here
*/
var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
}
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 59801773205bd..848b5c439bb5b 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -20,10 +20,9 @@ package org.apache.spark.io
import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
-import org.apache.spark.{SparkEnv, SparkConf}
+import org.apache.spark.SparkConf
/**
* CompressionCodec allows the customization of choosing different compression implementations
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index e54ac0b332093..6883a54494598 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -17,8 +17,8 @@
package org.apache.spark.metrics
+import java.io.{FileInputStream, InputStream}
import java.util.Properties
-import java.io.{File, FileInputStream, InputStream, IOException}
import scala.collection.mutable
import scala.util.matching.Regex
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9930537b34db0..966c092124266 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -17,14 +17,14 @@
package org.apache.spark.metrics
-import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
-
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.mutable
-import org.apache.spark.{SparkConf, Logging}
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
import org.apache.spark.metrics.source.Source
@@ -56,7 +56,8 @@ import org.apache.spark.metrics.source.Source
* wild card "*" can be used to replace instance name, which means all the instances will have
* this property.
*
- * [sink|source] means this property belongs to source or sink. This field can only be source or sink.
+ * [sink|source] means this property belongs to source or sink. This field can only be
+ * source or sink.
*
* [name] specify the name of sink or source, it is custom defined.
*
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
index bce257d6e6f47..98fa1dbd7c6ab 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -17,11 +17,11 @@
package org.apache.spark.metrics.sink
-import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
-
import java.util.Properties
import java.util.concurrent.TimeUnit
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
import org.apache.spark.metrics.MetricsSystem
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
index 3d1a06a395a72..40f64768e6885 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -17,12 +17,12 @@
package org.apache.spark.metrics.sink
-import com.codahale.metrics.{CsvReporter, MetricRegistry}
-
import java.io.File
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
import org.apache.spark.metrics.MetricsSystem
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
index b924907070eb9..410ca0704b5c4 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -20,8 +20,8 @@ package org.apache.spark.metrics.sink
import java.util.Properties
import java.util.concurrent.TimeUnit
-import com.codahale.metrics.ganglia.GangliaReporter
import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.ganglia.GangliaReporter
import info.ganglia.gmetric4j.gmetric.GMetric
import org.apache.spark.metrics.MetricsSystem
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index cdcfec8ca785b..e09be001421fc 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -17,12 +17,12 @@
package org.apache.spark.metrics.sink
+import java.net.InetSocketAddress
import java.util.Properties
import java.util.concurrent.TimeUnit
-import java.net.InetSocketAddress
import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.graphite.{GraphiteReporter, Graphite}
+import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
import org.apache.spark.metrics.MetricsSystem
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
index 621d086d415cc..b5cf210af2119 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -17,10 +17,10 @@
package org.apache.spark.metrics.sink
-import com.codahale.metrics.{JmxReporter, MetricRegistry}
-
import java.util.Properties
+import com.codahale.metrics.{JmxReporter, MetricRegistry}
+
class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 99357fede6d06..3cdfe26d40f66 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -17,15 +17,13 @@
package org.apache.spark.metrics.sink
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.json.MetricsModule
-
-import com.fasterxml.jackson.databind.ObjectMapper
-
import java.util.Properties
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.json.MetricsModule
+import com.fasterxml.jackson.databind.ObjectMapper
import org.eclipse.jetty.server.Handler
import org.apache.spark.ui.JettyUtils
diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
index fb4c65909a9e2..d3c09b16063d6 100644
--- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.BlockManager
-
private[spark]
class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
extends Message(Message.BUFFER_MESSAGE, id_) {
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index cba8477ed5723..f2e3c1a14ecc6 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -17,16 +17,13 @@
package org.apache.spark.network
-import org.apache.spark._
-
-import scala.collection.mutable.{HashMap, Queue, ArrayBuffer}
-
-import java.io._
+import java.net._
import java.nio._
import java.nio.channels._
-import java.nio.channels.spi._
-import java.net._
+import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}
+
+import org.apache.spark._
private[spark]
abstract class Connection(val channel: SocketChannel, val selector: Selector,
@@ -211,7 +208,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
return chunk
} else {
- /*logInfo("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() + "]")*/
message.finishTime = System.currentTimeMillis
logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
"] in " + message.timeTaken )
@@ -238,7 +234,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
message.startTime = System.currentTimeMillis
}
logTrace(
- "Sending chunk from [" + message+ "] to [" + getRemoteConnectionManagerId() + "]")
+ "Sending chunk from [" + message + "] to [" + getRemoteConnectionManagerId() + "]")
return chunk
} else {
message.finishTime = System.currentTimeMillis
@@ -349,8 +345,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
outbox.getChunk() match {
case Some(chunk) => {
val buffers = chunk.buffers
- // If we have 'seen' pending messages, then reset flag - since we handle that as normal
- // registering of event (below)
+ // If we have 'seen' pending messages, then reset flag - since we handle that as
+ // normal registering of event (below)
if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
currentBuffers ++= buffers
}
@@ -404,7 +400,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
} catch {
case e: Exception =>
- logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(), e)
+ logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
+ e)
callOnExceptionCallback(e)
close()
}
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index e6e01783c8895..3dd82bee0b5fd 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -17,24 +17,21 @@
package org.apache.spark.network
-import org.apache.spark._
-
+import java.net._
import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
-import java.net._
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import scala.collection.mutable.SynchronizedMap
import scala.collection.mutable.SynchronizedQueue
-import scala.collection.mutable.ArrayBuffer
-
-import scala.concurrent.{Await, Promise, ExecutionContext, Future}
-import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
+import org.apache.spark._
import org.apache.spark.util.Utils
private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Logging {
@@ -65,7 +62,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
- // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
+ // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
+ // which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.connect.threads.min", 1),
conf.getInt("spark.core.connection.connect.threads.max", 8),
@@ -73,8 +71,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
- private val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
- private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
+ private val connectionsByKey =
+ new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
+ private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
+ with SynchronizedMap[ConnectionManagerId, SendingConnection]
private val messageStatuses = new HashMap[Int, MessageStatus]
private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
private val registerRequests = new SynchronizedQueue[SendingConnection]
@@ -173,7 +173,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
if (conn == null) return
// prevent other events from being triggered
- // Since we are still trying to connect, we do not need to do the additional steps in triggerWrite
+ // Since we are still trying to connect, we do not need to do the additional steps in
+ // triggerWrite
conn.changeConnectionKeyInterest(0)
handleConnectExecutor.execute(new Runnable {
@@ -188,8 +189,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
}
// fallback to previous behavior : we should not really come here since this method was
- // triggered since channel became connectable : but at times, the first finishConnect need not
- // succeed : hence the loop to retry a few 'times'.
+ // triggered since channel became connectable : but at times, the first finishConnect need
+ // not succeed : hence the loop to retry a few 'times'.
conn.finishConnect(true)
}
} )
@@ -258,8 +259,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
}
- logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
- "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+ logTrace("Changed key for connection to [" +
+ connection.getRemoteConnectionManagerId() + "] changed from [" +
+ intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
}
}
} else {
@@ -282,7 +284,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
try {
selector.select()
} catch {
- // Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently.
+ // Explicitly only dealing with CancelledKeyException here since other exceptions
+ // should be dealt with differently.
case e: CancelledKeyException => {
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()
@@ -310,7 +313,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
}
if (selectedKeysCount == 0) {
- logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
+ logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size +
+ " keys")
}
if (selectorThread.isInterrupted) {
logInfo("Selector thread was interrupted!")
@@ -341,7 +345,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
throw new CancelledKeyException()
}
} catch {
- // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
+ // weird, but we saw this happening - even though key.isValid was true,
+ // key.isAcceptable would throw CancelledKeyException.
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
@@ -458,7 +463,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
}
def handleConnectionError(connection: Connection, e: Exception) {
- logInfo("Handling connection error on connection to " + connection.getRemoteConnectionManagerId())
+ logInfo("Handling connection error on connection to " +
+ connection.getRemoteConnectionManagerId())
removeConnection(connection)
}
@@ -495,7 +501,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
status
}
case None => {
- throw new Exception("Could not find reference for received ack message " + message.id)
+ throw new Exception("Could not find reference for received ack message " +
+ message.id)
null
}
}
@@ -517,7 +524,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
if (ackMessage.isDefined) {
if (!ackMessage.get.isInstanceOf[BufferMessage]) {
- logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " + ackMessage.get.getClass())
+ logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
+ + ackMessage.get.getClass())
} else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
logDebug("Response to " + bufferMessage + " does not have ack id set")
ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
@@ -535,14 +543,16 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
- val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
+ val inetSocketAddress = new InetSocketAddress(connectionManagerId.host,
+ connectionManagerId.port)
val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId)
registerRequests.enqueue(newConnection)
newConnection
}
- // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it useful in our test-env ...
- // If we do re-add it, we should consistently use it everywhere I guess ?
+ // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it
+ // useful in our test-env ... If we do re-add it, we should consistently use it everywhere I
+ // guess ?
val connection = connectionsById.getOrElseUpdate(connectionManagerId, startNewConnection())
message.senderAddress = id.toSocketAddress()
logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
@@ -558,7 +568,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
: Future[Option[Message]] = {
val promise = Promise[Option[Message]]
- val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage))
+ val status = new MessageStatus(
+ message, connectionManagerId, s => promise.success(s.ackMessage))
messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}
@@ -566,7 +577,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
promise.future
}
- def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, message: Message): Option[Message] = {
+ def sendMessageReliablySync(connectionManagerId: ConnectionManagerId,
+ message: Message): Option[Message] = {
Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf)
}
@@ -656,7 +668,8 @@ private[spark] object ConnectionManager {
val tput = mb * 1000.0 / ms
println("--------------------------")
println("Started at " + startTime + ", finished at " + finishTime)
- println("Sent " + count + " messages of size " + size + " in " + ms + " ms (" + tput + " MB/s)")
+ println("Sent " + count + " messages of size " + size + " in " + ms + " ms " +
+ "(" + tput + " MB/s)")
println("--------------------------")
println()
}
@@ -667,7 +680,11 @@ private[spark] object ConnectionManager {
println("--------------------------")
val size = 10 * 1024 * 1024
val count = 10
- val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
+ val buffers = Array.tabulate(count) { i =>
+ val bufferLen = size * (i + 1)
+ val bufferContent = Array.tabulate[Byte](bufferLen)(x => x.toByte)
+ ByteBuffer.allocate(bufferLen).put(bufferContent)
+ }
buffers.foreach(_.flip)
val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
index 50dd9bc2d101f..b82edb6850d23 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
@@ -21,7 +21,6 @@ import java.net.InetSocketAddress
import org.apache.spark.util.Utils
-
private[spark] case class ConnectionManagerId(host: String, port: Int) {
// DEBUG code
Utils.checkHost(host)
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 4f5742d29b367..35f64134b073a 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -17,27 +17,24 @@
package org.apache.spark.network
-import org.apache.spark._
-import org.apache.spark.SparkContext._
-
-import scala.io.Source
-
import java.nio.ByteBuffer
-import java.net.InetAddress
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.io.Source
+
+import org.apache.spark._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
- //