From c0bb38e8aa36c53e96a64b9bf8d2c8b020e93663 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 13 Jan 2014 16:54:52 -0800 Subject: [PATCH] Improved file input stream further. --- .../spark/streaming/dstream/DStream.scala | 49 ++++++----- .../streaming/dstream/FileInputDStream.scala | 82 +++++++++---------- 2 files changed, 69 insertions(+), 62 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index a7c4cca7eacab..9dfcc08abea95 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -35,18 +35,19 @@ import org.apache.spark.streaming.Duration /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] - * for more details on RDDs). DStreams can either be created from live data (such as, data from - * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations - * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each - * DStream periodically generates a RDD, either from live data or by transforming the RDD generated - * by a parent DStream. + * sequence of RDDs (of the same type) representing a continuous stream of data (see + * [[org.apache.spark.rdd.RDD]] for more details on RDDs). DStreams can either be created from + * live data (such as, data from * HDFS, Kafka or Flume) or it can be generated by transformation + * existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`. + * While a Spark Streaming program is running, each DStream periodically generates a RDD, + * either from live data or by transforming the RDD generated by a parent DStream. * * This class contains the basic operations available on all DStreams, such as `map`, `filter` and - * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available - * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations - * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through - * implicit conversions when `spark.streaming.StreamingContext._` is imported. + * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains + * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and + * `join`. These operations are automatically available on any DStream of pairs + * (e.g., DStream[(Int, Int)] through implicit conversions when + * `org.apache.spark.streaming.StreamingContext._` is imported. * * DStreams internally is characterized by a few basic properties: * - A list of other DStreams that the DStream depends on @@ -155,7 +156,8 @@ abstract class DStream[T: ClassTag] ( // Set the minimum value of the rememberDuration if not already set var minRememberDuration = slideDuration if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { - minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten + // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia) + minRememberDuration = checkpointDuration * 2 } if (rememberDuration == null || rememberDuration < minRememberDuration) { rememberDuration = minRememberDuration @@ -259,7 +261,8 @@ abstract class DStream[T: ClassTag] ( if (!isInitialized) { throw new Exception (this + " has not been initialized") } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { - logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) + logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) false } else { logDebug("Time " + time + " is valid") @@ -288,11 +291,14 @@ abstract class DStream[T: ClassTag] ( case Some(newRDD) => if (storageLevel != StorageLevel.NONE) { newRDD.persist(storageLevel) - logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) + logInfo("Persisting RDD " + newRDD.id + " for time " + + time + " to " + storageLevel + " at time " + time) } - if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { + if (checkpointDuration != null && + (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() - logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) + logInfo("Marking RDD " + newRDD.id + " for time " + time + + " for checkpointing at time " + time) } generatedRDDs.put(time, newRDD) Some(newRDD) @@ -401,7 +407,8 @@ abstract class DStream[T: ClassTag] ( } } } else { - throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.") + throw new java.io.NotSerializableException( + "Graph is unexpectedly null when DStream is being serialized.") } } @@ -651,8 +658,8 @@ abstract class DStream[T: ClassTag] ( /** * Return a new DStream in which each RDD has a single element generated by counting the number - * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with - * Spark's default number of partitions. + * of elements in a sliding window over this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -709,10 +716,12 @@ abstract class DStream[T: ClassTag] ( */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") } if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") } val alignedToTime = toTime.floor(slideDuration) val alignedFromTime = fromTime.floor(slideDuration) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 37c46b26a50b5..8a6051622e2d5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -39,24 +39,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData - // Latest file mod time seen till any point of time - private val prevModTimeFiles = new HashSet[String]() - private var prevModTime = 0L + // files found in the last interval + private val lastFoundFiles = new HashSet[String] + + // Files with mod time earlier than this is ignored. This is updated every interval + // such that in the current interval, files older than any file found in the + // previous interval will be ignored. Obviously this time keeps moving forward. + private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis() + // Latest file mod time seen till any point of time @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null @transient private[streaming] var files = new HashMap[Time, Array[String]] @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true) @transient private var lastNewFileFindingTime = 0L - override def start() { - if (newFilesOnly) { - prevModTime = graph.zeroTime.milliseconds - } else { - prevModTime = 0 - } - logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly) - } + override def start() { } override def stop() { } @@ -70,20 +68,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * the previous call. */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { - assert(validTime.milliseconds >= prevModTime, - "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]") + assert(validTime.milliseconds >= ignoreTime, + "Trying to get new files for a really old time [" + validTime + " < " + ignoreTime + "]") // Find new files - val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds) + val (newFiles, minNewFileModTime) = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) - if (newFiles.length > 0) { - // Update the modification time and the files processed for that modification time - if (prevModTime < latestModTime) { - prevModTime = latestModTime - prevModTimeFiles.clear() - } - prevModTimeFiles ++= latestModTimeFiles - logDebug("Last mod time updated to " + prevModTime) + if (!newFiles.isEmpty) { + lastFoundFiles.clear() + lastFoundFiles ++= newFiles + ignoreTime = minNewFileModTime } files += ((validTime, newFiles.toArray)) Some(filesToRDD(newFiles)) @@ -92,7 +86,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas /** Clear the old time-to-files mappings along with old RDDs */ protected[streaming] override def clearMetadata(time: Time) { super.clearMetadata(time) - val oldFiles = files.filter(_._1 <= (time - rememberDuration)) + val oldFiles = files.filter(_._1 < (time - rememberDuration)) files --= oldFiles.keys logInfo("Cleared " + oldFiles.size + " old files that were older than " + (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) @@ -106,7 +100,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas * Find files which have modification timestamp <= current time and return a 3-tuple of * (new files found, latest modification time among them, files with latest modification time) */ - private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { + private def findNewFiles(currentTime: Long): (Seq[String], Long) = { logDebug("Trying to get new files for time " + currentTime) lastNewFileFindingTime = System.currentTimeMillis val filter = new CustomPathFilter(currentTime) @@ -121,7 +115,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas "files in the monitored directory." ) } - (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq) + (newFiles, filter.minNewFileModTime) } /** Generate one RDD from an array of files */ @@ -200,38 +194,42 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } /** - * Custom PathFilter class to find new files that have modification timestamps <= current time, - * but have not been seen before (i.e. the file should not be in lastModTimeFiles) + * Custom PathFilter class to find new files that + * ... have modification time more than ignore time + * ... have not been seen in the last interval + * ... have modification time less than maxModTime */ private[streaming] class CustomPathFilter(maxModTime: Long) extends PathFilter { - // Latest file mod time seen in this round of fetching files and its corresponding files - var latestModTime = 0L - val latestModTimeFiles = new HashSet[String]() + + // Minimum of the mod times of new files found in the current interval + var minNewFileModTime = -1L + def accept(path: Path): Boolean = { try { if (!filter(path)) { // Reject file if it does not satisfy filter logDebug("Rejected by filter " + path) return false } + // Reject file if it was found in the last interval + if (lastFoundFiles.contains(path.toString)) { + logDebug("Mod time equal to last mod time, but file considered already") + return false + } val modTime = getFileModTime(path) logDebug("Mod time for " + path + " is " + modTime) - if (modTime < prevModTime) { - logDebug("Mod time less than last mod time") - return false // If the file was created before the last time it was called - } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) { - logDebug("Mod time equal to last mod time, but file considered already") - return false // If the file was created exactly as lastModTime but not reported yet + if (modTime < ignoreTime) { + // Reject file if it was created before the ignore time (or, before last interval) + logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime) + return false } else if (modTime > maxModTime) { + // Reject file if it is too new that considering it may give errors logDebug("Mod time more than ") - return false // If the file is too new that considering it may give errors + return false } - if (modTime > latestModTime) { - latestModTime = modTime - latestModTimeFiles.clear() - logDebug("Latest mod time updated to " + latestModTime) + if (minNewFileModTime < 0 || modTime < minNewFileModTime) { + minNewFileModTime = modTime } - latestModTimeFiles += path.toString logDebug("Accepted " + path) } catch { case fnfe: java.io.FileNotFoundException =>