Commit 9514dc8

Added unit tests to test reading of corrupted data and other minor edits
tdas committed Oct 23, 2014
1 parent 3881706 commit 9514dc8
Showing 5 changed files with 158 additions and 109 deletions.
@@ -16,4 +16,5 @@
  */
 package org.apache.spark.streaming.util
 
-private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)
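
For illustration only (not part of this commit): a segment is just a (path, offset, length) pointer into a log file. The sketch below uses a made-up Segment case class and plain java.io in place of the Spark/HDFS classes, writing a few length-prefixed records and then seeking straight back to one of them.

// Illustrative sketch; Segment stands in for WriteAheadLogFileSegment.
import java.io.{DataOutputStream, FileOutputStream, RandomAccessFile}

case class Segment(path: String, offset: Long, length: Int)

object SegmentDemo extends App {
  val path = "segment-demo.log"
  val out = new DataOutputStream(new FileOutputStream(path))
  val segments = Seq("first", "second", "third").map { record =>
    val bytes = record.getBytes("UTF-8")
    val segment = Segment(path, out.size().toLong, bytes.length)
    out.writeInt(bytes.length)   // length prefix, mirroring the record format used above
    out.write(bytes)
    segment
  }
  out.close()

  // Jump straight to the record described by the last segment.
  val in = new RandomAccessFile(segments.last.path, "r")
  in.seek(segments.last.offset)
  val storedLength = in.readInt()
  require(storedLength == segments.last.length, "unexpected record length")
  val buffer = new Array[Byte](storedLength)
  in.readFully(buffer)
  in.close()
  println(new String(buffer, "UTF-8"))   // prints "third"
}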
@@ -75,8 +75,8 @@ private[streaming] class WriteAheadLogManager(
    * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
    * to HDFS, and will be available for readers to read.
    */
-  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
-    var fileSegment: FileSegment = null
+  def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+    var fileSegment: WriteAheadLogFileSegment = null
     var failures = 0
     var lastException: Exception = null
     var succeeded = false
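
The body of the retry loop is collapsed in this view, so the following is only a generic sketch of the pattern these variables suggest; maxFailures and attemptWrite are invented for the example and are not taken from the Spark code.

// Generic retry-until-success sketch (assumed shape, not the actual writeToLog body).
object RetryWriteDemo extends App {
  val maxFailures = 3   // assumed limit, not from the Spark code

  var attempts = 0
  def attemptWrite(record: String): Int = {
    attempts += 1
    if (attempts <= 2) throw new java.io.IOException(s"simulated failure #$attempts")
    record.length   // stand-in for the segment returned on success
  }

  var result = -1
  var failures = 0
  var lastException: Exception = null
  var succeeded = false
  while (!succeeded && failures < maxFailures) {
    try {
      result = attemptWrite("some record")
      succeeded = true
    } catch {
      case e: Exception =>
        lastException = e
        failures += 1
    }
  }
  if (!succeeded) throw lastException   // give up and surface the last error
  println(s"wrote record after $failures failed attempt(s), result = $result")
}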
@@ -112,8 +112,8 @@ private[streaming] class WriteAheadLogManager(
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     logFilesToRead.iterator.map { file =>
-      logDebug(s"Creating log reader with $file")
-      new WriteAheadLogReader(file, hadoopConf)
+      logDebug(s"Creating log reader with $file")
+      new WriteAheadLogReader(file, hadoopConf)
     } flatMap { x => x }
   }
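
For illustration only (not part of this commit): the map-then-flatMap above turns one iterator per log file into a single lazy iterator over all records. The sketch below shows the same pattern with plain Seq[String] values standing in for WriteAheadLogReader.

// Lazily combine per-file iterators into one; a "reader" is only opened
// when iteration reaches its file.
object CombinedIteratorDemo extends App {
  val files = Seq(
    "log-1" -> Seq("a1", "a2"),
    "log-2" -> Seq("b1"),
    "log-3" -> Seq("c1", "c2"))

  val combined: Iterator[String] =
    files.iterator.map { case (file, records) =>
      println(s"Creating log reader with $file")
      records.iterator
    } flatMap { x => x }

  combined.foreach(println)
}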

@@ -208,6 +208,7 @@ private[util] object WriteAheadLogManager {
s"log-$startTime-$stopTime"
}

/** Convert a sequence of files to a sequence of sorted LogInfo objects */
def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
files.flatMap { file =>
logFileRegex.findFirstIn(file.getName()) match {
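
For illustration only (not part of this commit): the sketch below parses the "log-$startTime-$stopTime" names produced above back into sorted records. LogInfo's actual fields are not visible in this diff, so the case class here is an assumption made for the example.

// Parse log file names back into (startTime, endTime, path) records and sort them.
object LogFileNameDemo extends App {
  case class LogInfo(startTime: Long, endTime: Long, path: String)   // assumed shape
  val logFileRegex = """log-(\d+)-(\d+)""".r

  val names = Seq("log-2000-3000", "log-1000-2000", "checkpoint")
  val logInfos = names.flatMap {
    case name @ logFileRegex(start, stop) => Some(LogInfo(start.toLong, stop.toLong, name))
    case _ => None   // skip files that do not follow the naming convention
  }.sortBy(_.startTime)

  logInfos.foreach(println)
}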
@@ -32,12 +32,12 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura
   private val instream = HdfsUtils.getInputStream(path, conf)
   private var closed = false
 
-  def read(segment: FileSegment): ByteBuffer = synchronized {
+  def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
     assertOpen()
     instream.seek(segment.offset)
     val nextLength = instream.readInt()
     HdfsUtils.checkState(nextLength == segment.length,
-      "Expected message length to be " + segment.length + ", " + "but was " + nextLength)
+      s"Expected message length to be ${segment.length}, but was $nextLength")
     val buffer = new Array[Byte](nextLength)
     instream.readFully(buffer)
     ByteBuffer.wrap(buffer)
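
The new unit tests for reading corrupted data belong to the suite that is not rendered on this page; the sketch below is only an illustration, with plain java.io, of the failure mode the length prefix guards against: a record whose declared length no longer matches what is actually left in the file.

// Write one length-prefixed record, truncate the file, and watch the read fail.
import java.io.{DataInputStream, DataOutputStream, EOFException, FileInputStream, FileOutputStream, RandomAccessFile}

object CorruptedReadDemo extends App {
  val path = "corrupt-demo.log"
  val out = new DataOutputStream(new FileOutputStream(path))
  out.writeInt(64)                       // declare a 64-byte record
  out.write(Array.fill[Byte](64)(1))
  out.close()

  // Simulate corruption by truncating the file in the middle of the record body.
  val raf = new RandomAccessFile(path, "rw")
  raf.setLength(20)
  raf.close()

  val in = new DataInputStream(new FileInputStream(path))
  val declaredLength = in.readInt()      // still reads 64
  val buffer = new Array[Byte](declaredLength)
  try {
    in.readFully(buffer)                 // only 16 bytes remain, so this fails
  } catch {
    case e: EOFException => println(s"corruption detected: $e")
  } finally {
    in.close()
  }
}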
@@ -34,49 +34,46 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
   private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
 
   private lazy val hadoopFlushMethod = {
+    // Use reflection to get the right flush operation
     val cls = classOf[FSDataOutputStream]
     Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
   }
 
-  private var nextOffset = getPosition()
+  private var nextOffset = stream.getPos()
   private var closed = false
 
 
   /** Write the bytebuffer to the log file */
-  def write(data: ByteBuffer): FileSegment = synchronized {
+  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
     assertOpen()
     data.rewind() // Rewind to ensure all data in the buffer is retrieved
     val lengthToWrite = data.remaining()
-    val segment = new FileSegment(path, nextOffset, lengthToWrite)
+    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
     stream.writeInt(lengthToWrite)
     if (data.hasArray) {
       stream.write(data.array())
     } else {
-      // If the buffer is not backed by an array we need to write the data byte by byte
+      // If the buffer is not backed by an array, we transfer using temp array
+      // Note that despite the extra array copy, this should be faster than byte-by-byte copy
       while (data.hasRemaining) {
-        stream.write(data.get())
+        val array = new Array[Byte](data.remaining)
+        data.get(array)
+        stream.write(array)
       }
     }
     flush()
-    nextOffset = getPosition()
+    nextOffset = stream.getPos()
     segment
   }
 
-  override private[streaming] def close(): Unit = synchronized {
+  override def close(): Unit = synchronized {
     closed = true
     stream.close()
   }
 
 
-  private def getPosition(): Long = {
-    stream.getPos()
-  }
-
   private def flush() {
-    hadoopFlushMethod.foreach { _.invoke(stream) }
+    // Useful for local file system where hflush/sync does not work (HADOOP-7844)
+    stream.getWrappedStream.flush()
+    hadoopFlushMethod.foreach {
+      _.invoke(stream)
+    }
   }
 
   private def assertOpen() {
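
For illustration only (not part of this commit): the changed else-branch handles ByteBuffers with no backing array, such as direct buffers, by bulk-copying the remaining bytes into a temporary array instead of writing one byte at a time. The sketch below shows that path with plain java.io in place of the HDFS output stream.

// Length-prefixed write of a direct ByteBuffer (hasArray is false).
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.ByteBuffer

object DirectBufferWriteDemo extends App {
  val data = ByteBuffer.allocateDirect(16)
  data.putLong(1L).putLong(2L)
  data.rewind()                          // make the whole buffer readable again

  val bytesOut = new ByteArrayOutputStream()
  val stream = new DataOutputStream(bytesOut)
  val lengthToWrite = data.remaining()
  stream.writeInt(lengthToWrite)         // length prefix
  if (data.hasArray) {
    stream.write(data.array())
  } else {
    // No backing array: bulk-copy into a temp array and write it in one call.
    while (data.hasRemaining) {
      val array = new Array[Byte](data.remaining())
      data.get(array)
      stream.write(array)
    }
  }
  stream.flush()
  println(s"hasArray = ${data.hasArray}, wrote ${bytesOut.size() - 4} payload bytes")
}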