
Commit

Improved file input stream further.
tdas committed Jan 14, 2014
1 parent b93f9d4 commit c0bb38e
Showing 2 changed files with 69 additions and 62 deletions.
@@ -35,18 +35,19 @@ import org.apache.spark.streaming.Duration

/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as, data from
* HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
* DStream periodically generates a RDD, either from live data or by transforming the RDD generated
* by a parent DStream.
* sequence of RDDs (of the same type) representing a continuous stream of data (see
* [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from
* live data (such as data from HDFS, Kafka or Flume) or they can be generated by transforming
* existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`.
* While a Spark Streaming program is running, each DStream periodically generates a RDD,
* either from live data or by transforming the RDD generated by a parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
* only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
* are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
* implicit conversions when `spark.streaming.StreamingContext._` is imported.
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
* operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
* `join`. These operations are automatically available on any DStream of pairs
* (e.g., DStream[(Int, Int)]) through implicit conversions when
* `org.apache.spark.streaming.StreamingContext._` is imported.
*
* Internally, a DStream is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
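
For context, a minimal sketch of how these operations fit together in a driver program; the master URL, socket source, host/port and window sizes below are illustrative assumptions, not part of this commit:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // implicit conversions for pair DStreams

// Hypothetical windowed word count, e.g. typed into the Spark shell.
val ssc = new StreamingContext("local[2]", "WindowedWordCount", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))  // needs the implicits imported above
counts.print()
ssc.start()
ssc.awaitTermination()
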
@@ -155,7 +156,8 @@ abstract class DStream[T: ClassTag] (
// Set the minimum value of the rememberDuration if not already set
var minRememberDuration = slideDuration
if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten
// times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
minRememberDuration = checkpointDuration * 2
}
if (rememberDuration == null || rememberDuration < minRememberDuration) {
rememberDuration = minRememberDuration
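
Illustrative numbers only for the doubling above, assuming a 1 second batch interval and a 10 second checkpoint interval:

import org.apache.spark.streaming.Seconds

val slideDuration = Seconds(1)
val checkpointDuration = Seconds(10)
var minRememberDuration = slideDuration
if (minRememberDuration <= checkpointDuration) {
  // Seconds(20): the RDD behind the latest checkpoint stays remembered
  minRememberDuration = checkpointDuration * 2
}
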
@@ -259,7 +261,8 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
" and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
logDebug("Time " + time + " is valid")
@@ -288,11 +291,14 @@ abstract class DStream[T: ClassTag] (
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
logInfo("Persisting RDD " + newRDD.id + " for time " +
time + " to " + storageLevel + " at time " + time)
}
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
if (checkpointDuration != null &&
(time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
logInfo("Marking RDD " + newRDD.id + " for time " + time +
" for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
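
Both knobs checked above are set from user code; continuing the hypothetical word-count sketch earlier (reusing its `counts` stream), with arbitrary storage level and checkpoint interval values:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

// persist() sets the storageLevel consulted in getOrCompute;
// checkpoint() sets the checkpointDuration used for the isMultipleOf test.
counts.persist(StorageLevel.MEMORY_ONLY_SER)
counts.checkpoint(Seconds(10))
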
@@ -401,7 +407,8 @@ abstract class DStream[T: ClassTag] (
}
}
} else {
throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
throw new java.io.NotSerializableException(
"Graph is unexpectedly null when DStream is being serialized.")
}
}

@@ -651,8 +658,8 @@ abstract class DStream[T: ClassTag] (

/**
* Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
* of elements in a sliding window over this DStream. Hash partitioning is used to generate
* the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -709,10 +716,12 @@ abstract class DStream[T: ClassTag] (
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
}
val alignedToTime = toTime.floor(slideDuration)
val alignedFromTime = fromTime.floor(slideDuration)
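
The warnings and the alignment above hinge on Time arithmetic; a tiny illustration with made-up values and a 2 second slide duration:

import org.apache.spark.streaming.{Seconds, Time}

// Time.floor aligns an arbitrary time to the slide duration, as slice() does above.
Time(5000).floor(Seconds(2))   // -> 4000 ms
Time(11500).floor(Seconds(2))  // -> 10000 ms
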
@@ -39,24 +39,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas

protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

// Latest file mod time seen till any point of time
private val prevModTimeFiles = new HashSet[String]()
private var prevModTime = 0L
// files found in the last interval
private val lastFoundFiles = new HashSet[String]

// Files with mod time earlier than this are ignored. This is updated every interval
// such that in the current interval, files older than any file found in the
// previous interval will be ignored. Obviously this time keeps moving forward.
private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()

// Transient book-keeping state (not serialized with the DStream)
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@transient private[streaming] var files = new HashMap[Time, Array[String]]
@transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
@transient private var lastNewFileFindingTime = 0L

override def start() {
if (newFilesOnly) {
prevModTime = graph.zeroTime.milliseconds
} else {
prevModTime = 0
}
logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
}
override def start() { }

override def stop() { }

@@ -70,20 +68,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
assert(validTime.milliseconds >= prevModTime,
"Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
assert(validTime.milliseconds >= ignoreTime,
"Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]")

// Find new files
val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (prevModTime < latestModTime) {
prevModTime = latestModTime
prevModTimeFiles.clear()
}
prevModTimeFiles ++= latestModTimeFiles
logDebug("Last mod time updated to " + prevModTime)
if (!newFiles.isEmpty) {
lastFoundFiles.clear()
lastFoundFiles ++= newFiles
ignoreTime = minNewFileModTime
}
files += ((validTime, newFiles.toArray))
Some(filesToRDD(newFiles))
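
A standalone sketch, in plain Scala rather than Spark code, of the bookkeeping compute() now does with lastFoundFiles and ignoreTime; it omits the maxModTime bound and the user filter, and the file names and mod times are made up:

import scala.collection.mutable.HashSet

object FileTrackingSketch {
  val lastFoundFiles = new HashSet[String]
  var ignoreTime = 0L

  // Drop candidates older than ignoreTime or already reported in the last interval,
  // then remember this interval's files and advance ignoreTime to their minimum mod time.
  def findNewFiles(candidates: Seq[(String, Long)]): Seq[String] = {
    val accepted = candidates.filter { case (path, modTime) =>
      modTime >= ignoreTime && !lastFoundFiles.contains(path)
    }
    if (accepted.nonEmpty) {
      lastFoundFiles.clear()
      lastFoundFiles ++= accepted.map(_._1)
      ignoreTime = accepted.map(_._2).min
    }
    accepted.map(_._1)
  }

  def main(args: Array[String]): Unit = {
    println(findNewFiles(Seq(("a.txt", 100L), ("b.txt", 150L))))  // List(a.txt, b.txt)
    println(findNewFiles(Seq(("a.txt", 100L), ("c.txt", 160L))))  // List(c.txt): a.txt was reported last interval
  }
}
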
@@ -92,7 +86,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldFiles = files.filter(_._1 <= (time - rememberDuration))
val oldFiles = files.filter(_._1 < (time - rememberDuration))
files --= oldFiles.keys
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -106,7 +100,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* Find files which have modification timestamp <= current time and return a 2-tuple of
* (new files found, minimum modification time among the new files)
*/
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
private def findNewFiles(currentTime: Long): (Seq[String], Long) = {
logDebug("Trying to get new files for time " + currentTime)
lastNewFileFindingTime = System.currentTimeMillis
val filter = new CustomPathFilter(currentTime)
@@ -121,7 +115,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
"files in the monitored directory."
)
}
(newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
(newFiles, filter.minNewFileModTime)
}

/** Generate one RDD from an array of files */
@@ -200,38 +194,42 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}

/**
* Custom PathFilter class to find new files that have modification timestamps <= current time,
* but have not been seen before (i.e. the file should not be in lastModTimeFiles)
* Custom PathFilter class to find new files that
* ... have modification time more than ignore time
* ... have not been seen in the last interval
* ... have modification time less than maxModTime
*/
private[streaming]
class CustomPathFilter(maxModTime: Long) extends PathFilter {
// Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()

// Minimum of the mod times of new files found in the current interval
var minNewFileModTime = -1L

def accept(path: Path): Boolean = {
try {
if (!filter(path)) { // Reject file if it does not satisfy filter
logDebug("Rejected by filter " + path)
return false
}
// Reject file if it was found in the last interval
if (lastFoundFiles.contains(path.toString)) {
logDebug("Mod time equal to last mod time, but file considered already")
return false
}
val modTime = getFileModTime(path)
logDebug("Mod time for " + path + " is " + modTime)
if (modTime < prevModTime) {
logDebug("Mod time less than last mod time")
return false // If the file was created before the last time it was called
} else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
logDebug("Mod time equal to last mod time, but file considered already")
return false // If the file was created exactly as lastModTime but not reported yet
if (modTime < ignoreTime) {
// Reject file if it was created before the ignore time (or, before last interval)
logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
return false
} else if (modTime > maxModTime) {
// Reject file if it is too new that considering it may give errors
logDebug("Mod time " + modTime + " more than max mod time " + maxModTime)
return false // If the file is too new that considering it may give errors
return false
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
logDebug("Latest mod time updated to " + latestModTime)
if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
minNewFileModTime = modTime
}
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
} catch {
case fnfe: java.io.FileNotFoundException =>
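
Ignoring the user-supplied filter and the logging, the three acceptance conditions listed in the CustomPathFilter comment reduce to one predicate; a hypothetical standalone restatement following the comparisons in accept():

// Accepted iff the file is not older than ignoreTime, was not reported in the
// last interval, and is not newer than maxModTime.
def isNewFile(modTime: Long, seenLastInterval: Boolean,
              ignoreTime: Long, maxModTime: Long): Boolean = {
  modTime >= ignoreTime && !seenLastInterval && modTime <= maxModTime
}
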
