From 766f9386acbc82de52106b1e3025df5729d15a46 Mon Sep 17 00:00:00 2001
From: emres
Date: Tue, 14 Apr 2015 17:32:18 +0200
Subject: [PATCH] SPARK-3276 Switched to using newly added getTimeAsSeconds method.

* Switched to using newly added getTimeAsSeconds method (see https://github.com/apache/spark/pull/5236)
* Renamed minRememberDuration to minRememberDurationS to be compatible with the examples in the pull request above.
---
 .../spark/streaming/dstream/FileInputDStream.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 1b093b12f6274..eca69f00188e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -86,8 +86,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    * Files with mod times older than this "window" of remembering will be ignored. So if new
    * files are visible within this window, then the file will get selected in the next batch.
    */
-  private val minRememberDuration =
-    Seconds(ssc.conf.getLong("spark.streaming.minRememberDuration", 60L))
+  private val minRememberDurationS =
+    Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s"))
 
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
@@ -105,7 +105,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    * selected and processed.
    */
   private val numBatchesToRemember = FileInputDStream
-    .calculateNumBatchesToRemember(slideDuration, minRememberDuration)
+    .calculateNumBatchesToRemember(slideDuration, minRememberDurationS)
   private val durationToRemember = slideDuration * numBatchesToRemember
   remember(durationToRemember)
 
@@ -344,10 +344,10 @@ object FileInputDStream {
 
   /**
    * Calculate the number of last batches to remember, such that all the files selected in
-   * at least last minRememberDuration duration can be remembered.
+   * at least last minRememberDurationS duration can be remembered.
    */
   def calculateNumBatchesToRemember(batchDuration: Duration,
-                                    minRememberDuration: Duration): Int = {
-    math.ceil(minRememberDuration.milliseconds.toDouble / batchDuration.milliseconds).toInt
+                                    minRememberDurationS: Duration): Int = {
+    math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt
   }
 }