Skip to content

Commit

Permalink
various minor fixes and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
haoyuan committed Mar 22, 2014
1 parent e3ddbba commit 8859371
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 60 deletions.
39 changes: 14 additions & 25 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,16 @@ private[spark] class BlockManager(

// Block store constructed from this manager and maxMemory.
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
// Block store constructed from this manager and diskBlockManager.
private[storage] val diskStore = new DiskStore(this, diskBlockManager)

// Set to true when the Tachyon store is initialized. Callers check it before touching
// tachyonStore, because tachyonStore is lazy and accessing it triggers initialization.
var tachyonInitialized = false
// Reference to the Tachyon store; null until initializeTachyonStore() assigns it.
private[storage] var innerTachyonStore: TachyonStore = null
/**
 * Store for blocks kept in Tachyon, created on first access.
 *
 * The store directory is read from `spark.tachyonstore.dir` (default: the `java.io.tmpdir`
 * system property) and is namespaced by `appId` and `executorId`. The master address is read
 * from `spark.tachyonmaster.address` (default: `localhost:19998`).
 */
private[storage] lazy val tachyonStore: TachyonStore = {
  val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
  val tachyonStorePath = s"${storeDir}/${appId}/${this.executorId}"
  val tachyonMaster = conf.get("spark.tachyonmaster.address", "localhost:19998")
  val tachyonBlockManager = new TachyonBlockManager(
    shuffleBlockManager, tachyonStorePath, tachyonMaster)
  val store = new TachyonStore(this, tachyonBlockManager)
  // Flip the flag only after the store exists, so that a failure while constructing it does
  // not leave tachyonInitialized == true with no usable store behind it.
  tachyonInitialized = true
  store
}

// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
Expand Down Expand Up @@ -101,23 +108,6 @@ private[spark] class BlockManager(
var asyncReregisterTask: Future[Unit] = null
val asyncReregisterLock = new Object

// Lazily evaluated accessor for the Tachyon store: runs initializeTachyonStore() on first
// access unless tachyonInitialized is already set, then returns innerTachyonStore.
private[storage] lazy val tachyonStore : TachyonStore = {
if (!tachyonInitialized) {
initializeTachyonStore()
}
this.innerTachyonStore
}

// Builds the Tachyon store from configuration and publishes it through innerTachyonStore.
// The store path is <spark.tachyonstore.dir>/<appId>/<executorId>; the directory defaults to
// the java.io.tmpdir system property and the master address to localhost:19998.
private def initializeTachyonStore() {
val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
val tachyonStorePath = s"${storeDir}/${appId}/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonmaster.address", "localhost:19998")
val tachyonBlockManager = new TachyonBlockManager(
shuffleBlockManager, tachyonStorePath, tachyonMaster)
this.innerTachyonStore = new TachyonStore(this, tachyonBlockManager)
// The flag is set last, after the store has been constructed and assigned.
this.tachyonInitialized = true
}

private def heartBeat() {
if (!master.sendHeartBeat(blockManagerId)) {
reregister()
Expand Down Expand Up @@ -636,7 +626,7 @@ private[spark] class BlockManager(
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
}else {
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
val askForBytes = level.replication > 1
Expand All @@ -658,7 +648,7 @@ private[spark] class BlockManager(
}
}

// Now that the block is in either the memory, tachyon or disk store,
// Now that the block is in either the memory, tachyon, or disk store,
// let other threads read it, and tell the master about it.
marked = true
myInfo.markReady(size)
Expand Down Expand Up @@ -822,11 +812,10 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromTachyon =
if (tachyonInitialized == true) tachyonStore.remove(blockId) else false
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
logWarning("Block " + blockId + " could not be removed as it was not found in either " +
"the disk, memory or tachyon store")
"the disk, memory, or tachyon store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
Expand Down Expand Up @@ -939,7 +928,7 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
if(tachyonInitialized == true) {
if(tachyonInitialized) {
tachyonStore.clear()
}
metadataCleaner.cancel()
Expand Down
19 changes: 6 additions & 13 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
extends Ordered[RDDInfo] {
/**
 * One-line summary of this RDD's storage: name, id, storage level, cached and total partition
 * counts, and the memory, Tachyon and disk sizes rendered via `Utils.bytesToString`.
 */
override def toString = {
  import Utils.bytesToString
  // Fields are separated by "; " throughout, including between MemorySize and TachyonSize.
  ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
    "TachyonSize: %s; DiskSize: %s").format(
      name, id, storageLevel.toString, numCachedPartitions, numPartitions,
      bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}

override def compare(that: RDDInfo) = {
Expand Down Expand Up @@ -96,14 +95,8 @@ object StorageUtils {
sc.persistentRdds.get(rddId).map { r =>
val rddName = Option(r.name).getOrElse(rddId.toString)
val rddStorageLevel = r.getStorageLevel
RDDInfo(rddId,
rddName,
rddStorageLevel,
rddBlocks.length,
r.partitions.size,
memSize,
tachyonSize,
diskSize)
RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize,
tachyonSize, diskSize)
}
}.flatten.toArray

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.storage

import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random}

Expand Down Expand Up @@ -47,7 +46,7 @@ private[spark] class TachyonBlockManager(
// Ask TachyonFS for a client only when a non-empty master address was supplied; otherwise the
// client is null and the process exits below.
val client = if (master != null && master != "") TachyonFS.get(master) else null
if (client == null) {
logError("Failed to connect to the Tachyon as the master address is not configured")
// NOTE(review): this exits with a DISK_STORE_* code although the failure concerns Tachyon; a
// Tachyon-specific exit code would describe it better. Confirm against ExecutorExitCode.
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}

private val MAX_DIR_CREATION_ATTEMPTS = 10
Expand Down Expand Up @@ -136,8 +135,8 @@ private[spark] class TachyonBlockManager(
}
}
if (!foundLocalDir) {
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
" attempts to create tachyon dir in " + rootDir)
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
rootDir)
System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
}
logInfo("Created tachyon directory at " + tachyonDir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

package org.apache.spark.storage

import java.io.File
import tachyon.client.TachyonFile

/**
* References a particular segment of a file (potentially the entire file),
* based off an offset and a length.
* References a particular segment of a file (potentially the entire file), based off an offset and
* a length.
*/

private[spark] class TachyonFileSegment(
    val file: TachyonFile,
    val offset: Long,
    val length: Long) {

  /** Renders the segment as `(name=<path>, offset=<offset>, length=<length>)`. */
  override def toString = {
    val path = file.getPath()
    "(name=%s, offset=%d, length=%d)".format(path, offset, length)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,21 @@

package org.apache.spark.storage

import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.util.LinkedHashMap
import java.io.FileOutputStream
import java.nio.channels.FileChannel.MapMode

import scala.collection.mutable.ArrayBuffer

import tachyon.client.OutStream
import tachyon.client.WriteType
import tachyon.client.ReadType
import tachyon.client.InStream

import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.serializer.Serializer


private class Entry(val size: Long)


/**
* Stores BlockManager blocks on Tachyon.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.getLocations("a1").size === 0, "master did not remove a1")
assert(master.getLocations("a2").size === 0, "master did not remove a2")
}

test("master + 2 managers interaction") {
store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, securityMgr)
store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf,
securityMgr)

val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
Expand Down Expand Up @@ -407,7 +407,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
}

test("tachyon storage") {
val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
if (tachyonUnitTestEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.spark.examples

import scala.math.random

import org.apache.spark._
import SparkContext._

/** Computes an approximation to pi */
object SparkPi {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.spark.examples

import scala.math.random

import org.apache.spark._
import SparkContext._
import org.apache.spark.storage.StorageLevel

/** Computes an approximation to pi */
Expand Down
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm),
"com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.0" excludeAll(excludeKyro, excludeHadoop, excludeCurator, excludeJackson, excludeNetty, excludeAsm),
"org.tachyonproject" % "tachyon" % "0.4.1" excludeAll(excludeKyro, excludeHadoop, excludeCurator, excludeJackson, excludeNetty, excludeAsm),
"com.clearspring.analytics" % "stream" % "2.5.1"
),
libraryDependencies ++= maybeAvro
Expand Down

0 comments on commit 8859371

Please sign in to comment.