From 172358de10a61f296e52fa347c2e40aa87490ecf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 20 Oct 2014 19:52:55 -0700 Subject: [PATCH] Pulled WriteAheadLog-related stuff from tdas/spark/tree/driver-ha-working --- .../spark/streaming/storage/FileSegment.scala | 19 ++ .../spark/streaming/storage/HdfsUtils.scala | 72 +++++ .../storage/WriteAheadLogManager.scala | 176 +++++++++++ .../storage/WriteAheadLogRandomReader.scala | 50 ++++ .../storage/WriteAheadLogReader.scala | 76 +++++ .../storage/WriteAheadLogWriter.scala | 100 +++++++ .../storage/WriteAheadLogSuite.scala | 277 ++++++++++++++++++ 7 files changed, 770 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala new file mode 100644 index 0000000000000..eb9c07e9cf61f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +private[streaming] case class FileSegment (path: String, offset: Long, length: Int) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala new file mode 100644 index 0000000000000..efb12b82ae949 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path} + +private[streaming] object HdfsUtils { + + def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = { + // HDFS is not thread-safe when getFileSystem is called, so synchronize on that + + val dfsPath = new Path(path) + val dfs = + this.synchronized { + dfsPath.getFileSystem(conf) + } + // If the file exists and we have append support, append instead of creating a new file + val stream: FSDataOutputStream = { + if (dfs.isFile(dfsPath)) { + if (conf.getBoolean("hdfs.append.support", false)) { + dfs.append(dfsPath) + } else { + throw new IllegalStateException("File exists and there is no append support!") + } + } else { + dfs.create(dfsPath) + } + } + stream + } + + def getInputStream(path: String, conf: Configuration): FSDataInputStream = { + val dfsPath = new Path(path) + val dfs = this.synchronized { + dfsPath.getFileSystem(conf) + } + val instream = dfs.open(dfsPath) + instream + } + + def checkState(state: Boolean, errorMsg: => String) { + if(!state) { + throw new IllegalStateException(errorMsg) + } + } + + def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = { + val dfsPath = new Path(path) + val dfs = + this.synchronized { + dfsPath.getFileSystem(conf) + } + val fileStatus = dfs.getFileStatus(dfsPath) + val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)) + blockLocs.map(_.flatMap(_.getHosts)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala new file mode 100644 index 0000000000000..c70ecb0da4e54 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala @@ -0,0 +1,176 @@ +package org.apache.spark.streaming.storage + +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.spark.Logging +import org.apache.spark.streaming.storage.WriteAheadLogManager._ +import org.apache.spark.streaming.util.{Clock, SystemClock} +import org.apache.spark.util.Utils + +private[streaming] class WriteAheadLogManager( + logDirectory: String, + hadoopConf: Configuration, + rollingIntervalSecs: Int = 60, + maxFailures: Int = 3, + callerName: String = "", + clock: Clock = new SystemClock + ) extends Logging { + + private val pastLogs = new ArrayBuffer[LogInfo] + private val callerNameTag = + if (callerName != null && callerName.nonEmpty) s" for $callerName" else "" + private val threadpoolName = s"WriteAheadLogManager $callerNameTag" + implicit private val executionContext = ExecutionContext.fromExecutorService( + Utils.newDaemonFixedThreadPool(1, threadpoolName)) + override protected val logName = s"WriteAheadLogManager $callerNameTag" + + 
private var currentLogPath: String = null
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+    var fileSegment: FileSegment = null
+    var failures = 0
+    var lastException: Exception = null
+    var succeeded = false
+    while (!succeeded && failures < maxFailures) {
+      try {
+        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+        succeeded = true
+      } catch {
+        case ex: Exception =>
+          lastException = ex
+          logWarning("Failed to write to write ahead log")
+          resetWriter()
+          failures += 1
+      }
+    }
+    if (fileSegment == null) {
+      throw lastException
+    }
+    fileSegment
+  }
+
+  def readFromLog(): Iterator[ByteBuffer] = synchronized {
+    val logFilesToRead = pastLogs.map { _.path } ++ Option(currentLogPath)
+    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+    logFilesToRead.iterator.map { file =>
+      logDebug(s"Creating log reader with $file")
+      new WriteAheadLogReader(file, hadoopConf)
+    } flatMap { x => x }
+  }
+
+  /**
+   * Delete the log files that are older than the threshold time.
+   *
+   * It's important to note that the threshold time is based on the time stamps used in the log
+   * files, which are in turn based on the local system time. So if the threshTime is computed
+   * on a different node (say, the driver) than the one that wrote the log files (say, a worker),
+   * the caller has to account for possible time skew between the two.
+   */
+  def cleanupOldLogs(threshTime: Long): Unit = {
+    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+
+    def deleteFiles() {
+      oldLogFiles.foreach { logFileInfo =>
+        try {
+          val path = new Path(logFileInfo.path)
+          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
+          fs.delete(path, true)
+          synchronized { pastLogs -= logFileInfo }
+          logDebug(s"Cleared log file $logFileInfo")
+        } catch {
+          case ex: Exception =>
+            logWarning(s"Error clearing log file $logFileInfo", ex)
+        }
+      }
+      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+    }
+    if (!executionContext.isShutdown) {
+      Future { deleteFiles() }
+    }
+  }
+
+  def stop(): Unit = synchronized {
+    if (currentLogWriter != null) {
+      currentLogWriter.close()
+    }
+    executionContext.shutdown()
+    logInfo("Stopped log manager")
+  }
+
+  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
+    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
+      resetWriter()
+      if (currentLogPath != null) {
+        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
+      }
+      currentLogWriterStartTime = currentTime
+      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+      val newLogPath = new Path(logDirectory,
+        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
+      currentLogPath = newLogPath.toString
+      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
+    }
+    currentLogWriter
+  }
+
+  private def initializeOrRecover(): Unit = synchronized {
+    val logDirectoryPath = new Path(logDirectory)
+    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
+
+    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+      val logFileInfo =
logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) + pastLogs.clear() + pastLogs ++= logFileInfo + logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory") + logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}") + } else { + fileSystem.mkdirs(logDirectoryPath, + FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)) + logInfo(s"Created ${logDirectory} for log files") + } + } + + private def resetWriter(): Unit = synchronized { + if (currentLogWriter != null) { + currentLogWriter.close() + currentLogWriter = null + } + } +} + +private[storage] object WriteAheadLogManager { + + case class LogInfo(startTime: Long, endTime: Long, path: String) + + val logFileRegex = """log-(\d+)-(\d+)""".r + + def timeToLogFile(startTime: Long, stopTime: Long): String = { + s"log-$startTime-$stopTime" + } + + def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = { + files.flatMap { file => + logFileRegex.findFirstIn(file.getName()) match { + case Some(logFileRegex(startTimeStr, stopTimeStr)) => + val startTime = startTimeStr.toLong + val stopTime = stopTimeStr.toLong + Some(LogInfo(startTime, stopTime, file.toString)) + case None => + None + } + }.sortBy { _.startTime } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala new file mode 100644 index 0000000000000..3df024834f7a4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io.Closeable +import java.nio.ByteBuffer + +import org.apache.hadoop.conf.Configuration + +private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration) + extends Closeable { + + private val instream = HdfsUtils.getInputStream(path, conf) + private var closed = false + + def read(segment: FileSegment): ByteBuffer = synchronized { + assertOpen() + instream.seek(segment.offset) + val nextLength = instream.readInt() + HdfsUtils.checkState(nextLength == segment.length, + "Expected message length to be " + segment.length + ", " + "but was " + nextLength) + val buffer = new Array[Byte](nextLength) + instream.readFully(buffer) + ByteBuffer.wrap(buffer) + } + + override def close(): Unit = synchronized { + closed = true + instream.close() + } + + private def assertOpen() { + HdfsUtils.checkState(!closed, "Stream is closed. 
Create a new Reader to read from the file.")
+  }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
new file mode 100644
index 0000000000000..5e0dc1d49a89a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+import java.io.{EOFException, Closeable}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
+  extends Iterator[ByteBuffer] with Closeable with Logging {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
+  private var closed = false
+  private var nextItem: Option[ByteBuffer] = None
+
+  override def hasNext: Boolean = synchronized {
+    if (closed) {
+      return false
+    }
+
+    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+      true
+    } else {
+      try {
+        val length = instream.readInt()
+        val buffer = new Array[Byte](length)
+        instream.readFully(buffer)
+        nextItem = Some(ByteBuffer.wrap(buffer))
+        logTrace("Read next item " + nextItem.get)
+        true
+      } catch {
+        case e: EOFException =>
+          logDebug("Error reading next item, EOF reached", e)
+          close()
+          false
+        case e: Exception =>
+          logDebug("Error reading next item", e)
+          close()
+          throw e
+      }
+    }
+  }
+
+  override def next(): ByteBuffer = synchronized {
+    val data = nextItem.getOrElse {
+      close()
+      throw new IllegalStateException(
+        "next called without calling hasNext or after hasNext returned false")
+    }
+    nextItem = None // Ensure the next hasNext call loads new data.
+    data
+  }
+
+  override def close(): Unit = synchronized {
+    if (!closed) {
+      instream.close()
+    }
+    closed = true
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
new file mode 100644
index 0000000000000..68a1172d7d282
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.storage
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+import org.apache.spark.streaming.storage.FileSegment
+
+private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable {
+  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
+    val uri = new URI(path)
+    val defaultFs = FileSystem.getDefaultUri(conf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
+
+    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
+      assert(!new File(uri.getPath).exists)
+      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
+    } else {
+      Right(HdfsUtils.getOutputStream(path, conf))
+    }
+  }
+
+  private lazy val hadoopFlushMethod = {
+    val cls = classOf[FSDataOutputStream]
+    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
+  }
+
+  private var nextOffset = getPosition()
+  private var closed = false
+
+
+  // Data is always written as:
+  // - Length - Int
+  // - Data - of length = Length
+  def write(data: ByteBuffer): FileSegment = synchronized {
+    assertOpen()
+    data.rewind() // Rewind to ensure all data in the buffer is retrieved
+    val lengthToWrite = data.remaining()
+    val segment = new FileSegment(path, nextOffset, lengthToWrite)
+    stream.writeInt(lengthToWrite)
+    if (data.hasArray) {
+      stream.write(data.array())
+    } else {
+      // If the buffer is not backed by an array we need to write the data byte by byte
+      while (data.hasRemaining) {
+        stream.write(data.get())
+      }
+    }
+    flush()
+    nextOffset = getPosition()
+    segment
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    stream.close()
+  }
+
+  private def stream(): DataOutputStream = {
+    underlyingStream.fold(x => x, x => x)
+  }
+
+  private def getPosition(): Long = {
+    underlyingStream match {
+      case Left(localStream) => localStream.size
+      case Right(dfsStream) => dfsStream.getPos()
+    }
+  }
+
+  private def flush() {
+    underlyingStream match {
+      case Left(localStream) => localStream.flush()
+      case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
+    }
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
new file mode 100644
index 0000000000000..88b2b5095ceb6
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.storage + +import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile} +import java.nio.ByteBuffer + +import scala.util.Random + +import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions + +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.apache.commons.io.FileUtils +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.util.Utils +import WriteAheadLogSuite._ + +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { + + val hadoopConf = new Configuration() + var tempDirectory: File = null + + before { + tempDirectory = Files.createTempDir() + } + + after { + if (tempDirectory != null && tempDirectory.exists()) { + FileUtils.deleteDirectory(tempDirectory) + tempDirectory = null + } + } + + test("WriteAheadLogWriter - writing data") { + val file = new File(tempDirectory, Random.nextString(10)) + val dataToWrite = generateRandomData() + val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf) + val segments = dataToWrite.map(data => writer.write(data)) + writer.close() + val writtenData = readDataManually(file, segments) + assert(writtenData.toArray === dataToWrite.toArray) + } + + test("WriteAheadLogWriter - syncing of data by writing and reading immediately") { + val file = new File(tempDirectory, Random.nextString(10)) + val dataToWrite = generateRandomData() + val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf) + dataToWrite.foreach { data => + val segment = writer.write(data) + assert(readDataManually(file, Seq(segment)).head === data) + } + writer.close() + } + + test("WriteAheadLogReader - sequentially reading data") { + // Write data manually for testing the sequential reader + val file = File.createTempFile("TestSequentialReads", "", tempDirectory) + val writtenData = generateRandomData() + writeDataManually(writtenData, file) + val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + val readData = reader.toSeq.map(byteBufferToString) + assert(readData.toList === writtenData.toList) + assert(reader.hasNext === false) + intercept[Exception] { + reader.next() + } + reader.close() + } + + test("WriteAheadLogReader - sequentially reading data written with writer") { + // Write data manually for testing the sequential reader + val file = new File(tempDirectory, "TestWriter") + val dataToWrite = generateRandomData() + writeDataUsingWriter(file, dataToWrite) + val iter = dataToWrite.iterator + val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf) + reader.foreach { byteBuffer => + assert(byteBufferToString(byteBuffer) === iter.next()) + } + reader.close() + } + + test("WriteAheadLogRandomReader - reading data using random reader") { + // Write data manually for testing the random reader + val file = File.createTempFile("TestRandomReads", 
"", tempDirectory) + val writtenData = generateRandomData() + val segments = writeDataManually(writtenData, file) + + // Get a random order of these segments and read them back + val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten + val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + writtenDataAndSegments.foreach { case (data, segment) => + assert(data === byteBufferToString(reader.read(segment))) + } + reader.close() + } + + test("WriteAheadLogRandomReader - reading data using random reader written with writer") { + // Write data using writer for testing the random reader + val file = new File(tempDirectory, "TestRandomReads") + val data = generateRandomData() + val segments = writeDataUsingWriter(file, data) + + // Read a random sequence of segments and verify read data + val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten + val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf) + dataAndSegments.foreach { case(data, segment) => + assert(data === byteBufferToString(reader.read(segment))) + } + reader.close() + } + + test("WriteAheadLogManager - write rotating logs") { + // Write data using manager + val dataToWrite = generateRandomData(10) + writeDataUsingManager(tempDirectory, dataToWrite) + + // Read data manually to verify the written data + val logFiles = getLogFilesInDirectory(tempDirectory) + assert(logFiles.size > 1) + val writtenData = logFiles.flatMap { file => readDataManually(file) } + assert(writtenData.toList === dataToWrite.toList) + } + + test("WriteAheadLogManager - read rotating logs") { + // Write data manually for testing reading through manager + val writtenData = (1 to 10).map { i => + val data = generateRandomData(10) + val file = new File(tempDirectory, s"log-$i-${i + 1}") + writeDataManually(data, file) + data + }.flatten + + // Read data using manager and verify + val readData = readDataUsingManager(tempDirectory) + assert(readData.toList === writtenData.toList) + } + + test("WriteAheadLogManager - recover past logs when creating new manager") { + // Write data with manager, recover with new manager and verify + val dataToWrite = generateRandomData(100) + writeDataUsingManager(tempDirectory, dataToWrite) + val logFiles = getLogFilesInDirectory(tempDirectory) + assert(logFiles.size > 1) + val readData = readDataUsingManager(tempDirectory) + assert(dataToWrite.toList === readData.toList) + } + + // TODO (Hari, TD): Test different failure conditions of writers and readers. + // - Failure in the middle of write + // - Failure while reading incomplete/corrupt file +} + +object WriteAheadLogSuite { + + private val hadoopConf = new Configuration() + + /** + * Write data to the file and returns the an array of the bytes written. + * This is used to test the WAL reader independently of the WAL writer. 
+ */ + def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = { + val segments = new ArrayBuffer[FileSegment]() + val writer = new RandomAccessFile(file, "rw") + data.foreach { item => + val offset = writer.getFilePointer() + val bytes = Utils.serialize(item) + writer.writeInt(bytes.size) + writer.write(bytes) + segments += FileSegment(file.toString, offset, bytes.size) + } + writer.close() + segments + } + + def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = { + val writer = new WriteAheadLogWriter(file.toString, hadoopConf) + val segments = data.map { + item => writer.write(item) + } + writer.close() + segments + } + + def writeDataUsingManager(logDirectory: File, data: Seq[String]) { + val fakeClock = new ManualClock + val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf, + rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock) + data.foreach { item => + fakeClock.addToTime(500) + manager.writeToLog(item) + } + manager.stop() + } + + /** + * Read data from the given segments of log file and returns the read list of byte buffers. + * This is used to test the WAL writer independently of the WAL reader. + */ + def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = { + val reader = new RandomAccessFile(file, "r") + segments.map { x => + reader.seek(x.offset) + val data = new Array[Byte](x.length) + reader.readInt() + reader.readFully(data) + Utils.deserialize[String](data) + } + } + + def readDataManually(file: File): Seq[String] = { + val reader = new DataInputStream(new FileInputStream(file)) + val buffer = new ArrayBuffer[String] + try { + while (reader.available > 0) { + val length = reader.readInt() + val bytes = new Array[Byte](length) + reader.read(bytes) + buffer += Utils.deserialize[String](bytes) + } + } finally { + reader.close() + } + buffer + } + + def readDataUsingManager(logDirectory: File): Seq[String] = { + val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf, + callerName = "WriteAheadLogSuite") + val data = manager.readFromLog().map(byteBufferToString).toSeq + manager.stop() + data + } + + def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = { + (1 to numItems).map { _.toString } + } + + def getLogFilesInDirectory(directory: File): Seq[File] = { + if (directory.exists) { + directory.listFiles().filter(_.getName().startsWith("log-")) + .sortBy(_.getName.split("-")(1).toLong) + } else { + Seq.empty + } + } + + def printData(data: Seq[String]) { + println("# items in data = " + data.size) + println(data.mkString("\n")) + } + + implicit def stringToByteBuffer(str: String): ByteBuffer = { + ByteBuffer.wrap(Utils.serialize(str)) + } + + implicit def byteBufferToString(byteBuffer: ByteBuffer): String = { + Utils.deserialize[String](byteBuffer.array) + } +}
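Note: the sketch below is not part of the patch. It illustrates the intended lifecycle of the classes introduced above (WriteAheadLogManager, FileSegment, WriteAheadLogReader): create a manager, write length-prefixed records, read them back sequentially, then clean up old log files. The log directory, payload, and object name are illustrative assumptions; the object is placed in org.apache.spark.streaming.storage only because these classes are private[streaming].

package org.apache.spark.streaming.storage

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

// Hypothetical usage sketch exercising the write/read/cleanup lifecycle of the
// write ahead log classes added in this patch.
object WriteAheadLogExample {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()

    // Roll to a new log file every 60 seconds and retry each failed write up to 3 times.
    val manager = new WriteAheadLogManager(
      "/tmp/spark-wal-example",   // illustrative log directory
      hadoopConf,
      rollingIntervalSecs = 60,
      maxFailures = 3,
      callerName = "WriteAheadLogExample")

    // writeToLog returns a FileSegment (path, offset, length) that a
    // WriteAheadLogRandomReader could later use to re-read exactly this record.
    val segment = manager.writeToLog(ByteBuffer.wrap("sample record".getBytes("UTF-8")))
    println(s"Wrote ${segment.length} bytes to ${segment.path} at offset ${segment.offset}")

    // readFromLog iterates over all records across the rotated log files in order.
    manager.readFromLog().foreach { buffer =>
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      println(s"Recovered record: ${new String(bytes, "UTF-8")}")
    }

    // Asynchronously delete log files whose time range ended more than an hour ago,
    // then close the current writer and shut down the cleanup thread pool.
    manager.cleanupOldLogs(System.currentTimeMillis() - 3600 * 1000)
    manager.stop()
  }
}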