SPARK-3276 Added a new configuration spark.streaming.minRememberDuration #5438
Conversation
…ember duration, with a default value of 1 minute.
*/
private val MIN_REMEMBER_DURATION = Minutes(1)
private val minRememberDuration = new SparkConf().get("spark.streaming.minRememberDuration", "1")
Note https://issues.apache.org/jira/browse/SPARK-5931 here, which would update time properties to be specifiable like "3s". It isn't updating property names, but we could update a new one. I suggest this property name include the units. It looks like it's minutes here, so spark.streaming.minRememberDurationMin. Can it be seconds to allow finer granularity, or is that unreasonable? Why two fields here? Just one minRememberDurationMin (or Sec) is needed?
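The "3s"-style syntax mentioned above can be sketched in plain Scala. This is a hypothetical stand-alone parser, not Spark's actual implementation of the SPARK-5931 time-property format; the object and method names are illustrative only:

```scala
// Hypothetical sketch of parsing a time property with a unit suffix,
// in the spirit of SPARK-5931 ("3s", "2m", "1h"). Not Spark's real code.
object TimeStringParser {
  private val suffixToSeconds = Map("s" -> 1L, "m" -> 60L, "h" -> 3600L)

  // Returns the duration in seconds; a bare number is treated as seconds.
  def parseAsSeconds(value: String): Long = {
    val trimmed = value.trim.toLowerCase
    suffixToSeconds.collectFirst {
      case (suffix, factor) if trimmed.endsWith(suffix) =>
        trimmed.dropRight(suffix.length).trim.toLong * factor
    }.getOrElse(trimmed.toLong)
  }
}
```

With a suffix carried by the value itself, the property name no longer needs a unit designator like `Min`, which is the consistency point being discussed here.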
@srowen changing the property name to spark.streaming.minRememberDurationMin makes sense with regard to consistency in naming. I will also get rid of the two fields. I'll open another pull request that reflects those changes.
(You can just push to this PR rather than close it.)
… reduced number of fields.
* Changed the property name to spark.streaming.minRememberDurationMin to reflect the unit of value (minutes).
* Deleted the constant MIN_REMEMBER_DURATION, because now minRememberDurationMin is serving the same purpose.
@srowen I've pushed to my branch SPARK-3276 again. Is it acceptable now?
*/
private val MIN_REMEMBER_DURATION = Minutes(1)
private val minRememberDurationMin = Minutes(new SparkConf()
This much looks better, though now I also see that we need a real SparkConf here somehow. I don't think this is a global, static property of the object anymore. I think this would have to live in the FileInputDStream class now?
@srowen I'm a little confused. Do you mean something like: "inside the private[streaming] class FileInputDStream[K, V, F <: NewInputFormat[K,V]] class create a private val conf = new SparkConf() and then use that conf inside private[streaming] object FileInputDStream"? Or something totally different?
IIUC, what @srowen mentions is to put the minRememberDurationMin-related code into the class FileInputDStream, rather than in the object. By putting this into the class FileInputDStream, you could use the conf from StreamingContext, with no need to create a new one.
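The point above can be sketched in plain Scala. Conf and FileStream here are illustrative stand-ins, not Spark's real SparkConf and FileInputDStream: the class receives the context's existing conf, whereas a companion object would have to construct a fresh one:

```scala
// Minimal stand-in for a conf object (hypothetical, not Spark's SparkConf).
class Conf(settings: Map[String, String]) {
  def getLong(key: String, default: Long): Long =
    settings.get(key).map(_.toLong).getOrElse(default)
}

// The class can read from the conf it is handed, instead of `new Conf(...)`,
// which is why the setting belongs here rather than in a companion object.
class FileStream(conf: Conf) {
  val minRememberDurationMin: Long =
    conf.getLong("spark.streaming.minRememberDurationMin", 1L)
}
```

A conf built fresh inside a static object would miss any settings the user supplied to the running application, which is the problem being pointed out.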
Based on the comment of @jerryshao I prepared another commit, moving the minRememberDurationMin from the companion object to the class. @srowen does it look better now? Anything else to be done?
This approach looks like it affects all the file sources in one project; there may be cases where parts of the file stream need to include some old files, but others do not. Would a parameter be more suitable for this?
* Moved the minRememberDurationMin to the class so that it can use the existing Spark context
* Refactored calculateNumBatchesToRemember to take minRememberDurationMin as a parameter
*/
private val minRememberDurationMin = Minutes(ssc.sparkContext.getConf
  .get("spark.streaming.minRememberDurationMin", "1")
  .toLong)
I think you can use ssc.conf rather than ssc.sparkContext.getConf; also, SparkConf has an API getLong which can be used directly.
…nf, and also getLong method directly.
Based on the comment of @jerryshao, the code is now using …
* Files with mod times older than this "window" of remembering will be ignored. So if new
* files are visible within this window, then the file will get selected in the next batch.
*/
private val minRememberDurationMin = Minutes(ssc.conf
The continuation indent looks a bit off here. Maybe just start the next line with Minutes(....
I think we might want to put this change on hold until SPARK-5931 is resolved, since it will give some proper syntax for time-based properties that you can use here.
@srowen fair enough. It looks like if SPARK-5931 is resolved, I can go back to using … Do you foresee any other changes? E.g., will there be a new method to retrieve the value of the duration parameter, such as …
Yes, I think you can change the prop name. I don't see a "minutes" designator, but might as well use seconds. I can sort of imagine wanting to specify a value less than 60 seconds, so the extra granularity might be beneficial. I don't think there are any special methods being added for specific properties.
…rDuration
* switched back to using spark.streaming.minRememberDuration
* renamed minRememberDurationMin to minRememberDuration
@srowen I've switched back to … If these final changes are OK, then I'll be waiting for SPARK-5931 to be resolved.
@srowen I see that SPARK-5931 is marked as CLOSED (Fixed), and the associated pull request has been closed. How does that affect this pull request of mine? Anything else to be done?
Yes, have a look at the changes. You can and should now create a property with a value like "60s", like the new properties in that PR.
* Switched to using newly added getTimeAsSeconds method (see apache#5236)
* Renamed minRememberDuration to minRememberDurationS to be compatible with the examples in the pull request above.
@srowen I prepared another commit in which I'm now using the newly added …
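The final shape of the change can be sketched in plain Scala. StubConf and FileStreamLike are hypothetical stand-ins; the getTimeAsSeconds accessor here is modeled on, not copied from, the method added in apache#5236:

```scala
// Hypothetical conf stub with a getTimeAsSeconds-style accessor.
// Only "s" and "m" suffixes are handled in this sketch.
class StubConf(settings: Map[String, String]) {
  private def toSeconds(v: String): Long = {
    val s = v.trim.toLowerCase
    if (s.endsWith("s")) s.dropRight(1).trim.toLong
    else if (s.endsWith("m")) s.dropRight(1).trim.toLong * 60
    else s.toLong // bare numbers treated as seconds
  }

  def getTimeAsSeconds(key: String, default: String): Long =
    toSeconds(settings.getOrElse(key, default))
}

// The DStream-side field would then read the property with a "60s" default.
class FileStreamLike(conf: StubConf) {
  val minRememberDurationS: Long =
    conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s")
}
```

Because the unit travels with the value ("60s", "2m"), the property can stay named spark.streaming.minRememberDuration without a unit designator, matching the earlier discussion.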
That looks right. Still probably needs a nod from @tdas as it kinda adds a new API (config param).
@srowen thanks for checking. @tdas does it look OK to you, too? One thing I've realized is that I did not touch the streaming programming guide, and I think I should document this. But after checking https://spark.apache.org/docs/latest/streaming-programming-guide.html, I could not be sure where to document this property. Where is the most appropriate place for documenting …
I suspect this is OK. I wonder if we should simply leave this undocumented for now as it's somewhat obscure, rather than completely commit to it as an API. That might make this easier to merge.
ok to test
Oh, darn it. I merged this since I thought this one had passed tests, but on looking back at this one, it has not yet. I will keep a close eye on this and make sure it actually does, and if not, will revert it right away.
Jenkins, test this please
So, I think I got away with this one. I tested it locally and it passes, and I see that subsequent pull requests in Jenkins succeed, and the ones that failed did so for unrelated reasons. So this does actually pass tests. Of course it was an unintentional mistake to proceed before seeing the Jenkins result on this one explicitly, but no actual harm was done in this case.
SPARK-3276 Added a new configuration parameter ``spark.streaming.minRememberDuration``, with a default value of 1 minute. So that when a Spark Streaming application is started, an arbitrary number of minutes can be taken as the threshold for remembering.

Author: emres <[email protected]>

Closes apache#5438 from emres/SPARK-3276 and squashes the following commits:
766f938 [emres] SPARK-3276 Switched to using newly added getTimeAsSeconds method.
affee1d [emres] SPARK-3276 Changed the property name and variable name for minRememberDuration
c9d58ca [emres] SPARK-3276 Minor code re-formatting.
1c53ba9 [emres] SPARK-3276 Started to use ssc.conf rather than ssc.sparkContext.getConf, and also getLong method directly.
bfe0acb [emres] SPARK-3276 Moved the minRememberDurationMin to the class
daccc82 [emres] SPARK-3276 Changed the property name to reflect the unit of value and reduced number of fields.
43cc1ce [emres] SPARK-3276 Added a new configuration parameter spark.streaming.minRemember duration, with a default value of 1 minute.
@srowen Another reason why this was a premature merge was the configuration …
Maybe so -- worth changing now?
Yes, I am a partial culprit in this matter, as I was not able to give the …
SPARK-3276 Added a new configuration parameter spark.streaming.minRememberDuration, with a default value of 1 minute. So that when a Spark Streaming application is started, an arbitrary number of minutes can be taken as the threshold for remembering.