diff --git a/.rat-excludes b/.rat-excludes index 17cf6d0ed1cf3..85bfad60fcadc 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -39,3 +39,4 @@ work .*\.q golden test.out/* +.*iml diff --git a/core/pom.xml b/core/pom.xml index e4c32eff0cd77..66f9fc4961b03 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -200,6 +200,53 @@ derby test + + org.tachyonproject + tachyon + 0.4.1-thrift + + + org.apache.hadoop + hadoop-client + + + org.apache.curator + curator-recipes + + + org.eclipse.jetty + jetty-jsp + + + org.eclipse.jetty + jetty-webapp + + + org.eclipse.jetty + jetty-server + + + org.eclipse.jetty + jetty-servlet + + + junit + junit + + + org.powermock + powermock-module-junit4 + + + org.powermock + powermock-api-mockito + + + org.apache.curator + curator-test + + + org.scalatest scalatest_${scala.binary.version} diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java index 9f13b39909481..840a1bd93bfbb 100644 --- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java +++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java @@ -23,17 +23,18 @@ * Expose some commonly useful storage level constants. */ public class StorageLevels { - public static final StorageLevel NONE = create(false, false, false, 1); - public static final StorageLevel DISK_ONLY = create(true, false, false, 1); - public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2); - public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1); - public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2); - public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1); - public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2); - public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1); - public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2); - public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1); - public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2); + public static final StorageLevel NONE = create(false, false, false, false, 1); + public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1); + public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2); + public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1); + public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2); + public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1); + public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2); + public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1); + public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2); + public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1); + public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2); + public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1); /** * Create a new StorageLevel object. @@ -42,7 +43,26 @@ public class StorageLevels { * @param deserialized saved as deserialized objects, if true * @param replication replication factor */ - public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) { - return StorageLevel.apply(useDisk, useMemory, deserialized, replication); + @Deprecated + public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, + int replication) { + return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication); + } + + /** + * Create a new StorageLevel object. + * @param useDisk saved to disk, if true + * @param useMemory saved to memory, if true + * @param useOffHeap saved to Tachyon, if true + * @param deserialized saved as deserialized objects, if true + * @param replication replication factor + */ + public static StorageLevel create( + boolean useDisk, + boolean useMemory, + boolean useOffHeap, + boolean deserialized, + int replication) { + return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication); } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 835cffe37a938..fcf16ce1b278e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -19,14 +19,13 @@ package org.apache.spark import java.io._ import java.net.URI -import java.util.{Properties, UUID} import java.util.concurrent.atomic.AtomicInteger - +import java.util.{Properties, UUID} +import java.util.UUID.randomUUID import scala.collection.{Map, Set} import scala.collection.generic.Growable import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.reflect.{ClassTag, classTag} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} @@ -130,6 +129,11 @@ class SparkContext( val master = conf.get("spark.master") val appName = conf.get("spark.app.name") + // Generate the random name for a temp folder in Tachyon + // Add a timestamp as the suffix here to make it more safe + val tachyonFolderName = "spark-" + randomUUID.toString() + conf.set("spark.tachyonStore.folderName", tachyonFolderName) + val isLocal = (master == "local" || master.startsWith("local[")) if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 3486092a140fb..16887d8892b31 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -53,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") // Make this host instead of hostPort ? - executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) + executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, + false) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) @@ -105,7 +106,8 @@ private[spark] object CoarseGrainedExecutorBackend { // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( - Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), + Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, + sparkHostPort, cores), name = "Executor") workerUrl.foreach{ url => actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index 210f3dbeebaca..ceff3a067d72a 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -41,6 +41,12 @@ object ExecutorExitCode { /** DiskStore failed to create a local temporary directory after many attempts. */ val DISK_STORE_FAILED_TO_CREATE_DIR = 53 + /** TachyonStore failed to initialize after many attempts. */ + val TACHYON_STORE_FAILED_TO_INITIALIZE = 54 + + /** TachyonStore failed to create a local temporary directory after many attempts. */ + val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55 + def explainExitCode(exitCode: Int): String = { exitCode match { case UNCAUGHT_EXCEPTION => "Uncaught exception" @@ -48,6 +54,9 @@ object ExecutorExitCode { case OOM => "OutOfMemoryError" case DISK_STORE_FAILED_TO_CREATE_DIR => "Failed to create local directory (bad spark.local.dir?)" + case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize." + case TACHYON_STORE_FAILED_TO_CREATE_DIR => + "TachyonStore failed to create a local temporary directory." case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 71584b6eb102a..19138d9dde697 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,22 +19,20 @@ package org.apache.spark.storage import java.io.{File, InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} - import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Random - import akka.actor.{ActorSystem, Cancellable, Props} import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} import sun.nio.ch.DirectBuffer - import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer import org.apache.spark.util._ + sealed trait Values case class ByteBufferValues(buffer: ByteBuffer) extends Values @@ -59,6 +57,17 @@ private[spark] class BlockManager( private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore = new DiskStore(this, diskBlockManager) + var tachyonInitialized = false + private[storage] lazy val tachyonStore: TachyonStore = { + val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") + val appFolderName = conf.get("spark.tachyonStore.folderName") + val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}" + val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") + val tachyonBlockManager = new TachyonBlockManager( + shuffleBlockManager, tachyonStorePath, tachyonMaster) + tachyonInitialized = true + new TachyonStore(this, tachyonBlockManager) + } // If we use Netty for shuffle, start a new Netty-based shuffle sender service. private val nettyPort: Int = { @@ -248,8 +257,10 @@ private[spark] class BlockManager( if (info.tellMaster) { val storageLevel = status.storageLevel val inMemSize = Math.max(status.memSize, droppedMemorySize) + val inTachyonSize = status.tachyonSize val onDiskSize = status.diskSize - master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) + master.updateBlockInfo( + blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize) } else true } @@ -259,22 +270,24 @@ private[spark] class BlockManager( * and the updated in-memory and on-disk sizes. */ private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = { - val (newLevel, inMemSize, onDiskSize) = info.synchronized { + val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized { info.level match { case null => - (StorageLevel.NONE, 0L, 0L) + (StorageLevel.NONE, 0L, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) + val inTachyon = level.useOffHeap && tachyonStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) val deserialized = if (inMem) level.deserialized else false - val replication = if (inMem || onDisk) level.replication else 1 - val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication) + val replication = if (inMem || inTachyon || onDisk) level.replication else 1 + val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication) val memSize = if (inMem) memoryStore.getSize(blockId) else 0L + val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L - (storageLevel, memSize, diskSize) + (storageLevel, memSize, diskSize, tachyonSize) } } - BlockStatus(newLevel, inMemSize, onDiskSize) + BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize) } /** @@ -354,6 +367,24 @@ private[spark] class BlockManager( logDebug("Block " + blockId + " not found in memory") } } + + // Look for the block in Tachyon + if (level.useOffHeap) { + logDebug("Getting block " + blockId + " from tachyon") + if (tachyonStore.contains(blockId)) { + tachyonStore.getBytes(blockId) match { + case Some(bytes) => { + if (!asValues) { + return Some(bytes) + } else { + return Some(dataDeserialize(blockId, bytes)) + } + } + case None => + logDebug("Block " + blockId + " not found in tachyon") + } + } + } // Look for block on disk, potentially storing it back into memory if required: if (level.useDisk) { @@ -620,6 +651,23 @@ private[spark] class BlockManager( } // Keep track of which blocks are dropped from memory res.droppedBlocks.foreach { block => updatedBlocks += block } + } else if (level.useOffHeap) { + // Save to Tachyon. + val res = data match { + case IteratorValues(iterator) => + tachyonStore.putValues(blockId, iterator, level, false) + case ArrayBufferValues(array) => + tachyonStore.putValues(blockId, array, level, false) + case ByteBufferValues(bytes) => { + bytes.rewind(); + tachyonStore.putBytes(blockId, bytes, level) + } + } + size = res.size + res.data match { + case Right(newBytes) => bytesAfterPut = newBytes + case _ => + } } else { // Save directly to disk. // Don't get back the bytes unless we replicate them. @@ -644,8 +692,8 @@ private[spark] class BlockManager( val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { - // Now that the block is in either the memory or disk store, let other threads read it, - // and tell the master about it. + // Now that the block is in either the memory, tachyon, or disk store, + // let other threads read it, and tell the master about it. marked = true putBlockInfo.markReady(size) if (tellMaster) { @@ -707,7 +755,8 @@ private[spark] class BlockManager( */ var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { - val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) + val tLevel = StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) if (cachedPeers == null) { cachedPeers = master.getPeers(blockManagerId, level.replication - 1) } @@ -832,9 +881,10 @@ private[spark] class BlockManager( // Removals are idempotent in disk store and memory store. At worst, we get a warning. val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) - if (!removedFromMemory && !removedFromDisk) { + val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false + if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { logWarning("Block " + blockId + " could not be removed as it was not found in either " + - "the disk or memory store") + "the disk, memory, or tachyon store") } blockInfo.remove(blockId) if (tellMaster && info.tellMaster) { @@ -871,6 +921,9 @@ private[spark] class BlockManager( if (level.useDisk) { diskStore.remove(id) } + if (level.useOffHeap) { + tachyonStore.remove(id) + } iterator.remove() logInfo("Dropped block " + id) } @@ -946,6 +999,9 @@ private[spark] class BlockManager( blockInfo.clear() memoryStore.clear() diskStore.clear() + if (tachyonInitialized) { + tachyonStore.clear() + } metadataCleaner.cancel() broadcastCleaner.cancel() logInfo("BlockManager stopped") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index ed6937851b836..4bc1b407ad106 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -63,9 +63,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): Boolean = { + diskSize: Long, + tachyonSize: Long): Boolean = { val res = askDriverWithReply[Boolean]( - UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) + UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize)) logInfo("Updated info of block " + blockId) res } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index ff2652b640272..378f4cadc17d7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -73,10 +73,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus register(blockManagerId, maxMemSize, slaveActor) sender ! true - case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + case UpdateBlockInfo( + blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) => // TODO: Ideally we want to handle all the message replies in receive instead of in the // individual private methods. - updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) + updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) case GetLocations(blockId) => sender ! getLocations(blockId) @@ -246,7 +247,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long) { + diskSize: Long, + tachyonSize: Long) { if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.executorId == "" && !isLocal) { @@ -265,7 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus return } - blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) + blockManagerInfo(blockManagerId).updateBlockInfo( + blockId, storageLevel, memSize, diskSize, tachyonSize) var locations: mutable.HashSet[BlockManagerId] = null if (blockLocations.containsKey(blockId)) { @@ -309,8 +312,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } } - -private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) +private[spark] case class BlockStatus( + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long, + tachyonSize: Long) private[spark] class BlockManagerInfo( val blockManagerId: BlockManagerId, @@ -336,7 +342,8 @@ private[spark] class BlockManagerInfo( blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long) { + diskSize: Long, + tachyonSize: Long) { updateLastSeenMs() @@ -350,23 +357,29 @@ private[spark] class BlockManagerInfo( } if (storageLevel.isValid) { - /* isValid means it is either stored in-memory or on-disk. + /* isValid means it is either stored in-memory, on-disk or on-Tachyon. * But the memSize here indicates the data size in or dropped from memory, + * tachyonSize here indicates the data size in or dropped from Tachyon, * and the diskSize here indicates the data size in or dropped to disk. * They can be both larger than 0, when a block is dropped from memory to disk. * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */ if (storageLevel.useMemory) { - _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0)) + _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0)) _remainingMem -= memSize logInfo("Added %s in memory on %s (size: %s, free: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), Utils.bytesToString(_remainingMem))) } if (storageLevel.useDisk) { - _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize)) + _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0)) logInfo("Added %s on disk on %s (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) } + if (storageLevel.useOffHeap) { + _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize)) + logInfo("Added %s on tachyon on %s (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize))) + } } else if (_blocks.containsKey(blockId)) { // If isValid is not true, drop the block. val blockStatus: BlockStatus = _blocks.get(blockId) @@ -381,6 +394,10 @@ private[spark] class BlockManagerInfo( logInfo("Removed %s on %s on disk (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize))) } + if (blockStatus.storageLevel.useOffHeap) { + logInfo("Removed %s on %s on tachyon (size: %s)".format( + blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize))) + } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index bbb9529b5a0ca..8a36b5cc42dfd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -53,11 +53,12 @@ private[storage] object BlockManagerMessages { var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, - var diskSize: Long) + var diskSize: Long, + var tachyonSize: Long) extends ToBlockManagerMaster with Externalizable { - def this() = this(null, null, null, 0, 0) // For deserialization only + def this() = this(null, null, null, 0, 0, 0) // For deserialization only override def writeExternal(out: ObjectOutput) { blockManagerId.writeExternal(out) @@ -65,6 +66,7 @@ private[storage] object BlockManagerMessages { storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) + out.writeLong(tachyonSize) } override def readExternal(in: ObjectInput) { @@ -73,6 +75,7 @@ private[storage] object BlockManagerMessages { storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() + tachyonSize = in.readLong() } } @@ -81,13 +84,15 @@ private[storage] object BlockManagerMessages { blockId: BlockId, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): UpdateBlockInfo = { - new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) + diskSize: Long, + tachyonSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize) } // For pattern-matching - def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, BlockId, StorageLevel, Long, Long)] = { - Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) + def unapply(h: UpdateBlockInfo) + : Option[(BlockManagerId, BlockId, StorageLevel, Long, Long, Long)] = { + Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize, h.tachyonSize)) } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 4212a539dab4b..95e71de2d3f1d 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -21,8 +21,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} /** * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, - * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory - * in a serialized format, and whether to replicate the RDD partitions on multiple nodes. + * or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to + * keep the data in memory in a serialized format, and whether to replicate the RDD partitions on + * multiple nodes. * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants * for commonly useful storage levels. To create your own storage level object, use the * factory method of the singleton object (`StorageLevel(...)`). @@ -30,45 +31,58 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, + private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1) extends Externalizable { // TODO: Also add fields for caching priority, dataset ID, and flushing. private def this(flags: Int, replication: Int) { - this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) + this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) } - def this() = this(false, true, false) // For deserialization + def this() = this(false, true, false, false) // For deserialization def useDisk = useDisk_ def useMemory = useMemory_ + def useOffHeap = useOffHeap_ def deserialized = deserialized_ def replication = replication_ assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes") + if (useOffHeap) { + require(useDisk == false, "Off-heap storage level does not support using disk") + require(useMemory == false, "Off-heap storage level does not support using heap memory") + require(deserialized == false, "Off-heap storage level does not support deserialized storage") + require(replication == 1, "Off-heap storage level does not support multiple replication") + } + override def clone(): StorageLevel = new StorageLevel( - this.useDisk, this.useMemory, this.deserialized, this.replication) + this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication) override def equals(other: Any): Boolean = other match { case s: StorageLevel => s.useDisk == useDisk && s.useMemory == useMemory && + s.useOffHeap == useOffHeap && s.deserialized == deserialized && s.replication == replication case _ => false } - def isValid = ((useMemory || useDisk) && (replication > 0)) + def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0)) def toInt: Int = { var ret = 0 if (useDisk_) { - ret |= 4 + ret |= 8 } if (useMemory_) { + ret |= 4 + } + if (useOffHeap_) { ret |= 2 } if (deserialized_) { @@ -84,8 +98,9 @@ class StorageLevel private( override def readExternal(in: ObjectInput) { val flags = in.readByte() - useDisk_ = (flags & 4) != 0 - useMemory_ = (flags & 2) != 0 + useDisk_ = (flags & 8) != 0 + useMemory_ = (flags & 4) != 0 + useOffHeap_ = (flags & 2) != 0 deserialized_ = (flags & 1) != 0 replication_ = in.readByte() } @@ -93,14 +108,15 @@ class StorageLevel private( @throws(classOf[IOException]) private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this) - override def toString: String = - "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) + override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format( + useDisk, useMemory, useOffHeap, deserialized, replication) override def hashCode(): Int = toInt * 41 + replication def description : String = { var result = "" result += (if (useDisk) "Disk " else "") result += (if (useMemory) "Memory " else "") + result += (if (useOffHeap) "Tachyon " else "") result += (if (deserialized) "Deserialized " else "Serialized ") result += "%sx Replicated".format(replication) result @@ -113,22 +129,28 @@ class StorageLevel private( * new storage levels. */ object StorageLevel { - val NONE = new StorageLevel(false, false, false) - val DISK_ONLY = new StorageLevel(true, false, false) - val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) - val MEMORY_ONLY = new StorageLevel(false, true, true) - val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) - val MEMORY_ONLY_SER = new StorageLevel(false, true, false) - val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) - val MEMORY_AND_DISK = new StorageLevel(true, true, true) - val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) - val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) - val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) + val NONE = new StorageLevel(false, false, false, false) + val DISK_ONLY = new StorageLevel(true, false, false, false) + val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) + val MEMORY_ONLY = new StorageLevel(false, true, false, true) + val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) + val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) + val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) + val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) + val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) + val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) + val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) + val OFF_HEAP = new StorageLevel(false, false, true, false) + + /** Create a new StorageLevel object without setting useOffHeap */ + def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, + deserialized: Boolean, replication: Int) = getCachedStorageLevel( + new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)) /** Create a new StorageLevel object */ - def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, - replication: Int = 1): StorageLevel = - getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication)) + def apply(useDisk: Boolean, useMemory: Boolean, + deserialized: Boolean, replication: Int = 1) = getCachedStorageLevel( + new StorageLevel(useDisk, useMemory, false, deserialized, replication)) /** Create a new StorageLevel object from its integer representation */ def apply(flags: Int, replication: Int): StorageLevel = diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index 26565f56ad858..7a174959037be 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -44,7 +44,7 @@ private[spark] class StorageStatusListener extends SparkListener { storageStatusList.foreach { storageStatus => val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId) unpersistedBlocksIds.foreach { blockId => - storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L) + storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 6153dfe0b7e13..ff6e84cf9819a 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -48,17 +48,23 @@ class StorageStatus( } private[spark] -class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel) - extends Ordered[RDDInfo] { +class RDDInfo( + val id: Int, + val name: String, + val numPartitions: Int, + val storageLevel: StorageLevel) extends Ordered[RDDInfo] { var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L + var tachyonSize= 0L override def toString = { - ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " + - "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions, - numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize)) + import Utils.bytesToString + ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" + + "TachyonSize: %s; DiskSize: %s").format( + name, id, storageLevel.toString, numCachedPartitions, numPartitions, + bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize)) } override def compare(that: RDDInfo) = { @@ -105,14 +111,17 @@ object StorageUtils { val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => - // Add up memory and disk sizes - val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize > 0 } + // Add up memory, disk and Tachyon sizes + val persistedBlocks = + blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 } val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L) val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L) + val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L) rddInfoMap.get(rddId).map { rddInfo => rddInfo.numCachedPartitions = persistedBlocks.length rddInfo.memSize = memSize rddInfo.diskSize = diskSize + rddInfo.tachyonSize = tachyonSize rddInfo } }.toArray diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala new file mode 100644 index 0000000000000..b0b9674856568 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.text.SimpleDateFormat +import java.util.{Date, Random} + +import tachyon.client.TachyonFS +import tachyon.client.TachyonFile + +import org.apache.spark.Logging +import org.apache.spark.executor.ExecutorExitCode +import org.apache.spark.network.netty.ShuffleSender +import org.apache.spark.util.Utils + + +/** + * Creates and maintains the logical mapping between logical blocks and tachyon fs locations. By + * default, one block is mapped to one file with a name given by its BlockId. + * + * @param rootDirs The directories to use for storing block files. Data will be hashed among these. + */ +private[spark] class TachyonBlockManager( + shuffleManager: ShuffleBlockManager, + rootDirs: String, + val master: String) + extends Logging { + + val client = if (master != null && master != "") TachyonFS.get(master) else null + + if (client == null) { + logError("Failed to connect to the Tachyon as the master address is not configured") + System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE) + } + + private val MAX_DIR_CREATION_ATTEMPTS = 10 + private val subDirsPerTachyonDir = + shuffleManager.conf.get("spark.tachyonStore.subDirectories", "64").toInt + + // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; + // then, inside this directory, create multiple subdirectories that we will hash files into, + // in order to avoid having really large inodes at the top level in Tachyon. + private val tachyonDirs: Array[TachyonFile] = createTachyonDirs() + private val subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir)) + + addShutdownHook() + + def removeFile(file: TachyonFile): Boolean = { + client.delete(file.getPath(), false) + } + + def fileExists(file: TachyonFile): Boolean = { + client.exist(file.getPath()) + } + + def getFile(filename: String): TachyonFile = { + // Figure out which tachyon directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % tachyonDirs.length + val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir + + // Create the subdirectory if it doesn't already exist + var subDir = subDirs(dirId)(subDirId) + if (subDir == null) { + subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) + client.mkdir(path) + val newDir = client.getFile(path) + subDirs(dirId)(subDirId) = newDir + newDir + } + } + } + val filePath = subDir + "/" + filename + if(!client.exist(filePath)) { + client.createFile(filePath) + } + val file = client.getFile(filePath) + file + } + + def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name) + + // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. + private def createTachyonDirs(): Array[TachyonFile] = { + logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") + val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") + rootDirs.split(",").map { rootDir => + var foundLocalDir = false + var tachyonDir: TachyonFile = null + var tachyonDirId: String = null + var tries = 0 + val rand = new Random() + while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + tries += 1 + try { + tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) + val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId + if (!client.exist(path)) { + foundLocalDir = client.mkdir(path) + tachyonDir = client.getFile(path) + } + } catch { + case e: Exception => + logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e) + } + } + if (!foundLocalDir) { + logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " + + rootDir) + System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR) + } + logInfo("Created tachyon directory at " + tachyonDir) + tachyonDir + } + } + + private def addShutdownHook() { + tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir)) + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") { + override def run() { + logDebug("Shutdown hook called") + tachyonDirs.foreach { tachyonDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) { + Utils.deleteRecursively(tachyonDir, client) + } + } catch { + case t: Throwable => + logError("Exception while deleting tachyon spark dir: " + tachyonDir, t) + } + } + } + }) + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala new file mode 100644 index 0000000000000..b86abbda1d3e7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import tachyon.client.TachyonFile + +/** + * References a particular segment of a file (potentially the entire file), based off an offset and + * a length. + */ +private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) { + override def toString = "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length) +} diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala new file mode 100644 index 0000000000000..c37e76f893605 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.IOException +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer + +import tachyon.client.{WriteType, ReadType} + +import org.apache.spark.Logging +import org.apache.spark.util.Utils +import org.apache.spark.serializer.Serializer + + +private class Entry(val size: Long) + + +/** + * Stores BlockManager blocks on Tachyon. + */ +private class TachyonStore( + blockManager: BlockManager, + tachyonManager: TachyonBlockManager) + extends BlockStore(blockManager: BlockManager) with Logging { + + logInfo("TachyonStore started") + + override def getSize(blockId: BlockId): Long = { + tachyonManager.getFile(blockId.name).length + } + + override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = { + putToTachyonStore(blockId, bytes, true) + } + + override def putValues( + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + return putValues(blockId, values.toIterator, level, returnValues) + } + + override def putValues( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + returnValues: Boolean): PutResult = { + logDebug("Attempting to write values for block " + blockId) + val _bytes = blockManager.dataSerialize(blockId, values) + putToTachyonStore(blockId, _bytes, returnValues) + } + + private def putToTachyonStore( + blockId: BlockId, + bytes: ByteBuffer, + returnValues: Boolean): PutResult = { + // So that we do not modify the input offsets ! + // duplicate does not copy buffer, so inexpensive + val byteBuffer = bytes.duplicate() + byteBuffer.rewind() + logDebug("Attempting to put block " + blockId + " into Tachyon") + val startTime = System.currentTimeMillis + val file = tachyonManager.getFile(blockId) + val os = file.getOutStream(WriteType.TRY_CACHE) + os.write(byteBuffer.array()) + os.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored as %s file in Tachyon in %d ms".format( + blockId, Utils.bytesToString(byteBuffer.limit), (finishTime - startTime))) + + if (returnValues) { + PutResult(bytes.limit(), Right(bytes.duplicate())) + } else { + PutResult(bytes.limit(), null) + } + } + + override def remove(blockId: BlockId): Boolean = { + val file = tachyonManager.getFile(blockId) + if (tachyonManager.fileExists(file)) { + tachyonManager.removeFile(file) + } else { + false + } + } + + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) + } + + + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { + val file = tachyonManager.getFile(blockId) + if (file == null || file.getLocationHosts().size == 0) { + return None + } + val is = file.getInStream(ReadType.CACHE) + var buffer: ByteBuffer = null + try { + if (is != null) { + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) + buffer = ByteBuffer.wrap(bs) + if (fetchSize != size) { + logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size + + " is not equal to fetched size " + fetchSize) + return None + } + } + } catch { + case ioe: IOException => { + logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe) + return None + } + } + Some(buffer) + } + + override def contains(blockId: BlockId): Boolean = { + val file = tachyonManager.getFile(blockId) + tachyonManager.fileExists(file) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index b2732de51058a..0fa461e5e9d27 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -33,6 +33,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { private lazy val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { + val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage) @@ -45,6 +46,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { "Cached Partitions", "Fraction Cached", "Size in Memory", + "Size in Tachyon", "Size on Disk") /** Render an HTML row representing an RDD */ @@ -60,6 +62,7 @@ private[ui] class IndexPage(parent: BlockManagerUI) { {rdd.numCachedPartitions} {"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)} {Utils.bytesToString(rdd.memSize)} + {Utils.bytesToString(rdd.tachyonSize)} {Utils.bytesToString(rdd.diskSize)} } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d9a6af61872d1..2155a8888c85c 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -274,12 +274,14 @@ private[spark] object JsonProtocol { ("Number of Partitions" -> rddInfo.numPartitions) ~ ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ ("Memory Size" -> rddInfo.memSize) ~ + ("Tachyon Size" -> rddInfo.tachyonSize) ~ ("Disk Size" -> rddInfo.diskSize) } def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ + ("Use Tachyon" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ ("Replication" -> storageLevel.replication) } @@ -288,6 +290,7 @@ private[spark] object JsonProtocol { val storageLevel = storageLevelToJson(blockStatus.storageLevel) ("Storage Level" -> storageLevel) ~ ("Memory Size" -> blockStatus.memSize) ~ + ("Tachyon Size" -> blockStatus.tachyonSize) ~ ("Disk Size" -> blockStatus.diskSize) } @@ -570,11 +573,13 @@ private[spark] object JsonProtocol { val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] + val tachyonSize = (json \ "Tachyon Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize + rddInfo.tachyonSize = tachyonSize rddInfo.diskSize = diskSize rddInfo } @@ -582,16 +587,18 @@ private[spark] object JsonProtocol { def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] + val useTachyon = (json \ "Use Tachyon").extract[Boolean] val deserialized = (json \ "Deserialized").extract[Boolean] val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, deserialized, replication) + StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication) } def blockStatusFromJson(json: JValue): BlockStatus = { val storageLevel = storageLevelFromJson(json \ "Storage Level") val memorySize = (json \ "Memory Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - BlockStatus(storageLevel, memorySize, diskSize) + val tachyonSize = (json \ "Tachyon Size").extract[Long] + BlockStatus(storageLevel, memorySize, diskSize, tachyonSize) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 737b765e2aed6..d3c39dee330b2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -34,11 +34,13 @@ import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.json4s._ +import tachyon.client.{TachyonFile,TachyonFS} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} + /** * Various utility methods used by Spark. */ @@ -153,6 +155,7 @@ private[spark] object Utils extends Logging { } private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() + private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { @@ -162,6 +165,14 @@ private[spark] object Utils extends Logging { } } + // Register the tachyon path to be deleted via shutdown hook + def registerShutdownDeleteDir(tachyonfile: TachyonFile) { + val absolutePath = tachyonfile.getPath() + shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths += absolutePath + } + } + // Is the path already registered to be deleted via a shutdown hook ? def hasShutdownDeleteDir(file: File): Boolean = { val absolutePath = file.getAbsolutePath() @@ -170,6 +181,14 @@ private[spark] object Utils extends Logging { } } + // Is the path already registered to be deleted via a shutdown hook ? + def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { + val absolutePath = file.getPath() + shutdownDeletePaths.synchronized { + shutdownDeletePaths.contains(absolutePath) + } + } + // Note: if file is child of some registered path, while not equal to it, then return true; // else false. This is to ensure that two shutdown hooks do not try to delete each others // paths - resulting in IOException and incomplete cleanup. @@ -186,6 +205,22 @@ private[spark] object Utils extends Logging { retval } + // Note: if file is child of some registered path, while not equal to it, then return true; + // else false. This is to ensure that two shutdown hooks do not try to delete each others + // paths - resulting in Exception and incomplete cleanup. + def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = { + val absolutePath = file.getPath() + val retval = shutdownDeletePaths.synchronized { + shutdownDeletePaths.find { path => + !absolutePath.equals(path) && absolutePath.startsWith(path) + }.isDefined + } + if (retval) { + logInfo("path = " + file + ", already present as root for deletion.") + } + retval + } + /** Create a temporary directory inside the given parent directory */ def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { var attempts = 0 @@ -541,7 +576,16 @@ private[spark] object Utils extends Logging { } /** - * Check to see if file is a symbolic link. + * Delete a file or directory and its contents recursively. + */ + def deleteRecursively(dir: TachyonFile, client: TachyonFS) { + if (!client.delete(dir.getPath(), true)) { + throw new IOException("Failed to delete the tachyon dir: " + dir) + } + } + + /** + * Check to see if file is a symbolic link. */ def isSymlink(file: File): Boolean = { if (file == null) throw new NullPointerException("File must not be null") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e83cd55e73691..b6dd0526105a0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -96,9 +96,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("StorageLevel object caching") { - val level1 = StorageLevel(false, false, false, 3) - val level2 = StorageLevel(false, false, false, 3) // this should return the same object as level1 - val level3 = StorageLevel(false, false, false, 2) // this should return a different object + val level1 = StorageLevel(false, false, false, false, 3) + val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1 + val level3 = StorageLevel(false, false, false, false, 2) // this should return a different object assert(level2 === level1, "level2 is not same as level1") assert(level2.eq(level1), "level2 is not the same object as level1") assert(level3 != level1, "level3 is same as level1") @@ -410,6 +410,25 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store") } + test("tachyon storage") { + // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar. + val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false) + if (tachyonUnitTestEnabled) { + store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.OFF_HEAP) + store.putSingle("a2", a2, StorageLevel.OFF_HEAP) + store.putSingle("a3", a3, StorageLevel.OFF_HEAP) + assert(store.getSingle("a3").isDefined, "a3 was in store") + assert(store.getSingle("a2").isDefined, "a2 was in store") + assert(store.getSingle("a1").isDefined, "a1 was in store") + } else { + info("tachyon storage test disabled.") + } + } + test("on-disk storage") { store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 40c29014c4b59..054eb01a64c11 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -456,7 +456,7 @@ class JsonProtocolSuite extends FunSuite { t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => - (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i)) + (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i)) }.toSeq) t } @@ -470,19 +470,19 @@ class JsonProtocolSuite extends FunSuite { """ {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name": "greetings","Number of Tasks":200,"RDD Info":{"RDD ID":100,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1}, - "Number of Partitions":200,"Number of Cached Partitions":300,"Memory Size":400, - "Disk Size":500},"Emitted Task Size Warning":false},"Properties":{"France":"Paris", - "Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} + Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, + "Replication":1},"Number of Partitions":200,"Number of Cached Partitions":300, + "Memory Size":400,"Disk Size":500,"Tachyon Size":0},"Emitted Task Size Warning":false}, + "Properties":{"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} """ private val stageCompletedJsonString = """ {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name": "greetings","Number of Tasks":201,"RDD Info":{"RDD ID":101,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1}, - "Number of Partitions":201,"Number of Cached Partitions":301,"Memory Size":401, - "Disk Size":501},"Emitted Task Size Warning":false}} + Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, + "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301, + "Memory Size":401,"Disk Size":501,"Tachyon Size":0},"Emitted Task Size Warning":false}} """ private val taskStartJsonString = @@ -515,8 +515,8 @@ class JsonProtocolSuite extends FunSuite { 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics": {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks": [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status": - {"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":false, - "Replication":2},"Memory Size":0,"Disk Size":0}}]}} + {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}} """ private val jobStartJsonString = diff --git a/dev/run-tests b/dev/run-tests index fff949e04fcd7..6ad674a2ba127 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -26,13 +26,12 @@ rm -rf ./work # Fail fast set -e - +set -o pipefail if test -x "$JAVA_HOME/bin/java"; then declare java_cmd="$JAVA_HOME/bin/java" else declare java_cmd=java fi - JAVA_VERSION=$($java_cmd -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') [ "$JAVA_VERSION" -ge 18 ] && echo "" || echo "[Warn] Java 8 tests will not run because JDK version is < 1.8." @@ -49,7 +48,9 @@ dev/scalastyle echo "=========================================================================" echo "Running Spark unit tests" echo "=========================================================================" -sbt/sbt assembly test +# echo "q" is needed because sbt on encountering a build file with failure (either resolution or compilation) +# prompts the user for input either q, r, etc to quit or retry. This echo is there to make it not block. +echo -e "q\n" | sbt/sbt assembly test | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" echo "=========================================================================" echo "Running PySpark tests" @@ -63,5 +64,4 @@ echo "=========================================================================" echo "Detecting binary incompatibilites with MiMa" echo "=========================================================================" ./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore -sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving" - +echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving" diff --git a/dev/scalastyle b/dev/scalastyle index 5a18f4d672825..19955b9aaaad3 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -17,8 +17,8 @@ # limitations under the License. # -sbt/sbt clean scalastyle > scalastyle.txt -ERRORS=$(cat scalastyle.txt | grep -e "error file") +echo -e "q\n" | sbt/sbt clean scalastyle > scalastyle.txt +ERRORS=$(cat scalastyle.txt | grep -e "\") if test ! -z "$ERRORS"; then echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS" exit 1 diff --git a/docs/configuration.md b/docs/configuration.md index 1ff0150567255..b6005acac8b93 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -122,6 +122,21 @@ Apart from these, the following properties are also available, and may be useful spark.storage.memoryFraction. + + spark.tachyonStore.baseDir + System.getProperty("java.io.tmpdir") + + Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by spark.tachyonStore.url. + It can also be a comma-separated list of multiple directories on Tachyon file system. + + + + spark.tachyonStore.url + tachyon://localhost:19998 + + The URL of the underlying Tachyon file system in the TachyonStore. + + spark.mesos.coarse false @@ -161,13 +176,13 @@ Apart from these, the following properties are also available, and may be useful spark.ui.acls.enable false - Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has + Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has access permissions to view the web ui. See spark.ui.view.acls for more details. Also note this requires the user to be known, if the user comes across as null no checks are done. Filters can be used to authenticate and set the user. - + spark.ui.view.acls Empty @@ -276,10 +291,10 @@ Apart from these, the following properties are also available, and may be useful spark.serializer.objectStreamReset 10000 - When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches - objects to prevent writing redundant data, however that stops garbage collection of those - objects. By calling 'reset' you flush that info from the serializer, and allow old - objects to be collected. To turn off this periodic reset set it to a value of <= 0. + When serializing using org.apache.spark.serializer.JavaSerializer, the serializer caches + objects to prevent writing redundant data, however that stops garbage collection of those + objects. By calling 'reset' you flush that info from the serializer, and allow old + objects to be collected. To turn off this periodic reset set it to a value of <= 0. By default it will reset the serializer every 10,000 objects. @@ -375,7 +390,7 @@ Apart from these, the following properties are also available, and may be useful spark.akka.heartbeat.interval 1000 - This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. + This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. @@ -430,7 +445,7 @@ Apart from these, the following properties are also available, and may be useful spark.broadcast.blockSize 4096 - Size of each piece of a block in kilobytes for TorrentBroadcastFactory. + Size of each piece of a block in kilobytes for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, BlockManager might take a performance hit. @@ -555,7 +570,7 @@ Apart from these, the following properties are also available, and may be useful the driver. - + spark.authenticate false @@ -563,7 +578,7 @@ Apart from these, the following properties are also available, and may be useful running on Yarn. - + spark.authenticate.secret None @@ -571,12 +586,12 @@ Apart from these, the following properties are also available, and may be useful not running on Yarn and authentication is enabled. - + spark.core.connection.auth.wait.timeout 30 Number of seconds for the connection to wait for authentication to occur before timing - out and giving up. + out and giving up. diff --git a/docs/quick-start.md b/docs/quick-start.md index 13df6beea16e8..60e8b1ba0eb46 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -124,7 +124,7 @@ object SimpleApp { } {% endhighlight %} -This program just counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the proogram. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the application, the directory where Spark is installed, and a name for the jar file containing the application's code. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. +This program just counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the program. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the application, the directory where Spark is installed, and a name for the jar file containing the application's code. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` which explains that Spark is a dependency. This file also adds a repository that Spark depends on: diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index 99412733d4268..77373890eead7 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -23,7 +23,7 @@ To write a Spark application, you need to add a dependency on Spark. If you use groupId = org.apache.spark artifactId = spark-core_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION}} + version = {{site.SPARK_VERSION}} In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS: @@ -73,14 +73,14 @@ The master URL passed to Spark can be in one of the following formats: - - -
Master URLMeaning
local Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). +
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
spark://HOST:PORT Connect to the given Spark standalone - cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. +
spark://HOST:PORT Connect to the given Spark standalone + cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
mesos://HOST:PORT Connect to the given Mesos cluster. - The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, - which is 5050 by default. +
mesos://HOST:PORT Connect to the given Mesos cluster. + The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, + which is 5050 by default.
@@ -265,11 +265,25 @@ A complete list of actions is available in the [RDD API doc](api/core/index.html ## RDD Persistence -One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory across operations. When you persist an RDD, each node stores any slices of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter. - -You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. - -In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes. These levels are chosen by passing a [`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) object to `persist()`. The `cache()` method is a shorthand for using the default storage level, which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of available storage levels is: +One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory +across operations. When you persist an RDD, each node stores any slices of it that it computes in +memory and reuses them in other actions on that dataset (or datasets derived from it). This allows +future actions to be much faster (often by more than 10x). Caching is a key tool for building +iterative algorithms with Spark and for interactive use from the interpreter. + +You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time +it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- +if any partition of an RDD is lost, it will automatically be recomputed using the transformations +that originally created it. + +In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to +persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), +or replicate it across nodes, or store the data in off-heap memory in [Tachyon](http://tachyon-project.org/). +These levels are chosen by passing a +[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) +object to `persist()`. The `cache()` method is a shorthand for using the default storage level, +which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of +available storage levels is: @@ -292,8 +306,16 @@ In addition, each RDD can be stored using a different *storage level*, allowing - + + + + + @@ -307,30 +329,59 @@ In addition, each RDD can be stored using a different *storage level*, allowing ### Which Storage Level to Choose? -Spark's storage levels are meant to provide different tradeoffs between memory usage and CPU efficiency. -We recommend going through the following process to select one: - -* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. This is the most - CPU-efficient option, allowing operations on the RDDs to run as fast as possible. -* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to make the objects - much more space-efficient, but still reasonably fast to access. -* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter a large - amount of the data. Otherwise, recomputing a partition is about as fast as reading it from disk. -* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web - application). *All* the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones - let you continue running tasks on the RDD without waiting to recompute a lost partition. - -If you want to define your own storage level (say, with replication factor of 3 instead of 2), then use the function factor method `apply()` of the [`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object. +Spark's storage levels are meant to provide different trade-offs between memory usage and CPU +efficiency. It allows uses to choose memory, disk, or Tachyon for storing data. We recommend going +through the following process to select one: + +* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. + This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible. + +* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to +make the objects much more space-efficient, but still reasonably fast to access. You can also use +`OFF_HEAP` mode to store the data off the heap in [Tachyon](http://tachyon-project.org/). This will +significantly reduce JVM GC overhead. + +* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter +a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from +disk. + +* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve +requests from a web application). *All* the storage levels provide full fault tolerance by +recomputing lost data, but the replicated ones let you continue running tasks on the RDD without +waiting to recompute a lost partition. + +If you want to define your own storage level (say, with replication factor of 3 instead of 2), then +use the function factor method `apply()` of the +[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object. + +Spark has a block manager inside the Executors that let you chose memory, disk, or off-heap. The +latter is for storing RDDs off-heap outside the Executor JVM on top of the memory management system +[Tachyon](http://tachyon-project.org/). This mode has the following advantages: + +* Cached data will not be lost if individual executors crash. +* Executors can have a smaller memory footprint, allowing you to run more executors on the same +machine as the bulk of the memory will be inside Tachyon. +* Reduced GC overhead since data is stored in Tachyon. # Shared Variables -Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of *shared variables* for two common usage patterns: broadcast variables and accumulators. +Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a +remote cluster node, it works on separate copies of all the variables used in the function. These +variables are copied to each machine, and no updates to the variables on the remote machine are +propagated back to the driver program. Supporting general, read-write shared variables across tasks +would be inefficient. However, Spark does provide two limited types of *shared variables* for two +common usage patterns: broadcast variables and accumulators. ## Broadcast Variables -Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. +Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather +than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a +large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables +using efficient broadcast algorithms to reduce communication cost. -Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. The interpreter session below shows this: +Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The +broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` +method. The interpreter session below shows this: {% highlight scala %} scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) @@ -340,13 +391,21 @@ scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) {% endhighlight %} -After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later). +After the broadcast variable is created, it should be used instead of the value `v` in any functions +run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object +`v` should not be modified after it is broadcast in order to ensure that all nodes get the same +value of the broadcast variable (e.g. if the variable is shipped to a new node later). ## Accumulators -Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can add support for new types. +Accumulators are variables that are only "added" to through an associative operation and can +therefore be efficiently supported in parallel. They can be used to implement counters (as in +MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable +collections, and programmers can add support for new types. -An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method. +An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks +running on the cluster can then add to it using the `+=` operator. However, they cannot read its +value. Only the driver program can read the accumulator's value, using its `value` method. The interpreter session below shows an accumulator being used to add up the elements of an array: diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index e5a09ecec006f..d3babc3ed12c8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -18,8 +18,8 @@ package org.apache.spark.examples import scala.math.random + import org.apache.spark._ -import SparkContext._ /** Computes an approximation to pi */ object SparkPi { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala new file mode 100644 index 0000000000000..53b303d658386 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import java.util.Random +import scala.math.exp +import org.apache.spark.util.Vector +import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.InputFormatInfo +import org.apache.spark.storage.StorageLevel + +/** + * Logistic regression based classification. + * This example uses Tachyon to persist rdds during computation. + */ +object SparkTachyonHdfsLR { + val D = 10 // Numer of dimensions + val rand = new Random(42) + + case class DataPoint(x: Vector, y: Double) + + def parsePoint(line: String): DataPoint = { + val tok = new java.util.StringTokenizer(line, " ") + var y = tok.nextToken.toDouble + var x = new Array[Double](D) + var i = 0 + while (i < D) { + x(i) = tok.nextToken.toDouble; i += 1 + } + DataPoint(new Vector(x), y) + } + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: SparkTachyonHdfsLR ") + System.exit(1) + } + val inputPath = args(1) + val conf = SparkHadoopUtil.get.newConfiguration() + val sc = new SparkContext(args(0), "SparkTachyonHdfsLR", + System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(), + InputFormatInfo.computePreferredLocations( + Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) + )) + val lines = sc.textFile(inputPath) + val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP) + val ITERATIONS = args(2).toInt + + // Initialize w to a random value + var w = Vector(D, _ => 2 * rand.nextDouble - 1) + println("Initial w: " + w) + + for (i <- 1 to ITERATIONS) { + println("On iteration " + i) + val gradient = points.map { p => + (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x + }.reduce(_ + _) + w -= gradient + } + + println("Final w: " + w) + System.exit(0) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala new file mode 100644 index 0000000000000..ce78f0876ed7c --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import scala.math.random + +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel + +/** + * Computes an approximation to pi + * This example uses Tachyon to persist rdds during computation. + */ +object SparkTachyonPi { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkTachyonPi []") + System.exit(1) + } + val spark = new SparkContext(args(0), "SparkTachyonPi", + System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) + + val slices = if (args.length > 1) args(1).toInt else 2 + val n = 100000 * slices + + val rdd = spark.parallelize(1 to n, slices) + rdd.persist(StorageLevel.OFF_HEAP) + val count = rdd.map { i => + val x = random * 2 - 1 + val y = random * 2 - 1 + if (x * x + y * y < 1) 1 else 0 + }.reduce(_ + _) + println("Pi is roughly " + 4.0 * count / n) + + spark.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala index abcc1f04d4279..62329bde84481 100644 --- a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala @@ -33,20 +33,20 @@ object HiveFromSpark { val hiveContext = new LocalHiveContext(sc) import hiveContext._ - sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") - sql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src") + hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + hql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL println("Result of 'SELECT *': ") - sql("SELECT * FROM src").collect.foreach(println) + hql("SELECT * FROM src").collect.foreach(println) // Aggregation queries are also supported. - val count = sql("SELECT COUNT(*) FROM src").collect().head.getInt(0) + val count = hql("SELECT COUNT(*) FROM src").collect().head.getInt(0) println(s"COUNT(*): $count") // The results of SQL queries are themselves RDDs and support all normal RDD functions. The // items in the RDD are of type Row, which allows you to access each column by ordinal. - val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") + val rddFromSql = hql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") println("Result of RDD.map:") val rddAsStrings = rddFromSql.map { @@ -59,6 +59,6 @@ object HiveFromSpark { // Queries can then join RDD data with data stored in Hive. println("Result of SELECT *:") - sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println) + hql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println) } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c5c697e8e2427..843a874fbfdb0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -30,7 +30,7 @@ import scala.collection.JavaConversions._ // import com.jsuereth.pgp.sbtplugin.PgpKeys._ object SparkBuild extends Build { - val SPARK_VERSION = "1.0.0-SNAPSHOT" + val SPARK_VERSION = "1.0.0-SNAPSHOT" // Hadoop version to build against. For example, "1.0.4" for Apache releases, or // "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set @@ -185,15 +185,14 @@ object SparkBuild extends Build { concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), resolvers ++= Seq( - // HTTPS is unavailable for Maven Central "Maven Repository" at "http://repo.maven.apache.org/maven2", "Apache Repository" at "https://repository.apache.org/content/repositories/releases", "JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/", "MQTT Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/", - "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/", + "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/", // For Sonatype publishing - //"sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - //"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/", + // "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", + // "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/", // also check the local Maven repository ~/.m2 Resolver.mavenLocal ), @@ -280,13 +279,18 @@ object SparkBuild extends Build { val slf4jVersion = "1.7.5" val excludeNetty = ExclusionRule(organization = "org.jboss.netty") + val excludeEclipseJetty = ExclusionRule(organization = "org.eclipse.jetty") val excludeAsm = ExclusionRule(organization = "org.ow2.asm") val excludeOldAsm = ExclusionRule(organization = "asm") val excludeCommonsLogging = ExclusionRule(organization = "commons-logging") val excludeSLF4J = ExclusionRule(organization = "org.slf4j") val excludeScalap = ExclusionRule(organization = "org.scala-lang", artifact = "scalap") + val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop") + val excludeCurator = ExclusionRule(organization = "org.apache.curator") + val excludePowermock = ExclusionRule(organization = "org.powermock") - def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark", + + def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark", version: String = "0.9.0-incubating", crossVersion: String = "2.10"): Option[sbt.ModuleID] = { val fullId = if (crossVersion.isEmpty) id else id + "_" + crossVersion Some(organization % fullId % version) // the artifact to compare binary compatibility with @@ -323,6 +327,7 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-graphite" % "3.0.0", "com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm), "com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm), + "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), "com.clearspring.analytics" % "stream" % "2.5.1" ), libraryDependencies ++= maybeAvro diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ff1023bbfa539..d8667e84fedff 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -423,8 +423,11 @@ def _getJavaStorageLevel(self, storageLevel): raise Exception("storageLevel must be of type pyspark.StorageLevel") newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel - return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory, - storageLevel.deserialized, storageLevel.replication) + return newStorageLevel(storageLevel.useDisk, + storageLevel.useMemory, + storageLevel.useOffHeap, + storageLevel.deserialized, + storageLevel.replication) def setJobGroup(self, groupId, description): """ diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 9943296b927dc..fb27863e07f55 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1302,11 +1302,12 @@ def getStorageLevel(self): Get the RDD's current storage level. >>> rdd1 = sc.parallelize([1,2]) >>> rdd1.getStorageLevel() - StorageLevel(False, False, False, 1) + StorageLevel(False, False, False, False, 1) """ java_storage_level = self._jrdd.getStorageLevel() storage_level = StorageLevel(java_storage_level.useDisk(), java_storage_level.useMemory(), + java_storage_level.useOffHeap(), java_storage_level.deserialized(), java_storage_level.replication()) return storage_level diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py index c3e3a44e8e7ab..7b6660eab231b 100644 --- a/python/pyspark/storagelevel.py +++ b/python/pyspark/storagelevel.py @@ -25,23 +25,25 @@ class StorageLevel: Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY. """ - def __init__(self, useDisk, useMemory, deserialized, replication = 1): + def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication = 1): self.useDisk = useDisk self.useMemory = useMemory + self.useOffHeap = useOffHeap self.deserialized = deserialized self.replication = replication def __repr__(self): - return "StorageLevel(%s, %s, %s, %s)" % ( - self.useDisk, self.useMemory, self.deserialized, self.replication) + return "StorageLevel(%s, %s, %s, %s, %s)" % ( + self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication) -StorageLevel.DISK_ONLY = StorageLevel(True, False, False) -StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2) -StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True) -StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2) -StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False) -StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2) -StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True) -StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2) -StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False) -StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2) +StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) +StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) +StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True) +StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2) +StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False) +StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2) +StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True) +StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2) +StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False) +StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2) +StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1) \ No newline at end of file diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index ff8eaacded4c8..f66a667c0a942 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -67,14 +67,13 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) { class HiveContext(sc: SparkContext) extends SQLContext(sc) { self => - override def parseSql(sql: String): LogicalPlan = HiveQl.parseSql(sql) - override def executePlan(plan: LogicalPlan): this.QueryExecution = + override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan } /** * Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD. */ - def hql(hqlQuery: String): SchemaRDD = { + def hiveql(hqlQuery: String): SchemaRDD = { val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery)) // We force query optimization to happen right away instead of letting it happen lazily like // when using the query DSL. This is so DDL commands behave as expected. This is only @@ -83,6 +82,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { result } + /** An alias for `hiveql`. */ + def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery) + // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. @transient protected val outputBuffer = new java.io.OutputStream { @@ -120,7 +122,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { /* A catalyst metadata catalog that points to the Hive Metastore. */ @transient - override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog { + override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog { override def lookupRelation( databaseName: Option[String], tableName: String, @@ -132,7 +134,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { /* An analyzer that uses the Hive metastore. */ @transient - override lazy val analyzer = new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false) + override protected[sql] lazy val analyzer = + new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false) /** * Runs the specified SQL query using Hive. @@ -214,14 +217,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } @transient - override val planner = hivePlanner + override protected[sql] val planner = hivePlanner @transient protected lazy val emptyResult = sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1) /** Extends QueryExecution with hive specific features. */ - abstract class QueryExecution extends super.QueryExecution { + protected[sql] abstract class QueryExecution extends super.QueryExecution { // TODO: Create mixin for the analyzer instead of overriding things here. override lazy val optimizedPlan = optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 0a6bea0162430..2fea9702954d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -110,10 +110,10 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { val describedTable = "DESCRIBE (\\w+)".r - class SqlQueryExecution(sql: String) extends this.QueryExecution { - lazy val logical = HiveQl.parseSql(sql) - def hiveExec() = runSqlHive(sql) - override def toString = sql + "\n" + super.toString + protected[hive] class HiveQLQueryExecution(hql: String) extends this.QueryExecution { + lazy val logical = HiveQl.parseSql(hql) + def hiveExec() = runSqlHive(hql) + override def toString = hql + "\n" + super.toString } /** @@ -140,8 +140,8 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { case class TestTable(name: String, commands: (()=>Unit)*) - implicit class SqlCmd(sql: String) { - def cmd = () => new SqlQueryExecution(sql).stringResult(): Unit + protected[hive] implicit class SqlCmd(sql: String) { + def cmd = () => new HiveQLQueryExecution(sql).stringResult(): Unit } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 18654b308d234..3cc4562a88d66 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -125,7 +125,7 @@ abstract class HiveComparisonTest } protected def prepareAnswer( - hiveQuery: TestHive.type#SqlQueryExecution, + hiveQuery: TestHive.type#HiveQLQueryExecution, answer: Seq[String]): Seq[String] = { val orderedAnswer = hiveQuery.logical match { // Clean out non-deterministic time schema info. @@ -227,7 +227,7 @@ abstract class HiveComparisonTest try { // MINOR HACK: You must run a query before calling reset the first time. - TestHive.sql("SHOW TABLES") + TestHive.hql("SHOW TABLES") if (reset) { TestHive.reset() } val hiveCacheFiles = queryList.zipWithIndex.map { @@ -256,7 +256,7 @@ abstract class HiveComparisonTest hiveCachedResults } else { - val hiveQueries = queryList.map(new TestHive.SqlQueryExecution(_)) + val hiveQueries = queryList.map(new TestHive.HiveQLQueryExecution(_)) // Make sure we can at least parse everything before attempting hive execution. hiveQueries.foreach(_.logical) val computedResults = (queryList.zipWithIndex, hiveQueries, hiveCacheFiles).zipped.map { @@ -302,7 +302,7 @@ abstract class HiveComparisonTest // Run w/ catalyst val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) => - val query = new TestHive.SqlQueryExecution(queryString) + val query = new TestHive.HiveQLQueryExecution(queryString) try { (query, prepareAnswer(query, query.stringResult())) } catch { case e: Exception => val errorMessage = @@ -359,7 +359,7 @@ abstract class HiveComparisonTest // When we encounter an error we check to see if the environment is still okay by running a simple query. // If this fails then we halt testing since something must have gone seriously wrong. try { - new TestHive.SqlQueryExecution("SELECT key FROM src").stringResult() + new TestHive.HiveQLQueryExecution("SELECT key FROM src").stringResult() TestHive.runSqlHive("SELECT key FROM src") } catch { case e: Exception => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c184ebe288af4..0c27498a93507 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -23,6 +23,16 @@ import org.apache.spark.sql.hive.TestHive._ * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. */ class HiveQuerySuite extends HiveComparisonTest { + + test("Query expressed in SQL") { + assert(sql("SELECT 1").collect() === Array(Seq(1))) + } + + test("Query expressed in HiveQL") { + hql("FROM src SELECT key").collect() + hiveql("FROM src SELECT key").collect() + } + createQueryTest("Simple Average", "SELECT AVG(key) FROM src") @@ -133,7 +143,7 @@ class HiveQuerySuite extends HiveComparisonTest { "SELECT * FROM src LATERAL VIEW explode(map(key+3,key+4)) D as k, v") test("sampling") { - sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s") + hql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 40c4e23f90fb8..8883e5b16d4da 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -56,7 +56,7 @@ class HiveResolutionSuite extends HiveComparisonTest { TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2)) :: Nil) .registerAsTable("caseSensitivityTest") - sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") + hql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest") } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 1318ac1968dad..d9ccb93e23923 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -136,7 +136,7 @@ class PruningSuite extends HiveComparisonTest { expectedScannedColumns: Seq[String], expectedPartValues: Seq[Seq[String]]) = { test(s"$testCaseName - pruning test") { - val plan = new TestHive.SqlQueryExecution(sql).executedPlan + val plan = new TestHive.HiveQLQueryExecution(sql).executedPlan val actualOutputColumns = plan.output.map(_.name) val (actualScannedColumns, actualPartValues) = plan.collect { case p @ HiveTableScan(columns, relation, _) => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 314ca48ad8f6a..aade62eb8f84e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -57,34 +57,34 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft } test("SELECT on Parquet table") { - val rdd = sql("SELECT * FROM testsource").collect() + val rdd = hql("SELECT * FROM testsource").collect() assert(rdd != null) assert(rdd.forall(_.size == 6)) } test("Simple column projection + filter on Parquet table") { - val rdd = sql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect() + val rdd = hql("SELECT myboolean, mylong FROM testsource WHERE myboolean=true").collect() assert(rdd.size === 5, "Filter returned incorrect number of rows") assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value") } test("Converting Hive to Parquet Table via saveAsParquetFile") { - sql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath) + hql("SELECT * FROM src").saveAsParquetFile(dirname.getAbsolutePath) parquetFile(dirname.getAbsolutePath).registerAsTable("ptable") - val rddOne = sql("SELECT * FROM src").collect().sortBy(_.getInt(0)) - val rddTwo = sql("SELECT * from ptable").collect().sortBy(_.getInt(0)) + val rddOne = hql("SELECT * FROM src").collect().sortBy(_.getInt(0)) + val rddTwo = hql("SELECT * from ptable").collect().sortBy(_.getInt(0)) compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String")) } test("INSERT OVERWRITE TABLE Parquet table") { - sql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath) + hql("SELECT * FROM testsource").saveAsParquetFile(dirname.getAbsolutePath) parquetFile(dirname.getAbsolutePath).registerAsTable("ptable") // let's do three overwrites for good measure - sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() - sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() - sql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() - val rddCopy = sql("SELECT * FROM ptable").collect() - val rddOrig = sql("SELECT * FROM testsource").collect() + hql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + hql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + hql("INSERT OVERWRITE TABLE ptable SELECT * FROM testsource").collect() + val rddCopy = hql("SELECT * FROM ptable").collect() + val rddOrig = hql("SELECT * FROM testsource").collect() assert(rddCopy.size === rddOrig.size, "INSERT OVERWRITE changed size of table??") compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames) } @@ -93,13 +93,13 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) .registerAsTable("tmp") val rddCopy = - sql("INSERT INTO TABLE tmp SELECT * FROM src") + hql("INSERT INTO TABLE tmp SELECT * FROM src") .collect() .sortBy[Int](_.apply(0) match { case x: Int => x case _ => 0 }) - val rddOrig = sql("SELECT * FROM src") + val rddOrig = hql("SELECT * FROM src") .collect() .sortBy(_.getInt(0)) compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String")) @@ -108,22 +108,22 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft test("Appending to Parquet table") { createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) .registerAsTable("tmpnew") - sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() - sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() - sql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() - val rddCopies = sql("SELECT * FROM tmpnew").collect() - val rddOrig = sql("SELECT * FROM src").collect() + hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect() + val rddCopies = hql("SELECT * FROM tmpnew").collect() + val rddOrig = hql("SELECT * FROM src").collect() assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number") } test("Appending to and then overwriting Parquet table") { createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType)) .registerAsTable("tmp") - sql("INSERT INTO TABLE tmp SELECT * FROM src").collect() - sql("INSERT INTO TABLE tmp SELECT * FROM src").collect() - sql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect() - val rddCopies = sql("SELECT * FROM tmp").collect() - val rddOrig = sql("SELECT * FROM src").collect() + hql("INSERT INTO TABLE tmp SELECT * FROM src").collect() + hql("INSERT INTO TABLE tmp SELECT * FROM src").collect() + hql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect() + val rddCopies = hql("SELECT * FROM tmp").collect() + val rddOrig = hql("SELECT * FROM src").collect() assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite") }
Storage LevelMeaning
MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them - on the fly each time they're needed. Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of + recomputing them on the fly each time they're needed.
OFF_HEAP Store RDD in a serialized format in Tachyon. + This is generally more space-efficient than deserialized objects, especially when using a + fast serializer, but more CPU-intensive to read. + This also significantly reduces the overheads of GC. +
DISK_ONLY