Merge remote-tracking branch 'upstream/master' into spark-1403
Bharath Bhushan committed Apr 5, 2014
2 parents 044027d + 7c18428 commit 728beca
Showing 41 changed files with 1,048 additions and 227 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -39,3 +39,4 @@ work
.*\.q
golden
test.out/*
.*iml
47 changes: 47 additions & 0 deletions core/pom.xml
@@ -200,6 +200,53 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon</artifactId>
<version>0.4.1-thrift</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jsp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
46 changes: 33 additions & 13 deletions core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -23,17 +23,18 @@
* Expose some commonly useful storage level constants.
*/
public class StorageLevels {
public static final StorageLevel NONE = create(false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
public static final StorageLevel NONE = create(false, false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

/**
* Create a new StorageLevel object.
@@ -42,7 +43,26 @@ public class StorageLevels {
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
@Deprecated
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
}

/**
* Create a new StorageLevel object.
* @param useDisk saved to disk, if true
* @param useMemory saved to memory, if true
* @param useOffHeap saved to Tachyon, if true
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(
boolean useDisk,
boolean useMemory,
boolean useOffHeap,
boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
}
}
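
To make the API change concrete, here is a minimal sketch (not part of the diff; the values are illustrative). The deprecated four-argument create forwards to the new overload with useOffHeap = false, so existing callers keep their old semantics, while the new OFF_HEAP constant sets only the off-heap flag:

import org.apache.spark.api.java.StorageLevels
import org.apache.spark.storage.StorageLevel

// Deprecated four-argument form: forwards with useOffHeap = false,
// so this is still the classic MEMORY_AND_DISK level.
val memAndDisk: StorageLevel = StorageLevels.create(true, true, true, 1)

// Equivalent to the new OFF_HEAP constant: useDisk = false,
// useMemory = false, useOffHeap = true, deserialized = false, replication = 1.
val offHeap: StorageLevel = StorageLevels.create(false, false, true, false, 1)

println(offHeap) // StorageLevel's toString shows the flags and replication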
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,14 +19,13 @@ package org.apache.spark

import java.io._
import java.net.URI
import java.util.{Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger

import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -130,6 +129,11 @@ class SparkContext(
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

// Generate a random name for the application's temp folder in Tachyon;
// the random UUID suffix makes collisions between applications unlikely.
val tachyonFolderName = "spark-" + randomUUID.toString()
conf.set("spark.tachyonStore.folderName", tachyonFolderName)

val isLocal = (master == "local" || master.startsWith("local["))

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
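A minimal sketch of the configuration surface involved (not part of the diff; the values are examples): spark.tachyonStore.folderName is generated automatically by the lines above, while the base directory and master URL are read later by the BlockManager, with the defaults shown here.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("tachyon-example")
  // Root directory inside Tachyon for per-application folders
  // (the same value the BlockManager falls back to by default).
  .set("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
  // Address of the Tachyon master (again the BlockManager default).
  .set("spark.tachyonStore.url", "tachyon://localhost:19998")

val sc = new SparkContext(conf)
// Set automatically by SparkContext: "spark-" followed by a random UUID.
println(sc.getConf.get("spark.tachyonStore.folderName"))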
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -53,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
false)

case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -105,7 +106,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
name = "Executor")
workerUrl.foreach{ url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -41,13 +41,22 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53

/** TachyonStore failed to initialize after many attempts. */
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54

/** TachyonStore failed to create a local temporary directory after many attempts. */
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55

def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
"TachyonStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
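For reference, a hypothetical caller resolving one of the new codes (not in the diff, and compilable only inside the org.apache.spark package, since ExecutorExitCode is package-private):

import org.apache.spark.executor.ExecutorExitCode

// Code 55 is TACHYON_STORE_FAILED_TO_CREATE_DIR; explainExitCode turns it
// into the message a worker logs when an executor dies with that code.
val message = ExecutorExitCode.explainExitCode(
  ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
println(message) // "TachyonStore failed to create a local temporary directory."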
86 changes: 71 additions & 15 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,22 +19,20 @@ package org.apache.spark.storage

import java.io.{File, InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

import akka.actor.{ActorSystem, Cancellable, Props}
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._


sealed trait Values

case class ByteBufferValues(buffer: ByteBuffer) extends Values
@@ -59,6 +57,17 @@ private[spark] class BlockManager(

private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
var tachyonInitialized = false
private[storage] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
val tachyonBlockManager = new TachyonBlockManager(
shuffleBlockManager, tachyonStorePath, tachyonMaster)
tachyonInitialized = true
new TachyonStore(this, tachyonBlockManager)
}

// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
@@ -248,8 +257,10 @@
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val inTachyonSize = status.tachyonSize
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
master.updateBlockInfo(
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
} else true
}

@@ -259,22 +270,24 @@
* and the updated in-memory and on-disk sizes.
*/
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
val (newLevel, inMemSize, onDiskSize) = info.synchronized {
val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized {
info.level match {
case null =>
(StorageLevel.NONE, 0L, 0L)
(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || onDisk) level.replication else 1
val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication)
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
(storageLevel, memSize, diskSize)
(storageLevel, memSize, diskSize, tachyonSize)
}
}
BlockStatus(newLevel, inMemSize, onDiskSize)
BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize)
}

/**
@@ -354,6 +367,24 @@
logDebug("Block " + blockId + " not found in memory")
}
}

// Look for the block in Tachyon
if (level.useOffHeap) {
logDebug("Getting block " + blockId + " from tachyon")
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
case Some(bytes) => {
if (!asValues) {
return Some(bytes)
} else {
return Some(dataDeserialize(blockId, bytes))
}
}
case None =>
logDebug("Block " + blockId + " not found in tachyon")
}
}
}

// Look for block on disk, potentially storing it back into memory if required:
if (level.useDisk) {
@@ -620,6 +651,23 @@
}
// Keep track of which blocks are dropped from memory
res.droppedBlocks.foreach { block => updatedBlocks += block }
} else if (level.useOffHeap) {
// Save to Tachyon.
val res = data match {
case IteratorValues(iterator) =>
tachyonStore.putValues(blockId, iterator, level, false)
case ArrayBufferValues(array) =>
tachyonStore.putValues(blockId, array, level, false)
case ByteBufferValues(bytes) => {
bytes.rewind();
tachyonStore.putBytes(blockId, bytes, level)
}
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
@@ -644,8 +692,8 @@

val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory or disk store, let other threads read it,
// and tell the master about it.
// Now that the block is in either the memory, tachyon, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
@@ -707,7 +755,8 @@
*/
var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
@@ -832,9 +881,10 @@
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
logWarning("Block " + blockId + " could not be removed as it was not found in either " +
"the disk or memory store")
"the disk, memory, or tachyon store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
@@ -871,6 +921,9 @@
if (level.useDisk) {
diskStore.remove(id)
}
if (level.useOffHeap) {
tachyonStore.remove(id)
}
iterator.remove()
logInfo("Dropped block " + id)
}
@@ -946,6 +999,9 @@
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
if (tachyonInitialized) {
tachyonStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
logInfo("BlockManager stopped")
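Putting it together, a sketch of what exercises the new store. Two assumptions: a Tachyon master is running at the configured URL, and the Scala StorageLevel companion object gains a matching OFF_HEAP constant (that file is part of this commit but not among the hunks shown):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("local[2]", "off-heap-example")
val data = sc.parallelize(1 to 1000000)

// Persisting OFF_HEAP routes puts through the lazy tachyonStore above.
// The Tachyon client is created only on first off-heap use, which is why
// stop() and block removal consult tachyonInitialized before touching it.
data.persist(StorageLevel.OFF_HEAP)
println(data.count()) // the first action writes the blocks to Tachyon
sc.stop()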
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -63,9 +63,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
diskSize: Long,
tachyonSize: Long): Boolean = {
val res = askDriverWithReply[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
logInfo("Updated info of block " + blockId)
res
}
(The remaining changed files are not rendered here.)
