diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
similarity index 83%
rename from streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
index fb33507768e6d..1005a2c8ec303 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
@@ -16,4 +16,5 @@
  */
 package org.apache.spark.streaming.util
 
-private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 2dc2507b33cb5..f0c552e9593c4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -75,8 +75,8 @@ private[streaming] class WriteAheadLogManager(
    * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
    * to HDFS, and will be available for readers to read.
    */
-  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
-    var fileSegment: FileSegment = null
+  def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+    var fileSegment: WriteAheadLogFileSegment = null
     var failures = 0
     var lastException: Exception = null
     var succeeded = false
@@ -112,8 +112,8 @@ private[streaming] class WriteAheadLogManager(
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     logFilesToRead.iterator.map { file =>
-        logDebug(s"Creating log reader with $file")
-        new WriteAheadLogReader(file, hadoopConf)
+      logDebug(s"Creating log reader with $file")
+      new WriteAheadLogReader(file, hadoopConf)
     } flatMap { x => x }
   }
 
@@ -208,6 +208,7 @@ private[util] object WriteAheadLogManager {
     s"log-$startTime-$stopTime"
   }
 
+  /** Convert a sequence of files to a sequence of sorted LogInfo objects */
  def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
     files.flatMap { file =>
       logFileRegex.findFirstIn(file.getName()) match {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
index 16ad8279528aa..92bad7a882a65 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
@@ -32,12 +32,12 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura
   private val instream = HdfsUtils.getInputStream(path, conf)
   private var closed = false
 
-  def read(segment: FileSegment): ByteBuffer = synchronized {
+  def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
     assertOpen()
     instream.seek(segment.offset)
     val nextLength = instream.readInt()
     HdfsUtils.checkState(nextLength == segment.length,
-      "Expected message length to be " + segment.length + ", " + "but was " + nextLength)
+      s"Expected message length to be ${segment.length}, but was $nextLength")
     val buffer = new Array[Byte](nextLength)
     instream.readFully(buffer)
     ByteBuffer.wrap(buffer)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
index d8341f0b1c936..679f6a6dfd7c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
@@ -34,49 +34,46 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
   private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
 
   private lazy val hadoopFlushMethod = {
+    // Use reflection to get the right flush operation
     val cls = classOf[FSDataOutputStream]
     Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
   }
 
-  private var nextOffset = getPosition()
+  private var nextOffset = stream.getPos()
   private var closed = false
 
-  /** Write the bytebuffer to the log file */
-  def write(data: ByteBuffer): FileSegment = synchronized {
+  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
     assertOpen()
     data.rewind() // Rewind to ensure all data in the buffer is retrieved
     val lengthToWrite = data.remaining()
-    val segment = new FileSegment(path, nextOffset, lengthToWrite)
+    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
     stream.writeInt(lengthToWrite)
     if (data.hasArray) {
       stream.write(data.array())
     } else {
-      // If the buffer is not backed by an array we need to write the data byte by byte
+      // If the buffer is not backed by an array, we transfer using temp array
+      // Note that despite the extra array copy, this should be faster than byte-by-byte copy
       while (data.hasRemaining) {
-        stream.write(data.get())
+        val array = new Array[Byte](data.remaining)
+        data.get(array)
+        stream.write(array)
       }
     }
     flush()
-    nextOffset = getPosition()
+    nextOffset = stream.getPos()
    segment
   }
 
-  override private[streaming] def close(): Unit = synchronized {
+  override def close(): Unit = synchronized {
     closed = true
     stream.close()
   }
 
-  private def getPosition(): Long = {
-    stream.getPos()
-  }
-
   private def flush() {
+    hadoopFlushMethod.foreach { _.invoke(stream) }
+    // Useful for local file system where hflush/sync does not work (HADOOP-7844)
     stream.getWrappedStream.flush()
-    hadoopFlushMethod.foreach {
-      _.invoke(stream)
-    }
   }
 
   private def assertOpen() {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index bd21f462b46b4..f86998b5a66b9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,33 +19,36 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer
 
-import org.apache.hadoop.fs.Path
-
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.language.implicitConversions
-import scala.language.postfixOps
+import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import WriteAheadLogSuite._
 import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.hadoop.fs.Path
 import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.concurrent.Eventually._
 
 class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
 
   val hadoopConf = new Configuration()
   var tempDir: File = null
-  var dirForTest: String = null
-  var fileForTest: String = null
+  var testDir: String = null
+  var testFile: String = null
+  var manager: WriteAheadLogManager = null
 
   before {
     tempDir = Files.createTempDir()
-    dirForTest = "file:///" + tempDir.toString
-    fileForTest = "file:///" + new File(tempDir, getRandomString()).toString
+    testDir = tempDir.toString
+    testFile = new File(tempDir, Random.nextString(10)).toString
+    if (manager != null) {
+      manager.stop()
+      manager = null
+    }
   }
 
   after {
@@ -54,32 +57,28 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
 
   test("WriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
-    val segments = dataToWrite.map(data => writer.write(data))
-    writer.close()
-    val writtenData = readDataManually(fileForTest, segments)
-    assert(writtenData.toArray === dataToWrite.toArray)
+    val segments = writeDataUsingWriter(testFile, dataToWrite)
+    val writtenData = readDataManually(testFile, segments)
+    assert(writtenData === dataToWrite)
   }
 
   test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
     val dataToWrite = generateRandomData()
-    val writer = new WriteAheadLogWriter(fileForTest, hadoopConf)
+    val writer = new WriteAheadLogWriter(testFile, hadoopConf)
     dataToWrite.foreach { data =>
       val segment = writer.write(stringToByteBuffer(data))
-      val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
-      val dataRead = reader.read(segment)
-      assert(data === byteBufferToString(dataRead))
+      val dataRead = readDataManually(testFile, Seq(segment)).head
+      assert(data === dataRead)
     }
     writer.close()
   }
 
   test("WriteAheadLogReader - sequentially reading data") {
-    // Write data manually for testing the sequential reader
     val writtenData = generateRandomData()
-    writeDataManually(writtenData, fileForTest)
-    val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
+    writeDataManually(writtenData, testFile)
+    val reader = new WriteAheadLogReader(testFile, hadoopConf)
     val readData = reader.toSeq.map(byteBufferToString)
-    assert(readData.toList === writtenData.toList)
+    assert(readData === writtenData)
     assert(reader.hasNext === false)
     intercept[Exception] {
       reader.next()
@@ -88,25 +87,43 @@
   }
 
   test("WriteAheadLogReader - sequentially reading data written with writer") {
+    val dataToWrite = generateRandomData()
+    writeDataUsingWriter(testFile, dataToWrite)
+    val readData = readDataUsingReader(testFile)
+    assert(readData === dataToWrite)
+  }
+
+  test("WriteAheadLogReader - reading data written with writer after corrupted write") {
     // Write data manually for testing the sequential reader
     val dataToWrite = generateRandomData()
-    writeDataUsingWriter(fileForTest, dataToWrite)
-    val iter = dataToWrite.iterator
-    val reader = new WriteAheadLogReader(fileForTest, hadoopConf)
-    reader.foreach { byteBuffer =>
-      assert(byteBufferToString(byteBuffer) === iter.next())
-    }
-    reader.close()
+    writeDataUsingWriter(testFile, dataToWrite)
+    val fileLength = new File(testFile).length()
+
+    // Append some garbage data to get the effect of a corrupted write
+    val fw = new FileWriter(testFile, true)
+    fw.append("This line appended to file!")
+    fw.close()
+
+    // Verify that the data can be read and matches what was correctly written
+    assert(readDataUsingReader(testFile) === dataToWrite)
+
+    // Truncate the file to corrupt the last correctly written record
+    val raf = new FileOutputStream(testFile, true).getChannel()
+    raf.truncate(fileLength - 1)
+    raf.close()
+
+    // Verify all the data except the last can be read
+    assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
   }
 
   test("WriteAheadLogRandomReader - reading data using random reader") {
     // Write data manually for testing the random reader
     val writtenData = generateRandomData()
-    val segments = writeDataManually(writtenData, fileForTest)
+    val segments = writeDataManually(writtenData, testFile)
 
     // Get a random order of these segments and read them back
     val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
+    val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
     writtenDataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
@@ -116,11 +133,11 @@
   test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
     // Write data using writer for testing the random reader
     val data = generateRandomData()
-    val segments = writeDataUsingWriter(fileForTest, data)
+    val segments = writeDataUsingWriter(testFile, data)
 
     // Read a random sequence of segments and verify read data
     val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
-    val reader = new WriteAheadLogRandomReader(fileForTest, hadoopConf)
+    val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
     dataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
@@ -130,91 +147,112 @@
   test("WriteAheadLogManager - write rotating logs") {
     // Write data using manager
     val dataToWrite = generateRandomData()
-    writeDataUsingManager(dirForTest, dataToWrite)
+    writeDataUsingManager(testDir, dataToWrite)
 
     // Read data manually to verify the written data
-    val logFiles = getLogFilesInDirectory(dirForTest)
+    val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
     val writtenData = logFiles.flatMap { file => readDataManually(file)}
-    assert(writtenData.toList === dataToWrite.toList)
+    assert(writtenData === dataToWrite)
   }
 
   test("WriteAheadLogManager - read rotating logs") {
     // Write data manually for testing reading through manager
     val writtenData = (1 to 10).map { i =>
       val data = generateRandomData()
-      val file = dirForTest + s"/log-$i-$i"
+      val file = testDir + s"/log-$i-$i"
       writeDataManually(data, file)
       data
     }.flatten
 
-    val logDirectoryPath = new Path(dirForTest)
+    val logDirectoryPath = new Path(testDir)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
     assert(fileSystem.exists(logDirectoryPath) === true)
 
     // Read data using manager and verify
-    val readData = readDataUsingManager(dirForTest)
-    assert(readData.toList === writtenData.toList)
+    val readData = readDataUsingManager(testDir)
+    assert(readData === writtenData)
  }
 
   test("WriteAheadLogManager - recover past logs when creating new manager") {
     // Write data with manager, recover with new manager and verify
     val dataToWrite = generateRandomData()
-    writeDataUsingManager(dirForTest, dataToWrite)
-    val logFiles = getLogFilesInDirectory(dirForTest)
+    writeDataUsingManager(testDir, dataToWrite)
+    val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
-    val readData = readDataUsingManager(dirForTest)
-    assert(dataToWrite.toList === readData.toList)
+    val readData = readDataUsingManager(testDir)
+    assert(dataToWrite === readData)
   }
 
   test("WriteAheadLogManager - cleanup old logs") {
     // Write data with manager, recover with new manager and verify
+    val manualClock = new ManualClock
     val dataToWrite = generateRandomData()
-    val fakeClock = new ManualClock
-    val manager = new WriteAheadLogManager(dirForTest, hadoopConf,
-      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
-    dataToWrite.foreach { item =>
-      fakeClock.addToTime(500) // half second for each
-      manager.writeToLog(item)
-    }
-    val logFiles = getLogFilesInDirectory(dirForTest)
+    manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
+    val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
-    manager.cleanupOldLogs(fakeClock.currentTime() / 2)
+    manager.cleanupOldLogs(manualClock.currentTime() / 2)
     eventually(timeout(1 second), interval(10 milliseconds)) {
-      assert(getLogFilesInDirectory(dirForTest).size < logFiles.size)
+      assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+    }
+  }
+
+  test("WriteAheadLogManager - handling file errors while reading rotating logs") {
+    // Generate a set of log files
+    val manualClock = new ManualClock
+    val dataToWrite1 = generateRandomData()
+    writeDataUsingManager(testDir, dataToWrite1, manualClock)
+    val logFiles1 = getLogFilesInDirectory(testDir)
+    assert(logFiles1.size > 1)
+
+    // Recover old files and generate a second set of log files
+    val dataToWrite2 = generateRandomData()
+    manualClock.addToTime(100000)
+    writeDataUsingManager(testDir, dataToWrite2, manualClock)
+    val logFiles2 = getLogFilesInDirectory(testDir)
+    assert(logFiles2.size > logFiles1.size)
+
+    // Read the files and verify that all the written data can be read
+    val readData1 = readDataUsingManager(testDir)
+    assert(readData1 === (dataToWrite1 ++ dataToWrite2))
+
+    // Corrupt the first set of files so that they are basically unreadable
+    logFiles1.foreach { f =>
+      val raf = new FileOutputStream(f, true).getChannel()
+      raf.truncate(1)
+      raf.close()
     }
+
+    // Verify that the corrupted files do not prevent reading of the second set of data
+    val readData = readDataUsingManager(testDir)
+    assert(readData === dataToWrite2)
   }
-
-  // TODO (Hari, TD): Test different failure conditions of writers and readers.
-  //  - Failure while reading incomplete/corrupt file
 }
 
 object WriteAheadLogSuite {
   private val hadoopConf = new Configuration()
 
-  /**
-   * Write data to the file and returns the an array of the bytes written.
-   * This is used to test the WAL reader independently of the WAL writer.
-   */
-  def writeDataManually(data: Seq[String], file: String): Seq[FileSegment] = {
-    val segments = new ArrayBuffer[FileSegment]()
+  /** Write data to a file directly and return an array of the file segments written. */
+  def writeDataManually(data: Seq[String], file: String): Seq[WriteAheadLogFileSegment] = {
+    val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
     val writer = HdfsUtils.getOutputStream(file, hadoopConf)
     data.foreach { item =>
       val offset = writer.getPos
       val bytes = Utils.serialize(item)
       writer.writeInt(bytes.size)
       writer.write(bytes)
-      segments += FileSegment(file, offset, bytes.size)
+      segments += WriteAheadLogFileSegment(file, offset, bytes.size)
     }
     writer.close()
     segments
   }
 
-  def getRandomString(): String = {
-    new String(Random.alphanumeric.take(6).toArray)
-  }
-
-  def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[FileSegment] = {
+  /**
+   * Write data to a file using the writer class and return an array of the file segments written.
   */
+  def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[WriteAheadLogFileSegment] = {
     val writer = new WriteAheadLogWriter(filePath, hadoopConf)
     val segments = data.map { item =>
       writer.write(item)
@@ -223,24 +261,27 @@ object WriteAheadLogSuite {
     segments
   }
 
-  def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
-    val fakeClock = new ManualClock
-    fakeClock.setTime(1000000)
+  /** Write data to rotating files in the log directory using the manager class. */
+  def writeDataUsingManager(
+      logDirectory: String,
+      data: Seq[String],
+      manualClock: ManualClock = new ManualClock,
+      stopManager: Boolean = true
+    ): WriteAheadLogManager = {
+    if (manualClock.currentTime < 100000) manualClock.setTime(10000)
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
-      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
     // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
-      fakeClock.addToTime(500)
+      manualClock.addToTime(500)
      manager.writeToLog(item)
    }
-    manager.stop()
+    if (stopManager) manager.stop()
+    manager
  }
 
-  /**
-   * Read data from the given segments of log file and returns the read list of byte buffers.
-   * This is used to test the WAL writer independently of the WAL reader.
-   */
-  def readDataManually(file: String, segments: Seq[FileSegment]): Seq[String] = {
+  /** Read data from the given segments of a log file directly and return the list of byte buffers. */
+  def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
     val reader = HdfsUtils.getInputStream(file, hadoopConf)
     segments.map { x =>
       reader.seek(x.offset)
@@ -251,6 +292,7 @@
     }
   }
 
+  /** Read all the data from a log file directly and return the list of byte buffers. */
   def readDataManually(file: String): Seq[String] = {
     val reader = HdfsUtils.getInputStream(file, hadoopConf)
     val buffer = new ArrayBuffer[String]
@@ -270,6 +312,15 @@
     buffer
   }
 
+  /** Read all the data from a log file using the reader class and return the list of byte buffers. */
+  def readDataUsingReader(file: String): Seq[String] = {
+    val reader = new WriteAheadLogReader(file, hadoopConf)
+    val readData = reader.toList.map(byteBufferToString)
+    reader.close()
+    readData
+  }
+
+  /** Read all the data in the log files in a directory using the manager class. */
   def readDataUsingManager(logDirectory: String): Seq[String] = {
     val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
       callerName = "WriteAheadLogSuite")
@@ -278,25 +329,24 @@ object WriteAheadLogSuite {
     data
  }
 
-  def generateRandomData(): Seq[String] = {
-    (1 to 50).map {
-      _.toString
-    }
-  }
-
+  /** Get the log files in a directory */
   def getLogFilesInDirectory(directory: String): Seq[String] = {
     val logDirectoryPath = new Path(directory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       fileSystem.listStatus(logDirectoryPath).map {
-        _.getPath.toString
+        _.getPath.toString.stripPrefix("file:")
       }.sorted
     } else {
      Seq.empty
    }
  }
 
+  def generateRandomData(): Seq[String] = {
+    (1 to 100).map { _.toString }
+  }
+
   implicit def stringToByteBuffer(str: String): ByteBuffer = {
     ByteBuffer.wrap(Utils.serialize(str))
   }
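
Reviewer note (not part of the patch): two ideas in this diff are easy to see in isolation — the length-prefixed record framing that WriteAheadLogFileSegment points into, and the bulk temp-array drain that replaces the old byte-by-byte ByteBuffer copy in WriteAheadLogWriter.write(). The sketch below is a minimal standalone illustration using plain java.io on a local file; SimpleSegment, WalFormatDemo, and the /tmp path are hypothetical demo names, not Spark APIs.

import java.io.{DataInputStream, DataOutputStream, FileInputStream, FileOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for WriteAheadLogFileSegment: the (offset, length) of one record.
case class SimpleSegment(offset: Long, length: Int)

object WalFormatDemo {

  // Length-prefixed framing, as in the patched writer: each record is an Int
  // length followed by that many bytes.
  def writeRecords(path: String, records: Seq[ByteBuffer]): Seq[SimpleSegment] = {
    val out = new DataOutputStream(new FileOutputStream(path))
    var offset = 0L
    val segments = records.map { data =>
      data.rewind() // make the whole buffer visible
      val length = data.remaining()
      out.writeInt(length)
      if (data.hasArray) {
        // Heap buffers expose their backing array (assumes the buffer wraps the
        // full array, as in the patched writer)
        out.write(data.array())
      } else {
        // Bulk drain through one temp array: a single copy and a single write()
        // call, instead of a one-byte-at-a-time loop
        val chunk = new Array[Byte](length)
        data.get(chunk)
        out.write(chunk)
      }
      val segment = SimpleSegment(offset, length)
      offset += 4 + length // 4 bytes for the Int length prefix
      segment
    }
    out.close()
    segments
  }

  // Random access by segment, mirroring WriteAheadLogRandomReader.read():
  // seek to the offset, check the stored length, read exactly that many bytes.
  def readRecord(path: String, segment: SimpleSegment): ByteBuffer = {
    val in = new DataInputStream(new FileInputStream(path))
    try {
      in.skipBytes(segment.offset.toInt) // demo-only: assumes a small local file
      val length = in.readInt()
      require(length == segment.length,
        s"Expected message length to be ${segment.length}, but was $length")
      val bytes = new Array[Byte](length)
      in.readFully(bytes)
      ByteBuffer.wrap(bytes)
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val records = (1 to 5).map(i => ByteBuffer.wrap(s"record-$i".getBytes("UTF-8")))
    val segments = writeRecords("/tmp/wal-format-demo", records)
    // Read the records back in reverse order to show that access is truly random
    segments.reverse.foreach { segment =>
      val buf = readRecord("/tmp/wal-format-demo", segment)
      val bytes = new Array[Byte](buf.remaining())
      buf.get(bytes)
      println(new String(bytes, "UTF-8"))
    }
  }
}

One detail worth noting about the patched write(): because the temp array is sized to data.remaining(), the while loop executes at most once per buffer; the loop shape merely guards against a partial get(), so the "extra array copy" the new comment mentions happens exactly once per record.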