Skip to content

Commit

Permalink
address matei's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
haoyuan committed Apr 2, 2014
1 parent 91fa09d commit 3dcace4
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53

/** TachyonStore failed to create a local temporary directory after many attempts. */
/** TachyonStore failed to initialize after many attempts. */
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54

/** TachyonStore failed to create a local temporary directory after many attempts. */
Expand All @@ -54,6 +54,9 @@ object ExecutorExitCode {
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
"TachyonStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[spark] class BlockManager(
val storeDir = conf.get("spark.tachyonStore.baseDir", System.getProperty("java.io.tmpdir"))
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonStore.URL", "tachyon://localhost:19998")
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
val tachyonBlockManager = new TachyonBlockManager(
shuffleBlockManager, tachyonStorePath, tachyonMaster)
tachyonInitialized = true
Expand Down Expand Up @@ -276,7 +276,7 @@ private[spark] class BlockManager(
(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val inTachyon = level.useTachyon && tachyonStore.contains(blockId)
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
Expand Down Expand Up @@ -369,7 +369,7 @@ private[spark] class BlockManager(
}

// Look for the block in Tachyon
if (level.useTachyon) {
if (level.useOffHeap) {
logDebug("Getting block " + blockId + " from tachyon")
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
Expand Down Expand Up @@ -651,7 +651,7 @@ private[spark] class BlockManager(
}
// Keep track of which blocks are dropped from memory
res.droppedBlocks.foreach { block => updatedBlocks += block }
} else if (level.useTachyon) {
} else if (level.useOffHeap) {
// Save to Tachyon.
val askForBytes = level.replication > 1
val res = data match {
Expand Down Expand Up @@ -757,7 +757,7 @@ private[spark] class BlockManager(
var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useTachyon, level.deserialized, 1)
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
Expand Down Expand Up @@ -922,7 +922,7 @@ private[spark] class BlockManager(
if (level.useDisk) {
diskStore.remove(id)
}
if (level.useTachyon) {
if (level.useOffHeap) {
tachyonStore.remove(id)
}
iterator.remove()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ private[spark] class BlockManagerInfo(
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useTachyon) {
if (storageLevel.useOffHeap) {
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
logInfo("Added %s on tachyon on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
Expand All @@ -394,7 +394,7 @@ private[spark] class BlockManagerInfo(
logInfo("Removed %s on %s on disk (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
if (blockStatus.storageLevel.useTachyon) {
if (blockStatus.storageLevel.useOffHeap) {
logInfo("Removed %s on %s on tachyon (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
}
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
class StorageLevel private(
private var useDisk_ : Boolean,
private var useMemory_ : Boolean,
private var useTachyon_ : Boolean,
private var useOffHeap_ : Boolean,
private var deserialized_ : Boolean,
private var replication_ : Int = 1)
extends Externalizable {
Expand All @@ -45,27 +45,27 @@ class StorageLevel private(

def useDisk = useDisk_
def useMemory = useMemory_
def useTachyon = useTachyon_
def useOffHeap = useOffHeap_
def deserialized = deserialized_
def replication = replication_

assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")

override def clone(): StorageLevel = new StorageLevel(
this.useDisk, this.useMemory, this.useTachyon, this.deserialized, this.replication)
this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication)

override def equals(other: Any): Boolean = other match {
case s: StorageLevel =>
s.useDisk == useDisk &&
s.useMemory == useMemory &&
s.useTachyon == useTachyon &&
s.useOffHeap == useOffHeap &&
s.deserialized == deserialized &&
s.replication == replication
case _ =>
false
}

def isValid = ((useMemory || useDisk || useTachyon) && (replication > 0))
def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0))

def toInt: Int = {
var ret = 0
Expand All @@ -75,7 +75,7 @@ class StorageLevel private(
if (useMemory_) {
ret |= 4
}
if (useTachyon_) {
if (useOffHeap_) {
ret |= 2
}
if (deserialized_) {
Expand All @@ -93,7 +93,7 @@ class StorageLevel private(
val flags = in.readByte()
useDisk_ = (flags & 8) != 0
useMemory_ = (flags & 4) != 0
useTachyon_ = (flags & 2) != 0
useOffHeap_ = (flags & 2) != 0
deserialized_ = (flags & 1) != 0
replication_ = in.readByte()
}
Expand All @@ -102,14 +102,14 @@ class StorageLevel private(
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format(
useDisk, useMemory, useTachyon, deserialized, replication)
useDisk, useMemory, useOffHeap, deserialized, replication)

override def hashCode(): Int = toInt * 41 + replication
def description : String = {
var result = ""
result += (if (useDisk) "Disk " else "")
result += (if (useMemory) "Memory " else "")
result += (if (useTachyon) "Tachyon " else "")
result += (if (useOffHeap) "Tachyon " else "")
result += (if (deserialized) "Deserialized " else "Serialized ")
result += "%sx Replicated".format(replication)
result
Expand All @@ -135,10 +135,10 @@ object StorageLevel {
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)

/** Create a new StorageLevel object without setting useTachyon*/
def apply(useDisk: Boolean, useMemory: Boolean, useTachyon: Boolean,
/** Create a new StorageLevel object with an explicit useOffHeap setting */
def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
deserialized: Boolean, replication: Int) = getCachedStorageLevel(
new StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication))
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))

/** Create a new StorageLevel object */
def apply(useDisk: Boolean, useMemory: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private[spark] object JsonProtocol {
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
("Use Tachyon" -> storageLevel.useTachyon) ~
("Use Tachyon" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.tachyonStore.baseDir</td>
<td>System.getProperty("java.io.tmpdir")</td>
<td>
Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by <code>spark.tachyonStore.URL</code>.
Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by <code>spark.tachyonStore.url</code>.
This property can also be a comma-separated list of multiple directories on the Tachyon file system.
</td>
</tr>
<tr>
<td>spark.tachyonStore.URL</td>
<td>spark.tachyonStore.url</td>
<td>tachyon://localhost:19998</td>
<td>
The URL of the underlying Tachyon file system in the TachyonStore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import scala.math.random
import org.apache.spark._
import org.apache.spark.storage.StorageLevel

/** Computes an approximation to pi
/**
* Computes an approximation to pi.
* This example uses Tachyon to persist rdds during computation.
*/
object SparkTachyonPi {
Expand All @@ -44,16 +45,7 @@ object SparkTachyonPi {
val y = random * 2 - 1
if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
println("1- Pi is roughly " + 4.0 * count / n)

val rdd2 = spark.parallelize(1 to n, slices)
rdd2.persist(StorageLevel.OFF_HEAP)
val count2 = rdd2.map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
println("2- Pi is roughly " + 4.0 * count2 / n)
println("Pi is roughly " + 4.0 * count / n)

spark.stop()
}
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/storagelevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, useDisk, useMemory, useTachyon, deserialized, replication = 1

def __repr__(self):
return "StorageLevel(%s, %s, %s, %s, %s)" % (
self.useDisk, self.useMemory, self.useTachyon, self.deserialized, self.replication)
self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication)

StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
Expand All @@ -46,4 +46,4 @@ def __repr__(self):
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False)
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
StorageLevel.TACHYON = StorageLevel(False, False, True, False, 1)
StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1)

0 comments on commit 3dcace4

Please sign in to comment.