Skip to content

Commit

Permalink
SPARK-3276 Switched to using newly added getTimeAsSeconds method.
Browse files Browse the repository at this point in the history
* Switched to using newly added getTimeAsSeconds method (see #5236)
* Renamed minRememberDuration to minRememberDurationS to be compatible with the examples in the pull request above.
  • Loading branch information
emres committed Apr 14, 2015
1 parent affee1d commit 766f938
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
* Files with mod times older than this "window" of remembering will be ignored. So if new
* files are visible within this window, then the file will get selected in the next batch.
*/
private val minRememberDuration =
Seconds(ssc.conf.getLong("spark.streaming.minRememberDuration", 60L))
private val minRememberDurationS =
Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s"))

// This is a def so that it works during checkpoint recovery:
private def clock = ssc.scheduler.clock
Expand All @@ -105,7 +105,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
* selected and processed.
*/
private val numBatchesToRemember = FileInputDStream
.calculateNumBatchesToRemember(slideDuration, minRememberDuration)
.calculateNumBatchesToRemember(slideDuration, minRememberDurationS)
private val durationToRemember = slideDuration * numBatchesToRemember
remember(durationToRemember)

Expand Down Expand Up @@ -344,10 +344,10 @@ object FileInputDStream {

/**
* Calculate the number of last batches to remember, such that all the files selected in
* at least last minRememberDuration duration can be remembered.
* at least last minRememberDurationS duration can be remembered.
*/
def calculateNumBatchesToRemember(batchDuration: Duration,
minRememberDuration: Duration): Int = {
math.ceil(minRememberDuration.milliseconds.toDouble / batchDuration.milliseconds).toInt
minRememberDurationS: Duration): Int = {
math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt
}
}

0 comments on commit 766f938

Please sign in to comment.