
Merge pull request apache#411 from tdas/filestream-fix
Improved logic of finding new files in FileInputDStream

Earlier, if HDFS had a hiccup and reported the existence of a new file (mod time T sec) only at time T + 1 sec, then fileStream could have missed that file. With this change, it should be able to find files that are delayed by up to <batch size> seconds. That is, even if a file is reported at T + <batch time> sec, the file stream should be able to catch it.

The new logic, at a high level, is as follows. It keeps track of the new files it found in the previous interval and the mod time of the oldest of those files (let's call it X). Then in the current interval, it ignores files that were already seen in the previous interval as well as files with a mod time older than X. So if a new file gets reported by HDFS in the current interval but has a mod time within the previous interval, it will still be considered. However, files with a mod time earlier than the previous interval (that is, earlier than X) will be ignored. This is the current limitation, and a future version may improve this behavior further.
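
To make the high-level logic above concrete, here is a minimal, self-contained Scala sketch of that bookkeeping. It is only an illustration of the idea described in this message; the names (NewFileTrackingSketch, findNewFiles, candidates) are hypothetical and this is not the actual FileInputDStream code.

import scala.collection.mutable.HashSet

// Simplified sketch: lastFoundFiles holds the files found in the previous interval and
// ignoreTime is the mod time of the oldest of those files (the "X" described above).
object NewFileTrackingSketch {
  private val lastFoundFiles = new HashSet[String]
  private var ignoreTime = 0L

  // candidates are (path, modTime) pairs reported by the file system for this interval
  def findNewFiles(candidates: Seq[(String, Long)], currentTime: Long): Seq[String] = {
    val accepted = candidates.filter { case (path, modTime) =>
      // Ignore files already seen in the previous interval and files older than X, but
      // accept a file whose mod time falls in the previous interval if it is only now
      // being reported.
      !lastFoundFiles.contains(path) && modTime >= ignoreTime && modTime <= currentTime
    }
    if (accepted.nonEmpty) {
      lastFoundFiles.clear()
      lastFoundFiles ++= accepted.map(_._1)
      // Move the ignore threshold forward to the oldest mod time among the new files
      ignoreTime = accepted.map(_._2).min
    }
    accepted.map(_._1)
  }
}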

Also reduced line lengths in DStream to <=100 chars.
pwendell committed Jan 14, 2014
2 parents 01c0d72 + c0bb38e commit a2fee38
Showing 2 changed files with 69 additions and 62 deletions.
@@ -35,18 +35,19 @@ import org.apache.spark.streaming.Duration

/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
* HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
* DStream periodically generates a RDD, either from live data or by transforming the RDD generated
* by a parent DStream.
* sequence of RDDs (of the same type) representing a continuous stream of data (see
* [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
* live data (such as data from HDFS, Kafka or Flume) or it can be generated by transforming
* existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
* While a Spark Streaming program is running, each DStream periodically generates an RDD,
* either from live data or by transforming the RDD generated by a parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
* only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
* are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
* implicit conversions when `spark.streaming.StreamingContext._` is imported.
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
* operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
* `join`. These operations are automatically available on any DStream of pairs
* (e.g., DStream[(Int, Int)]) through implicit conversions when
* `org.apache.spark.streaming.StreamingContext._` is imported.
*
* Each DStream internally is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
@@ -155,7 +156,8 @@ abstract class DStream[T: ClassTag] (
// Set the minimum value of the rememberDuration if not already set
var minRememberDuration = slideDuration
if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
// times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
minRememberDuration = checkpointDuration * 2
}
if (rememberDuration == null || rememberDuration < minRememberDuration) {
rememberDuration = minRememberDuration
@@ -259,7 +261,8 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
" and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
logDebug("Time " + time + " is valid")
@@ -288,11 +291,14 @@ abstract class DStream[T: ClassTag] (
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
logInfo("Persisting RDD " + newRDD.id + " for time " +
time + " to " + storageLevel + " at time " + time)
}
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
if (checkpointDuration != null &&
(time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
logInfo("Marking RDD " + newRDD.id + " for time " + time +
" for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
@@ -401,7 +407,8 @@ abstract class DStream[T: ClassTag] (
}
}
} else {
throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
throw new java.io.NotSerializableException(
"Graph is unexpectedly null when DStream is being serialized.")
}
}

@@ -651,8 +658,8 @@ abstract class DStream[T: ClassTag] (

/**
* Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
* of elements in a sliding window over this DStream. Hash partitioning is used to generate
* the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -709,10 +716,12 @@ abstract class DStream[T: ClassTag] (
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
val alignedToTime = toTime.floor(slideDuration)
val alignedFromTime = fromTime.floor(slideDuration)
@@ -39,24 +39,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas

protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

// Latest file mod time seen till any point of time
private val prevModTimeFiles = new HashSet[String]()
private var prevModTime = 0L
// files found in the last interval
private val lastFoundFiles = new HashSet[String]

// Files with mod time earlier than this are ignored. This is updated every interval
// such that in the current interval, files older than any file found in the
// previous interval will be ignored. Obviously this time keeps moving forward.
private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()

@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@transient private[streaming] var files = new HashMap[Time, Array[String]]
@transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
@transient private var lastNewFileFindingTime = 0L

override def start() {
if (newFilesOnly) {
prevModTime = graph.zeroTime.milliseconds
} else {
prevModTime = 0
}
logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
}
override def start() { }

override def stop() { }

@@ -70,20 +68,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
assert(validTime.milliseconds >= prevModTime,
"Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
assert(validTime.milliseconds >= ignoreTime,
"Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")

// Find new files
val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (prevModTime < latestModTime) {
prevModTime = latestModTime
prevModTimeFiles.clear()
}
prevModTimeFiles ++= latestModTimeFiles
logDebug("Last mod time updated to " + prevModTime)
if (!newFiles.isEmpty) {
lastFoundFiles.clear()
lastFoundFiles ++= newFiles
ignoreTime = minNewFileModTime
}
files += ((validTime, newFiles.toArray))
Some(filesToRDD(newFiles))
@@ -92,7 +86,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldFiles = files.filter(_._1 <= (time - rememberDuration))
val oldFiles = files.filter(_._1 < (time - rememberDuration))
files --= oldFiles.keys
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -106,7 +100,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* Find files which have modification timestamp <= current time and return a 2-tuple of
* (new files found, oldest modification time among them)
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
logDebug("Trying to get new files for time " + currentTime)
lastNewFileFindingTime = System.currentTimeMillis
val filter = new CustomPathFilter(currentTime)
@@ -121,7 +115,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
"files in the monitored directory."
)
}
(newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
(newFiles, filter.minNewFileModTime)
}

/** Generate one RDD from an array of files */
@@ -200,38 +194,42 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}

/**
* Custom PathFilter class to find new files that have modification timestamps <= current time,
* but have not been seen before (i.e. the file should not be in lastModTimeFiles)
* Custom PathFilter class to find new files that
* ... have modification time more than ignore time
* ... have not been seen in the last interval
* ... have modification time less than maxModTime
*/
private[streaming]
class CustomPathFilter(maxModTime: Long) extends PathFilter {
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()

// Minimum of the mod times of new files found in the current interval
var minNewFileModTime = -1L

def accept(path: Path): Boolean = {
try {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
}
// Reject file if it was found in the last interval
if (lastFoundFiles.contains(path.toString)) {
logDebug("Mod time equal to last mod time, but file considered already")
return false
}
val modTime = getFileModTime(path)
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < prevModTime) {
logDebug("Mod time less than last mod time")
return false // If the file was created before the last time it was called
} else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
logDebug("Mod time equal to last mod time, but file considered already")
return false // If the file was created exactly as lastModTime but not reported yet
if (modTime < ignoreTime) {
// Reject file if it was created before the ignore time (or, before last interval)
logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
return false
} else if (modTime > maxModTime) {
// Reject file if it is so new that considering it may give errors
logDebug("Mod time " + modTime + " more than max mod time " + maxModTime)
return false // If the file is too new that considering it may give errors
return false
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
logDebug("Latest mod time updated to " + latestModTime)
if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
minNewFileModTime = modTime
}
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
} catch {
case fnfe: java.io.FileNotFoundException =>
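
For context, here is a hedged usage sketch of the kind of file stream this change benefits. The application name, input directory, and batch duration are made up, and this assumes a standard Spark Streaming setup; it is not part of the commit.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileStreamExample").setMaster("local[2]")
    // Batch interval of 10 seconds: with the improved logic, a file whose mod time falls
    // in the previous batch but is reported by HDFS up to ~10 seconds late is still found.
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("hdfs:///tmp/streaming-input") // hypothetical directory
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}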
