Pulled WriteAheadLog-related stuff from tdas/spark/tree/driver-ha-working
tdas committed Oct 21, 2014
1 parent 342b57d commit 172358d
Showing 7 changed files with 770 additions and 0 deletions.
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
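For orientation: a FileSegment is simply a pointer to one record inside a log file, i.e. the file path plus the byte offset and length of that record. A hedged illustration with made-up values (not part of this commit):

// Hypothetical segment pointing at a 1024-byte record written at byte offset 4
// of one rolled log file; a reader would seek to `offset` and read `length` bytes.
val segment = FileSegment("hdfs:///spark/wal/log-1413868800000-1413868860000", 4L, 1024)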
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}

private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    val dfsPath = new Path(path)
    val dfs = this.synchronized {
      dfsPath.getFileSystem(conf)
    }
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false)) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }

  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
    val dfsPath = new Path(path)
    val dfs = this.synchronized {
      dfsPath.getFileSystem(conf)
    }
    val instream = dfs.open(dfsPath)
    instream
  }

  def checkState(state: Boolean, errorMsg: => String) {
    if (!state) {
      throw new IllegalStateException(errorMsg)
    }
  }

  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
    val dfsPath = new Path(path)
    val dfs = this.synchronized {
      dfsPath.getFileSystem(conf)
    }
    val fileStatus = dfs.getFileStatus(dfsPath)
    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
    blockLocs.map(_.flatMap(_.getHosts))
  }
}
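To make the intent of these helpers concrete, here is a minimal sketch of how they might be exercised together. The path and configuration values are illustrative only (they are not part of this commit), and it assumes `hdfs.append.support` is enabled so that reopening an existing file appends rather than failing:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
conf.setBoolean("hdfs.append.support", true)  // assumption: appends allowed; otherwise an existing file throws

val path = "hdfs:///spark/wal/example-log"    // illustrative path
val out = HdfsUtils.getOutputStream(path, conf)
out.writeInt(4)                               // length prefix, matching the record framing used by the readers
out.write(Array[Byte](1, 2, 3, 4))
out.close()

val in = HdfsUtils.getInputStream(path, conf)
val length = in.readInt()
val payload = new Array[Byte](length)
in.readFully(payload)
in.close()

// Block locations can be used to schedule reads close to the data.
HdfsUtils.getBlockLocations(path, conf).foreach(hosts => println(hosts.mkString(", ")))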
@@ -0,0 +1,176 @@
package org.apache.spark.streaming.storage

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.Logging
import org.apache.spark.streaming.storage.WriteAheadLogManager._
import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils

private[streaming] class WriteAheadLogManager(
    logDirectory: String,
    hadoopConf: Configuration,
    rollingIntervalSecs: Int = 60,
    maxFailures: Int = 3,
    callerName: String = "",
    clock: Clock = new SystemClock
  ) extends Logging {

  private val pastLogs = new ArrayBuffer[LogInfo]
  private val callerNameTag =
    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
  implicit private val executionContext = ExecutionContext.fromExecutorService(
    Utils.newDaemonFixedThreadPool(1, threadpoolName))
  override protected val logName = s"WriteAheadLogManager $callerNameTag"

  private var currentLogPath: String = null
  private var currentLogWriter: WriteAheadLogWriter = null
  private var currentLogWriterStartTime: Long = -1L
  private var currentLogWriterStopTime: Long = -1L

  initializeOrRecover()

  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    var fileSegment: FileSegment = null
    var failures = 0
    var lastException: Exception = null
    var succeeded = false
    while (!succeeded && failures < maxFailures) {
      try {
        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
        succeeded = true
      } catch {
        case ex: Exception =>
          lastException = ex
          logWarning("Failed to write to write ahead log", ex)
          resetWriter()
          failures += 1
      }
    }
    if (fileSegment == null) {
      throw lastException
    }
    fileSegment
  }

  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    val logFilesToRead = pastLogs.map { _.path } ++ Option(currentLogPath)
    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    logFilesToRead.iterator.map { file =>
      logDebug(s"Creating log reader with $file")
      new WriteAheadLogReader(file, hadoopConf)
    } flatMap { x => x }
  }

  /**
   * Delete the log files that are older than the threshold time.
   *
   * It's important to note that the threshold time is based on the time stamps used in the log
   * files, which are themselves based on the local system time. So if the threshold is computed
   * on one node (say, the driver) while the log files were stamped with another node's system
   * time (say, a worker), the caller has to account for possible time skew between the two.
   */
  def cleanupOldLogs(threshTime: Long): Unit = {
    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

    def deleteFiles() {
      oldLogFiles.foreach { logInfo =>
        try {
          val path = new Path(logInfo.path)
          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
          fs.delete(path, true)
          synchronized { pastLogs -= logInfo }
          logDebug(s"Cleared log file $logInfo")
        } catch {
          case ex: Exception =>
            logWarning(s"Error clearing log file $logInfo", ex)
        }
      }
      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
    }
    if (!executionContext.isShutdown) {
      Future { deleteFiles() }
    }
  }

  def stop(): Unit = synchronized {
    if (currentLogWriter != null) {
      currentLogWriter.close()
    }
    executionContext.shutdown()
    logInfo("Stopped log manager")
  }

  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
      resetWriter()
      if (currentLogPath != null) {
        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
      }
      currentLogWriterStartTime = currentTime
      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
      val newLogPath = new Path(logDirectory,
        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
      currentLogPath = newLogPath.toString
      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
    }
    currentLogWriter
  }

  private def initializeOrRecover(): Unit = synchronized {
    val logDirectoryPath = new Path(logDirectory)
    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)

    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
      pastLogs.clear()
      pastLogs ++= logFileInfo
      logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory")
      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
    } else {
      fileSystem.mkdirs(logDirectoryPath,
        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
      logInfo(s"Created $logDirectory for log files")
    }
  }

  private def resetWriter(): Unit = synchronized {
    if (currentLogWriter != null) {
      currentLogWriter.close()
      currentLogWriter = null
    }
  }
}

private[storage] object WriteAheadLogManager {

  case class LogInfo(startTime: Long, endTime: Long, path: String)

  val logFileRegex = """log-(\d+)-(\d+)""".r

  def timeToLogFile(startTime: Long, stopTime: Long): String = {
    s"log-$startTime-$stopTime"
  }

  def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
    files.flatMap { file =>
      logFileRegex.findFirstIn(file.getName()) match {
        case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
          val startTime = startTimeStr.toLong
          val stopTime = stopTimeStr.toLong
          Some(LogInfo(startTime, stopTime, file.toString))
        case None =>
          None
      }
    }.sortBy { _.startTime }
  }
}
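Putting the pieces together, a caller would drive the manager roughly as follows. This is only a sketch: the directory, the payload, and the one-hour retention window are illustrative and not part of this commit.

import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration

val manager = new WriteAheadLogManager(
  logDirectory = "hdfs:///spark/checkpoints/receivedData/0",  // illustrative directory
  hadoopConf = new Configuration(),
  callerName = "ExampleReceiver")

// Each write returns the FileSegment where the record landed; a caller can keep it
// (e.g. alongside block metadata) to re-read that single record later.
val segment: FileSegment = manager.writeToLog(ByteBuffer.wrap("hello".getBytes("UTF-8")))

// On recovery, replay everything still present in the log directory.
manager.readFromLog().foreach { bytes =>
  // rebuild state from bytes
}

// Asynchronously delete files whose end timestamp is older than the threshold (ms since epoch).
manager.cleanupOldLogs(System.currentTimeMillis() - 60 * 60 * 1000)

manager.stop()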
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.Closeable
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
  extends Closeable {

  private val instream = HdfsUtils.getInputStream(path, conf)
  private var closed = false

  def read(segment: FileSegment): ByteBuffer = synchronized {
    assertOpen()
    instream.seek(segment.offset)
    val nextLength = instream.readInt()
    HdfsUtils.checkState(nextLength == segment.length,
      "Expected message length to be " + segment.length + ", but was " + nextLength)
    val buffer = new Array[Byte](nextLength)
    instream.readFully(buffer)
    ByteBuffer.wrap(buffer)
  }

  override def close(): Unit = synchronized {
    closed = true
    instream.close()
  }

  private def assertOpen() {
    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
  }
}
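As a complement, a hedged sketch of how the random reader pairs with a FileSegment obtained from a write; the helper name `readOneRecord` is made up for illustration:

import org.apache.hadoop.conf.Configuration

// Assumes `segment` was previously returned for some record, e.g. by WriteAheadLogManager.writeToLog.
def readOneRecord(segment: FileSegment, conf: Configuration): Array[Byte] = {
  val reader = new WriteAheadLogRandomReader(segment.path, conf)
  try {
    val buffer = reader.read(segment)   // seeks to segment.offset and validates the stored length
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    bytes
  } finally {
    reader.close()                      // the reader cannot be reused once closed
  }
}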

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.{EOFException, Closeable}
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging

private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
  extends Iterator[ByteBuffer] with Closeable with Logging {

  private val instream = HdfsUtils.getInputStream(path, conf)
  private var closed = false
  private var nextItem: Option[ByteBuffer] = None

  override def hasNext: Boolean = synchronized {
    if (closed) {
      return false
    }

    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
      true
    } else {
      try {
        val length = instream.readInt()
        val buffer = new Array[Byte](length)
        instream.readFully(buffer)
        nextItem = Some(ByteBuffer.wrap(buffer))
        logTrace("Read next item " + nextItem.get)
        true
      } catch {
        case e: EOFException =>
          logDebug("Error reading next item, EOF reached", e)
          close()
          false
        case e: Exception =>
          logDebug("Error reading next item", e)
          close()
          throw e
      }
    }
  }

  override def next(): ByteBuffer = synchronized {
    val data = nextItem.getOrElse {
      close()
      throw new IllegalStateException(
        "next called without calling hasNext or after hasNext returned false")
    }
    nextItem = None // Ensure the next hasNext call loads new data.
    data
  }

  override def close(): Unit = synchronized {
    if (!closed) {
      instream.close()
    }
    closed = true
  }
}
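Since the reader is a plain Iterator[ByteBuffer], one log file can be replayed with ordinary iterator operations. A minimal sketch; the path is illustrative and not part of this commit:

import org.apache.hadoop.conf.Configuration

val reader = new WriteAheadLogReader(
  "hdfs:///spark/checkpoints/receivedData/0/log-1413868800000-1413868860000",  // illustrative path
  new Configuration())
try {
  reader.foreach { record =>
    println(s"Read record of ${record.remaining()} bytes")
  }
} finally {
  reader.close()  // no-op if hasNext already closed the stream on EOF
}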