SPARK-1497. Fix scalastyle warnings in YARN, Hive code
(I wasn't sure how to automatically set `SPARK_YARN=true` and `SPARK_HIVE=true` when running scalastyle, but these are the errors that turn up.)

Author: Sean Owen <[email protected]>

Closes #413 from srowen/SPARK-1497 and squashes the following commits:

f0c9318 [Sean Owen] Fix more scalastyle warnings in yarn
80bf4c3 [Sean Owen] Add YARN alpha / YARN profile to scalastyle check
026319c [Sean Owen] Fix scalastyle warnings in YARN, Hive code
(cherry picked from commit 77f8367)

Signed-off-by: Patrick Wendell <[email protected]>
srowen authored and pwendell committed Apr 16, 2014
1 parent 8efec04 commit c744d66
Showing 6 changed files with 34 additions and 20 deletions.
4 changes: 4 additions & 0 deletions dev/scalastyle
@@ -18,6 +18,10 @@
#

echo -e "q\n" | sbt/sbt clean scalastyle > scalastyle.txt
# Check style with YARN alpha built too
SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
# Check style with YARN built too
SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
ERRORS=$(cat scalastyle.txt | grep -e "\<error\>")
if test ! -z "$ERRORS"; then
echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
@@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {

def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
this(args, new Configuration(), sparkConf)

def this(args: ApplicationMasterArguments) = this(args, new SparkConf())

@@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
// Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
// Send a hello message thus the connection is actually established, thus we can
// monitor Lifecycle Events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Allocate all containers
allocateExecutors()

// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
// Launch a progress reporter thread, else app will get killed after expiration
// (def: 10mins) timeout ensure that progress is sent before
// YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.

val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
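
Aside, not part of the diff: the wrapped comment above explains why the reporter thread exists, but the interval arithmetic it refers to is collapsed in this view. A minimal, self-contained sketch of the idea, assuming hypothetical names (`reporterIntervalMs`, `launchReporterThread`) and a caller-supplied heartbeat value rather than whatever this file actually uses:

// Sketch only, not part of this commit.
object ReporterSketch {
  // Keep the tick period comfortably below the RM's AM-expiry timeout (here, half of it),
  // so progress is always reported before the expiry check fires, while a configured
  // heartbeat value caps how often the RM is contacted.
  def reporterIntervalMs(expiryIntervalMs: Long, configuredHeartbeatMs: Long): Long =
    math.min(expiryIntervalMs / 2, configuredHeartbeatMs)

  // Hypothetical reporter loop: run `tick` (send progress, or top up lost executors)
  // once per interval on a daemon thread.
  def launchReporterThread(intervalMs: Long)(tick: () => Unit): Thread = {
    val t = new Thread {
      override def run(): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          tick()
          Thread.sleep(intervalMs)
        }
      }
    }
    t.setDaemon(true)
    t.start()
    t
  }
}

In this file the tick body reallocates any missing executors and otherwise just sends progress, as the reporter loop shown in a later hunk illustrates.
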
@@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
// Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
// Users can then monitor stderr/stdout on that node if required.
// Setting this to master host,port - so that the ApplicationReport at client has
// some sensible info. Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
@@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(100)
}

@@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
while (!driverClosed) {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
logInfo("Allocating " + missingExecutorCount +
" containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
@@ -225,8 +225,8 @@ private[yarn] class YarnAllocationHandler(
val executorHostname = container.getNodeId.getHost
val containerId = container.getId

assert(
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
assert( container.getResource.getMemory >=
(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))

if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -393,9 +393,10 @@ private[yarn] class YarnAllocationHandler(

// default.
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
// request for all hosts in preferred nodes and for numExecutors -
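
Aside, not part of the diff: the `else` branch above is collapsed in this view; its truncated comment indicates that it asks for containers on each preferred host and covers whatever is left of `numExecutors` with a default request. A rough, self-contained illustration of that split, with a made-up `Request` type standing in for the YARN alpha records this file builds via `createResourceRequest`:

// Sketch only, not part of this commit.
object ResourceRequestSketch {
  sealed trait Locality
  case object AnyHost extends Locality
  final case class OnHost(host: String) extends Locality

  // Made-up stand-in for a YARN ResourceRequest: where to run and how many containers.
  final case class Request(locality: Locality, count: Int)

  // No host preferences (or nothing to ask for): a single off-switch request, as in the
  // `if` branch above. Otherwise: one request per preferred host, plus a default request
  // covering whatever portion of numExecutors the preferences do not account for.
  def buildRequests(numExecutors: Int, preferredHostToCount: Map[String, Int]): List[Request] =
    if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
      List(Request(AnyHost, numExecutors))
    } else {
      val hostRequests = preferredHostToCount.toList.map { case (host, count) =>
        Request(OnHost(host), count)
      }
      val remaining = math.max(numExecutors - preferredHostToCount.values.sum, 0)
      hostRequests :+ Request(AnyHost, remaining)
    }
}

The visible `if` branch in the hunk above corresponds to the first case.
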
@@ -137,7 +137,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)

val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
System.setProperty(
"spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
}

/** Get the Yarn approved local directories. */
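
Aside, not part of the diff: only the tail of the AM IP filter setup is visible in the hunk above. `parts` and `uriBase` are derived a few lines earlier (collapsed here) from the resource manager's web-proxy address and the APPLICATION_WEB_PROXY_BASE_ENV value. A small sketch of how such a parameter string can be assembled, with the proxy address passed in explicitly (hypothetical helper, not this file's code):

// Sketch only, not part of this commit.
object AmIpFilterSketch {
  // Assemble the AmIpFilter parameter string from an RM web-proxy address such as
  // "rm-host:8088" and the application's proxy URI base (e.g. "/proxy/application_...").
  // PROXY_HOST is the bare proxy host; PROXY_URI_BASE is the full URL the filter points at.
  def filterParams(proxyHostAndPort: String, proxyUriBase: String): String = {
    val host = proxyHostAndPort.split(":")(0)
    val uriBase = "http://" + proxyHostAndPort + proxyUriBase
    "PROXY_HOST=" + host + "," + "PROXY_URI_BASE=" + uriBase
  }
}

For example, filterParams("rm-host:8088", "/proxy/application_1") yields PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_1.
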
@@ -65,7 +65,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
// Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
// Send a hello message thus the connection is actually established,
// thus we can monitor Lifecycle Events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -95,8 +96,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Allocate all containers
allocateExecutors()

// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
// Launch a progress reporter thread, else app will get killed after expiration
// (def: 10mins) timeout ensure that progress is sent before
// YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.

val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
@@ -276,7 +276,8 @@ private[yarn] class YarnAllocationHandler(
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
driverUrl, executorHostname))
val executorRunnable = new ExecutorRunnable(
container,
conf,
@@ -314,8 +315,8 @@ private[yarn] class YarnAllocationHandler(
// `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
} else {
// Decrement the number of executors running. The next iteration of the ApplicationMaster's
// reporting thread will take care of allocating.
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
