[SPARK-8743] [Streaming]: Deregister Codahale metrics for streaming when StreamingContext is closed #7250
…for the Streaming Context. Added the removeSource method to the stop method. Added comments for both
@@ -577,6 +575,12 @@ class StreamingContext private[streaming] (
* @throws IllegalStateException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
/**
* Registering Streaming Metrics at the start of the StreamingContext
I think it is better to use // rather than /** */ for internal code commenting.
This change LGTM, actually will get rid of some annoying logs :).
@jerryshao thank you for the comment. I made the changes.
Jenkins, test this please
Test build #36660 has finished for PR 7250 at commit
@@ -577,6 +575,10 @@ class StreamingContext private[streaming] (
* @throws IllegalStateException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
//Registering Streaming Metrics at the start of the StreamingContext
My guess is this fails style checks. Indent it, and add a space
@srowen, I made the changes and ran a dev test on my repo. The scala errors weren't present.
@nssalian that is indeed the failure: "Insert a space after the start of the comment" Did you run
nsalian-MBP:spark nsalian$ ./dev/scalastyle
Scalastyle checks passed.
Should have phrased it better, the tests ran fine.
@@ -688,6 +690,8 @@ class StreamingContext private[streaming] (
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
// De-registering Streaming Metrics of the StreamingContext
env.metricsSystem.removeSource(streamingSource)
I think this should be done in the try, after calling scheduler.stop(). In the finally block it will fail if the SparkContext has been stopped and the env is invalid.
I think it would be better to do this in the try, as TD said; otherwise you may hit an exception when the SparkContext is already stopped.
Changed it on my local. Added to the latest commit.
You need to add unit tests for this. Please add them to the StreamingContextSuite. You can probably add it to existing unit tests that test start and stop.
Test build #1004 has finished for PR 7250 at commit
Will update shortly with the changes. Thank you.
…e of the sources ArrayBuffer
To illustrate,
Would like some feedback on this approach. Thank you.
@srowen, I wanted to check whether the size was decremented at all. Couldn't think of a way to assert that the source has been removed since the sources ArrayBuffer is still private.
But this line doesn't check the state afterwards. You're reading the same value twice; x == x - 1 can never be true. (There's an extra space at the end of that line too.) I think there's still an outstanding comment from TD here too.
…compare with the original size after removal
Based on @srowen's comment, I made the change and added updatedSourcesSize to check the ArrayBuffer size after the source is removed, to assert that the size was indeed decremented.
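For readers following the x == x - 1 point, here is a minimal sketch of the before/after pattern being described. It is not the code from this PR: SizeCheckSketch, removeAndReportSizes, and the string-valued buffer are hypothetical names used only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Spark: it only illustrates reading the
// size once before the removal and once after it.
object SizeCheckSketch {
  def removeAndReportSizes(sources: ArrayBuffer[String], name: String): (Int, Int) = {
    val sizeBefore = sources.size // captured before the mutation
    sources -= name               // removes the first matching element, if any
    val sizeAfter = sources.size  // captured after the mutation
    (sizeBefore, sizeAfter)
  }
}
```

Asserting sizeAfter == sizeBefore - 1 compares two different reads. A size check alone still does not say which element was removed, so a membership check on the buffer is the stronger assertion.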
@@ -73,7 +73,7 @@ private[spark] class MetricsSystem private (
 private[this] val metricsConfig = new MetricsConfig(conf)

 private val sinks = new mutable.ArrayBuffer[Sink]
-private val sources = new mutable.ArrayBuffer[Source]
+val sources = new mutable.ArrayBuffer[Source]
I think it would be better not to loosen the restriction and expose this to public users just for test convenience. You could refer here to see how to test with a private field.
Thanks @jerryshao will add a test similarly.
@nssalian This has not been addressed. This should not be public; see the link for how to keep it private and yet access it in tests.
@@ -674,6 +676,8 @@ class StreamingContext private[streaming] (
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// De-registering Streaming Metrics of the StreamingContext
env.metricsSystem.removeSource(streamingSource)
I think removing the source only in the ACTIVE state may introduce some corner-case problems. For example, since the metrics source is added when the StreamingContext's state is INITIALIZED, if we hit an exception at this point we will never change the state to ACTIVE, so the metrics source cannot be removed, because you only remove the source when the state is ACTIVE. What do you think?
The idea was to register at the call of start().
So, based on your comment, that would mean registering the sources after the state is set to INITIALIZED and before.
def start(): Unit = synchronized {
  // Registering Streaming Metrics at the start of the StreamingContext
  assert(env != null)
  assert(env.metricsSystem != null)
  env.metricsSystem.registerSource(streamingSource)
It makes sense to have it after INITIALIZED and before the synchronized block of ACTIVE and STOPPED.
@tdas, could you shed more light?
What I mean is to move this line out of the ACTIVE condition. In any case you have to deregister the metrics source, not only in the ACTIVE state.
@jerryshao In this PR the streaming metric source is added only after starting the context. So it is fine to deregister only when the state is ACTIVE. That also maintains the characteristic that stop is idempotent.
I think it is OK for the normal case, but what if an exception is hit after the metrics source is successfully registered but before the state changes to ACTIVE? According to the code, it will catch the exception and change the state to STOPPED, so in that situation, if we call stop(), we will never de-register the metrics source with the current implementation.
Yeah good point. I suggested above that the registering be moved after the ACTIVE status change.
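The ordering agreed on in this thread can be sketched roughly as below. This is a simplified illustration under stated assumptions, not Spark's StreamingContext: ToyMetricsSystem, ToyState, ToyStreamingContext, and the failOnStart flag are all hypothetical stand-ins. The source is registered only after the state becomes ACTIVE, so removing it in the ACTIVE branch of stop() cannot miss a registration, and stop() stays idempotent.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a metrics registry; not Spark's MetricsSystem.
final class ToyMetricsSystem {
  private val sources = ArrayBuffer.empty[String]
  def registerSource(name: String): Unit = synchronized { sources += name }
  def removeSource(name: String): Unit = synchronized { sources -= name }
  def registered: List[String] = synchronized { sources.toList }
}

// Hypothetical lifecycle states mirroring INITIALIZED / ACTIVE / STOPPED.
object ToyState extends Enumeration {
  val Initialized, Active, Stopped = Value
}

// Hypothetical context; failOnStart simulates an exception raised before
// the context becomes active.
final class ToyStreamingContext(metrics: ToyMetricsSystem, failOnStart: Boolean = false) {
  private var state = ToyState.Initialized
  private val sourceName = "streaming"

  def getState: ToyState.Value = synchronized { state }

  def start(): Unit = synchronized {
    if (state != ToyState.Initialized) {
      throw new IllegalStateException(s"cannot start from state $state")
    }
    if (failOnStart) throw new RuntimeException("simulated scheduler failure")
    state = ToyState.Active
    // Registered only once the state is Active, so a failed start leaves nothing behind.
    metrics.registerSource(sourceName)
  }

  def stop(): Unit = synchronized {
    try {
      state match {
        case ToyState.Active => metrics.removeSource(sourceName)
        case _ => () // never started or already stopped: nothing was registered
      }
    } finally {
      state = ToyState.Stopped
    }
  }
}
```

Calling stop() twice, or after a failed start(), leaves the registry empty in this sketch, which is the property the reviewers were asking for.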
ssc.start()
assert(ssc.getState() === StreamingContextState.INITIALIZED)
assert(StreamingContextSuite.sources.get(StreamingContextSuite.streamingSource)!= "null")
I don't get this statement. You are calling StreamingContext.sources.get() on an object of type java.lang.reflect.Field (that is, StreamingContextSuite.streamingSource) rather than the actual StreamingContext object ssc above. And then you are checking that it is not the string "null"? I am not sure what is being tested here.
Checking to see if the source is not returning a null and is actually present in the sources ArrayBuffer.
A much nicer way to do this is the way it is done in https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala#L782 using the PrivateTester stuff. Please use that; we want to stay consistent with the rest of the code base, and the code is easier to read and understand.
Makes sense. I'll write that up. I was having problems when I initially wrote it the way it is in ExecutorAllocationManagerSuite, hence I went with reflection. I'll figure out something.
Thank you.
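To make the reflection issue raised in this thread concrete, here is a small sketch using only the standard library. ToyRegistry and PrivateFieldSketch are hypothetical names, not Spark or test-suite code. The point it illustrates is that java.lang.reflect.Field.get takes the instance whose field is being read, not another Field. The reviewers' preferred route for the actual suite is ScalaTest's PrivateMethodTester, which this sketch does not attempt to reproduce.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical class with a private field, standing in for a metrics registry.
final class ToyRegistry {
  private val sources = ArrayBuffer.empty[String]
  def register(name: String): Unit = { sources += name }
  def remove(name: String): Unit = { sources -= name }
}

object PrivateFieldSketch {
  // Reads the private `sources` field of one specific ToyRegistry instance.
  def readSources(registry: ToyRegistry): ArrayBuffer[String] = {
    val field = classOf[ToyRegistry].getDeclaredField("sources")
    field.setAccessible(true)
    // Field.get receives the object that owns the field value.
    field.get(registry).asInstanceOf[ArrayBuffer[String]]
  }
}
```

A test built this way can assert that the buffer contains the source after registration and no longer contains it after removal, without widening the field's visibility.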
… check for Sources containing or not containing StreamingSource
@tdas, I added the new test as per PrivateMethodTester.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}

import org.apache.spark._
I wouldn't make this a _ import. Leave it as it was. Also, I think the collection imports become unnecessary; see below.
Hitting this error on the test for StreamingContextSuite. Ran this:
Test failure message: ArrayBuffer(org.apache.spark.scheduler.DAGSchedulerSource@1473d83a, org.apache.spark.storage.BlockManagerSource@7560392b) did not contain org.apache.spark.streaming.StreamingSource@1c71ddd7 (StreamingContextSuite.scala:322)
@@ -21,17 +21,22 @@ import java.io.{File, NotSerializableException}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.commons.io.FileUtils
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
These need to be with other spark imports, and the spark imports shouldn't have a blank line in the middle.
The test failed on assert(ssc.getState() === StreamingContextState.INITIALIZED). @srowen, I've added the changes as you suggested.
That can't be the failure you're referring to above, right? It looks clearly like the assertion about which sources are still in the registered set. Anyway, the assertion looks better now. You need to rebase this PR now, and two of the imports are still in the wrong place.
…for the Streaming Context. Added the removeSource method to the stop method. Added comments for both
Added // instead of /** for commenting in code
Added indentation and Space at the comment on line 578; Registering..
Added test case for de-register metrics and made a change to the scope of the sources ArrayBuffer
Added additional variable to check the updated Sources size value to compare with the original size after removal
Added the removeSource method in try
Removed the assert for the env field, added the registerSource line in the INITIALIZED block and kept the removeSource() in the ACTIVE block
Added test to check registering and de-registering of streamingSource
Removed unused imports
Moved the registerSource() call before line 601
Changed scope of sources and corrected comments for helper
Removed extra line in Helper Methods section
Added helper method for private methods and changed the test logic to check for Sources containing or not containing StreamingSource
Changed import statements to remove unnecessary imports and add specific imports
Removed types for fields in test for registering and deregistering metrics
Changed imports statements, negated test statement and removed postfix
Removed added comment to Assert for INITIALIZED state
Removing the INITIALIZED check since after start() the state moves to ACTIVE and this check fails
Move the INITIALIZED state check to when the ssc is initialized
Right. The negation took care of the failure above and I fixed the assertion error as well. Made a unified commit. 299a57d
Closing this PR in favor of #7362.
@nssalian you can make a new PR, yes. I usually like to stick to one PR to preserve the discussion in one place, but it's not a huge deal. You don't need to squash your commit history like you might in PRs for other projects, since the merge process will take care of that. Still, you can squash some of your commits in your PR branch and force-push if you like for clarity, though you don't need to. Note that after a rebase to resolve merge conflicts you can simply force-push your PR branch.
…en StreamingContext is closed

The issue link: https://issues.apache.org/jira/browse/SPARK-8743
Deregister Codahale metrics for streaming when StreamingContext is closed

Design:
Adding the method calls in the appropriate start() and stop () methods for the StreamingContext

Actions in the PullRequest:
1) Added the registerSource method call to the start method for the Streaming Context.
2) Added the removeSource method to the stop method.
3) Added comments for both 1 and 2 and comment to show initialization of the StreamingSource
4) Added a test case to check for both registration and de-registration of metrics

Previous closed PR for reference: #7250

Author: Neelesh Srinivas Salian <[email protected]>

Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits:

7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call
8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import
0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct place
daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports
8873180 [Neelesh Srinivas Salian] Removed redundancy in imports
59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify scala and spark imports
d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration
…en StreamingContext is closed

The issue link: https://issues.apache.org/jira/browse/SPARK-8743
Deregister Codahale metrics for streaming when StreamingContext is closed

Design:
Adding the method calls in the appropriate start() and stop () methods for the StreamingContext

Actions in the PullRequest:
1) Added the registerSource method call to the start method for the Streaming Context.
2) Added the removeSource method to the stop method.
3) Added comments for both 1 and 2 and comment to show initialization of the StreamingSource
4) Added a test case to check for both registration and de-registration of metrics

Previous closed PR for reference: #7250

Author: Neelesh Srinivas Salian <[email protected]>

Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits:

7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call
8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import
0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct place
daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports
8873180 [Neelesh Srinivas Salian] Removed redundancy in imports
59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify scala and spark imports
d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration

(cherry picked from commit b7bcbe2)
Signed-off-by: Tathagata Das <[email protected]>
The issue link: https://issues.apache.org/jira/browse/SPARK-8743
Deregister Codahale metrics for streaming when StreamingContext is closed
Design:
Adding the method calls in the appropriate start() and stop () methods for the StreamingContext
Actions in the PullRequest:
Requesting Review.