Skip to content

Commit

Permalink
[SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming wh…
Browse files Browse the repository at this point in the history
…en StreamingContext is closed

The issue link: https://issues.apache.org/jira/browse/SPARK-8743
Deregister Codahale metrics for streaming when StreamingContext is closed

Design:
Adding the method calls in the appropriate start() and stop () methods for the StreamingContext

Actions in the PullRequest:
1) Added the registerSource method call to the start method for the Streaming Context.
2) Added the removeSource method to the stop method.
3) Added comments for both 1 and 2 and comment to show initialization of the StreamingSource
4) Added a test case to check for both registration and de-registration of metrics

Previous closed PR for reference: apache#7250

Author: Neelesh Srinivas Salian <[email protected]>

Closes apache#7362 from nssalian/branch-SPARK-8743 and squashes the following commits:

7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call
8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import
0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct place
daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports
8873180 [Neelesh Srinivas Salian] Removed redundancy in imports
59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify  scala and spark imports
d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration
  • Loading branch information
Neelesh Srinivas Salian authored and tdas committed Jul 13, 2015
1 parent 0aed38e commit b7bcbe2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,8 @@ class StreamingContext private[streaming] (
None
}

/** Register streaming source to metrics system */
/* Initializing a streamingSource to register metrics */
private val streamingSource = new StreamingSource(this)
assert(env != null)
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)

private var state: StreamingContextState = INITIALIZED

Expand Down Expand Up @@ -606,6 +603,9 @@ class StreamingContext private[streaming] (
}
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
Expand Down Expand Up @@ -682,6 +682,8 @@ class StreamingContext private[streaming] (
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@ package org.apache.spark.streaming
import java.io.{File, NotSerializableException}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue

import org.apache.commons.io.FileUtils
import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
import org.scalatest.{Assertions, BeforeAndAfter}

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}


class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
Expand Down Expand Up @@ -299,6 +302,25 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
Thread.sleep(100)
}

test ("registering and de-registering of streamingSource") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
ssc = new StreamingContext(conf, batchDuration)
assert(ssc.getState() === StreamingContextState.INITIALIZED)
addInputStream(ssc).register()
ssc.start()

val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
assert(sources.contains(streamingSource))
assert(ssc.getState() === StreamingContextState.ACTIVE)

ssc.stop()
val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
assert(ssc.getState() === StreamingContextState.STOPPED)
assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
}

test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
Expand Down Expand Up @@ -811,3 +833,18 @@ package object testPackage extends Assertions {
}
}
}

/**
* Helper methods for testing StreamingContextSuite
* This includes methods to access private methods and fields in StreamingContext and MetricsSystem
*/
private object StreamingContextSuite extends PrivateMethodTester {
private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
metricsSystem.invokePrivate(_sources())
}
private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource)
private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
streamingContext.invokePrivate(_streamingSource())
}
}

0 comments on commit b7bcbe2

Please sign in to comment.