SPARK-3276 Added a new configuration spark.streaming.minRememberDuration #5438

Closed
wants to merge 7 commits
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

-import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkConf, SerializableWritable}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* the streaming app.
* - If a file is to be visible in the directory listings, it must be visible within a certain
* duration of the mod time of the file. This duration is the "remember window", which is set to
-* 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
+* 1 minute (see `FileInputDStream.minRememberDurationMin`). Otherwise, the file will never be
* selected as the mod time will be less than the ignore threshold when it becomes visible.
* - Once a file is visible, the mod time cannot change. If it does due to appends, then the
* processing semantics are undefined.
@@ -80,6 +80,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

private val serializableConfOpt = conf.map(new SerializableWritable(_))

+/**
+ * Minimum duration of remembering the information of selected files. Defaults to 1 minute.
+ *
+ * Files with mod times older than this "window" of remembering will be ignored. So if new
+ * files are visible within this window, then the file will get selected in the next batch.
+ */
+private val minRememberDurationMin = Minutes(ssc.sparkContext.getConf
+  .get("spark.streaming.minRememberDurationMin", "1")
+  .toLong)
Contributor commented:
I think you can use ssc.conf rather than ssc.sparkContext.getConf; also, SparkConf has a getLong API that can be used directly.
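
A minimal sketch of what that suggestion might look like, assuming `ssc.conf` is reachable from `FileInputDStream` and keeping the property name used in this patch:

```scala
// Sketch of the reviewer's suggestion, not the code in this PR:
// read the setting through ssc.conf and parse it with SparkConf.getLong.
private val minRememberDurationMin =
  Minutes(ssc.conf.getLong("spark.streaming.minRememberDurationMin", 1))
```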


// This is a def so that it works during checkpoint recovery:
private def clock = ssc.scheduler.clock

@@ -95,7 +105,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
* This would allow us to filter away not-too-old files which have already been recently
* selected and processed.
*/
-private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration)
+private val numBatchesToRemember = FileInputDStream
+  .calculateNumBatchesToRemember(slideDuration, minRememberDurationMin)
private val durationToRemember = slideDuration * numBatchesToRemember
remember(durationToRemember)

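For context, a hedged sketch of how an application could set the new property; the app name, master, batch interval, and the 5-minute value are illustrative, not part of this PR:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver: widen the remember window from the default 1 minute
// to 5 minutes before the StreamingContext is created.
object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("FileStreamExample")
      .setMaster("local[2]") // local master, just for the sketch
      .set("spark.streaming.minRememberDurationMin", "5")
    val ssc = new StreamingContext(conf, Seconds(10))
    // ... create a file stream, start the context, and await termination.
  }
}
```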
@@ -330,20 +341,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
private[streaming]
object FileInputDStream {

-/**
- * Minimum duration of remembering the information of selected files. Files with mod times
- * older than this "window" of remembering will be ignored. So if new files are visible
- * within this window, then the file will get selected in the next batch.
- */
-private val MIN_REMEMBER_DURATION = Minutes(1)

def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

/**
* Calculate the number of last batches to remember, such that all the files selected in
-* at least last MIN_REMEMBER_DURATION duration can be remembered.
+* at least the last minRememberDurationMin duration can be remembered.
*/
-def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
-  math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
+def calculateNumBatchesToRemember(batchDuration: Duration,
+    minRememberDurationMin: Duration): Int = {
+  math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt
}
}
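
As a worked check of the formula above (batch intervals chosen for illustration): with the default 1-minute window, a 10-second batch gives ceil(60000 / 10000) = 6 remembered batches, while a 2-minute batch gives ceil(60000 / 120000) = 1.

```scala
import org.apache.spark.streaming.{Duration, Minutes, Seconds}

// Standalone check of the arithmetic in calculateNumBatchesToRemember.
object RememberWindowMath {
  def numBatches(batch: Duration, window: Duration): Int =
    math.ceil(window.milliseconds.toDouble / batch.milliseconds).toInt

  def main(args: Array[String]): Unit = {
    println(numBatches(Seconds(10), Minutes(1))) // ceil(60000 / 10000)  = 6
    println(numBatches(Seconds(45), Minutes(1))) // ceil(60000 / 45000)  = 2
    println(numBatches(Minutes(2), Minutes(1)))  // ceil(60000 / 120000) = 1
  }
}
```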